aboutsummaryrefslogtreecommitdiffstats
path: root/libpathod/utils.py
blob: 8aec9d19ddec567d89db3bed5a8323d06d6635a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import sys
from netlib import tcp

# Maps SSL/TLS protocol version names (as strings) to the corresponding
# method constants exposed by netlib.tcp.
SSLVERSIONS = {
    'TLSv1.2': tcp.TLSv1_2_METHOD,
    'TLSv1.1': tcp.TLSv1_1_METHOD,
    'TLSv1': tcp.TLSv1_METHOD,
    'SSLv3': tcp.SSLv3_METHOD,
    'SSLv2': tcp.SSLv2_METHOD,
    'SSLv23': tcp.SSLv23_METHOD,
}

# Multiplier for each single-letter size suffix: successive powers of 1024,
# starting at 1024 ** 0 for "b".
SIZE_UNITS = {
    suffix: 1024 ** power
    for power, suffix in enumerate("bkmgt")
}


class MemBool(object):

    """
        Callable truth test that remembers the last value it was given.

        Calling an instance stores the argument on the "v" attribute and
        returns its truthiness, so a chain of if/elif tests can check a value
        and still retrieve it afterwards.
    """

    def __init__(self):
        # No value has been tested yet.
        self.v = None

    def __call__(self, v):
        # Remember the value first, then report whether it is truthy.
        self.v = v
        return True if v else False


def parse_size(s):
    """
        Convert a size specification to an integer.

        The specification is either something int() accepts directly, or an
        integer followed by one of the suffixes defined in SIZE_UNITS, in
        which case the integer is multiplied by that suffix's value.

        Raises ValueError if the specification cannot be parsed.
    """
    # A plain integer needs no unit handling.
    try:
        return int(s)
    except ValueError:
        pass
    for suffix, multiplier in SIZE_UNITS.items():
        if not s.endswith(suffix):
            continue
        try:
            count = int(s[:-1])
        except ValueError:
            # The suffix matched but the rest is not a number: give up.
            break
        return count * multiplier
    raise ValueError("Invalid size specification.")


def parse_anchor_spec(s):
    """
        Split an anchor specification at its first "=".

        Returns a 2-tuple of the text before and after the first "=", or
        None if the specification contains no "=" at all.
    """
    return tuple(s.split("=", 1)) if "=" in s else None


def xrepr(s):
    """
        Return repr(s) with its first and last characters removed (for a
        plain string, these are the surrounding quotes).
    """
    quoted = repr(s)
    return quoted[1:-1]


def inner_repr(s):
    """
        Returns the inner portion of a string repr, i.e. the repr without its
        surrounding quotes and without any leading type prefix.

        On Python 2 a unicode repr carries a "u" prefix (u'...'); on Python 3
        a bytes repr carries a "b" prefix (b'...'). In both cases the prefix
        is dropped together with the quotes. For any other value only the
        first and last characters of the repr are removed.
    """
    try:
        # Python 2: unicode is the string type whose repr has a prefix.
        prefixed_type = unicode
    except NameError:
        # Python 3: the "unicode" builtin no longer exists, and bytes is the
        # string type whose repr has a prefix.
        prefixed_type = bytes
    skip = 2 if isinstance(s, prefixed_type) else 1
    return repr(s)[skip:-1]


def escape_unprintables(s):
    """
        Like inner_repr, but line breaks are kept as real newlines rather
        than being escaped.

        Line breaks are swapped for placeholder markers before the repr is
        taken and swapped back afterwards. Both "\\r\\n" and "\\n" come back
        as a single "\\n". Input that already contains the marker text will
        have it turned into a newline as well.
    """
    placeholders = (
        ("\r\n", "PATHOD_MARKER_RN"),
        ("\n", "PATHOD_MARKER_N"),
    )
    # Protect line breaks from being escaped; "\r\n" must go first so its
    # "\n" is not consumed by the second substitution.
    for linebreak, marker in placeholders:
        s = s.replace(linebreak, marker)
    s = inner_repr(s)
    # Restore every protected line break as a plain newline.
    for _, marker in placeholders:
        s = s.replace(marker, "\n")
    return s


class Data(object):

    """
        Locates data files relative to the directory of an imported module.

        The module is loaded with __import__(name). Note that for a dotted
        name __import__ returns the top-level package, so the directory used
        is the one containing that package's file.
    """

    def __init__(self, name):
        module = __import__(name)
        location = os.path.dirname(module.__file__)
        self.dirname = os.path.abspath(location)

    def path(self, path):
        """
            Returns the full path to the package data housed at 'path' under
            this object's directory. 'path' can name a file or a directory.

            This function will raise ValueError if the path does not exist.
        """
        candidate = os.path.join(self.dirname, path)
        if os.path.exists(candidate):
            return candidate
        raise ValueError("dataPath: %s does not exist." % candidate)


# Module-level Data instance built from this module's own name; its path()
# method resolves names relative to the directory of the imported module.
data = Data(__name__)


def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):  # pragma: nocover
    """
        Turn the current process into a daemon using two successive forks.

        After the first fork the child changes directory to "/", clears the
        umask and starts a new session with setsid(); it then forks again.
        At each fork the parent exits with status 0. If a fork fails, a
        message is written to stderr and the process exits with status 1.

        Finally the process's standard input, output and error descriptors
        are replaced with the files named by 'stdin', 'stdout' and 'stderr'.
    """
    def fork_and_leave_parent(failure_message):
        # Only the child returns from here; the parent exits immediately.
        try:
            if os.fork() > 0:
                sys.exit(0)
        except OSError as err:
            sys.stderr.write(failure_message % (err.errno, err.strerror))
            sys.exit(1)

    fork_and_leave_parent("fork #1 failed: (%d) %s\n")
    os.chdir("/")
    os.umask(0)
    os.setsid()
    fork_and_leave_parent("fork #2 failed: (%d) %s\n")

    # Open all replacement files before touching any standard descriptor.
    new_stdin = open(stdin, 'rb')
    new_stdout = open(stdout, 'a+b')
    new_stderr = open(stderr, 'a+b', 0)
    redirections = (
        (new_stdin, sys.stdin),
        (new_stdout, sys.stdout),
        (new_stderr, sys.stderr),
    )
    for replacement, stream in redirections:
        os.dup2(replacement.fileno(), stream.fileno())