aboutsummaryrefslogtreecommitdiffstats
path: root/netlib
diff options
context:
space:
mode:
authorSam Cleveland <sam@zombisoft.com>2015-11-11 11:32:02 -0600
committerSam Cleveland <sam@zombisoft.com>2015-11-11 11:32:02 -0600
commit823718348598efb324298ca29ad4cb7d5097c084 (patch)
tree104268236bb329593de27c053ea1498d8a5a5181 /netlib
parent9cab9ee5d6f39b658c1e9260950cc3575d3ad9db (diff)
downloadmitmproxy-823718348598efb324298ca29ad4cb7d5097c084.tar.gz
mitmproxy-823718348598efb324298ca29ad4cb7d5097c084.tar.bz2
mitmproxy-823718348598efb324298ca29ad4cb7d5097c084.zip
Porting netlib to python3.4
Updated utils.py using 2to3-3.4 Updated hexdump to use .format() with .encode() to support python 3.4 Python 3.5 supports .format() on bytes objects, but 3.4 is the current default on Ubuntu. samc$ py.test netlib/test/test_utils.py = test session starts = platform darwin -- Python 3.4.1, pytest-2.8.2, py-1.4.30, pluggy-0.3.1 rootdir: /Users/samc/src/python/netlib, inifile: collected 11 items netlib/test/test_utils.py ........... = 11 passed in 0.19 seconds =
Diffstat (limited to 'netlib')
-rw-r--r--netlib/utils.py16
-rw-r--r--netlib/utils.py.bak368
2 files changed, 376 insertions, 8 deletions
diff --git a/netlib/utils.py b/netlib/utils.py
index acc7ccd4..62f17012 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import, print_function, division
+
import os.path
import re
import string
@@ -61,11 +61,11 @@ def clean_bin(s, keep_spacing=True):
"""
if isinstance(s, six.text_type):
if keep_spacing:
- keep = u" \n\r\t"
+ keep = " \n\r\t"
else:
- keep = u" "
- return u"".join(
- ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
+ keep = " "
+ return "".join(
+ ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else "."
for ch in s
)
else:
@@ -85,9 +85,9 @@ def hexdump(s):
A generator of (offset, hex, str) tuples
"""
for i in range(0, len(s), 16):
- offset = b"%.10x" % i
+ offset = "{:0=10x}".format(i).encode()
part = s[i:i + 16]
- x = b" ".join(b"%.2x" % i for i in six.iterbytes(part))
+ x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
x = x.ljust(47) # 16*2 + 15
yield (offset, x, clean_bin(part, False))
@@ -122,7 +122,7 @@ class BiDi(object):
def __init__(self, **kwargs):
self.names = kwargs
self.values = {}
- for k, v in kwargs.items():
+ for k, v in list(kwargs.items()):
self.values[v] = k
if len(self.names) != len(self.values):
raise ValueError("Duplicate values not allowed.")
diff --git a/netlib/utils.py.bak b/netlib/utils.py.bak
new file mode 100644
index 00000000..acc7ccd4
--- /dev/null
+++ b/netlib/utils.py.bak
@@ -0,0 +1,368 @@
+from __future__ import absolute_import, print_function, division
+import os.path
+import re
+import string
+import unicodedata
+
+import six
+
+from six.moves import urllib
+
+
+def always_bytes(unicode_or_bytes, *encode_args):
+ if isinstance(unicode_or_bytes, six.text_type):
+ return unicode_or_bytes.encode(*encode_args)
+ return unicode_or_bytes
+
+
+def always_byte_args(*encode_args):
+ """Decorator that transparently encodes all arguments passed as unicode"""
+ def decorator(fun):
+ def _fun(*args, **kwargs):
+ args = [always_bytes(arg, *encode_args) for arg in args]
+ kwargs = {k: always_bytes(v, *encode_args) for k, v in six.iteritems(kwargs)}
+ return fun(*args, **kwargs)
+ return _fun
+ return decorator
+
+
+def native(s, *encoding_opts):
+ """
+ Convert :py:class:`bytes` or :py:class:`unicode` to the native
+ :py:class:`str` type, using latin1 encoding if conversion is necessary.
+
+ https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
+ """
+ if not isinstance(s, (six.binary_type, six.text_type)):
+ raise TypeError("%r is neither bytes nor unicode" % s)
+ if six.PY3:
+ if isinstance(s, six.binary_type):
+ return s.decode(*encoding_opts)
+ else:
+ if isinstance(s, six.text_type):
+ return s.encode(*encoding_opts)
+ return s
+
+
+def isascii(bytes):
+ try:
+ bytes.decode("ascii")
+ except ValueError:
+ return False
+ return True
+
+
+def clean_bin(s, keep_spacing=True):
+ """
+ Cleans binary data to make it safe to display.
+
+ Args:
+ keep_spacing: If False, tabs and newlines will also be replaced.
+ """
+ if isinstance(s, six.text_type):
+ if keep_spacing:
+ keep = u" \n\r\t"
+ else:
+ keep = u" "
+ return u"".join(
+ ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
+ for ch in s
+ )
+ else:
+ if keep_spacing:
+ keep = (9, 10, 13) # \t, \n, \r,
+ else:
+ keep = ()
+ return b"".join(
+ six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
+ for ch in six.iterbytes(s)
+ )
+
+
+def hexdump(s):
+ """
+ Returns:
+ A generator of (offset, hex, str) tuples
+ """
+ for i in range(0, len(s), 16):
+ offset = b"%.10x" % i
+ part = s[i:i + 16]
+ x = b" ".join(b"%.2x" % i for i in six.iterbytes(part))
+ x = x.ljust(47) # 16*2 + 15
+ yield (offset, x, clean_bin(part, False))
+
+
+def setbit(byte, offset, value):
+ """
+ Set a bit in a byte to 1 if value is truthy, 0 if not.
+ """
+ if value:
+ return byte | (1 << offset)
+ else:
+ return byte & ~(1 << offset)
+
+
+def getbit(byte, offset):
+ mask = 1 << offset
+ return bool(byte & mask)
+
+
+class BiDi(object):
+
+ """
+ A wee utility class for keeping bi-directional mappings, like field
+ constants in protocols. Names are attributes on the object, dict-like
+ access maps values to names:
+
+ CONST = BiDi(a=1, b=2)
+ assert CONST.a == 1
+ assert CONST.get_name(1) == "a"
+ """
+
+ def __init__(self, **kwargs):
+ self.names = kwargs
+ self.values = {}
+ for k, v in kwargs.items():
+ self.values[v] = k
+ if len(self.names) != len(self.values):
+ raise ValueError("Duplicate values not allowed.")
+
+ def __getattr__(self, k):
+ if k in self.names:
+ return self.names[k]
+ raise AttributeError("No such attribute: %s", k)
+
+ def get_name(self, n, default=None):
+ return self.values.get(n, default)
+
+
+def pretty_size(size):
+ suffixes = [
+ ("B", 2 ** 10),
+ ("kB", 2 ** 20),
+ ("MB", 2 ** 30),
+ ]
+ for suf, lim in suffixes:
+ if size >= lim:
+ continue
+ else:
+ x = round(size / float(lim / 2 ** 10), 2)
+ if x == int(x):
+ x = int(x)
+ return str(x) + suf
+
+
+class Data(object):
+
+ def __init__(self, name):
+ m = __import__(name)
+ dirname, _ = os.path.split(m.__file__)
+ self.dirname = os.path.abspath(dirname)
+
+ def path(self, path):
+ """
+ Returns a path to the package data housed at 'path' under this
+ module.Path can be a path to a file, or to a directory.
+
+ This function will raise ValueError if the path does not exist.
+ """
+ fullpath = os.path.join(self.dirname, '../test/', path)
+ if not os.path.exists(fullpath):
+ raise ValueError("dataPath: %s does not exist." % fullpath)
+ return fullpath
+
+
+_label_valid = re.compile(b"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
+
+
+def is_valid_host(host):
+ """
+ Checks if a hostname is valid.
+
+ Args:
+ host (bytes): The hostname
+ """
+ try:
+ host.decode("idna")
+ except ValueError:
+ return False
+ if len(host) > 255:
+ return False
+ if host[-1] == b".":
+ host = host[:-1]
+ return all(_label_valid.match(x) for x in host.split(b"."))
+
+
+def is_valid_port(port):
+ return 0 <= port <= 65535
+
+
+# PY2 workaround
+def decode_parse_result(result, enc):
+ if hasattr(result, "decode"):
+ return result.decode(enc)
+ else:
+ return urllib.parse.ParseResult(*[x.decode(enc) for x in result])
+
+
+# PY2 workaround
+def encode_parse_result(result, enc):
+ if hasattr(result, "encode"):
+ return result.encode(enc)
+ else:
+ return urllib.parse.ParseResult(*[x.encode(enc) for x in result])
+
+
+def parse_url(url):
+ """
+ URL-parsing function that checks that
+ - port is an integer 0-65535
+ - host is a valid IDNA-encoded hostname with no null-bytes
+ - path is valid ASCII
+
+ Args:
+ A URL (as bytes or as unicode)
+
+ Returns:
+ A (scheme, host, port, path) tuple
+
+ Raises:
+ ValueError, if the URL is not properly formatted.
+ """
+ parsed = urllib.parse.urlparse(url)
+
+ if not parsed.hostname:
+ raise ValueError("No hostname given")
+
+ if isinstance(url, six.binary_type):
+ host = parsed.hostname
+
+ # this should not raise a ValueError,
+ # but we try to be very forgiving here and accept just everything.
+ # decode_parse_result(parsed, "ascii")
+ else:
+ host = parsed.hostname.encode("idna")
+ parsed = encode_parse_result(parsed, "ascii")
+
+ port = parsed.port
+ if not port:
+ port = 443 if parsed.scheme == b"https" else 80
+
+ full_path = urllib.parse.urlunparse(
+ (b"", b"", parsed.path, parsed.params, parsed.query, parsed.fragment)
+ )
+ if not full_path.startswith(b"/"):
+ full_path = b"/" + full_path
+
+ if not is_valid_host(host):
+ raise ValueError("Invalid Host")
+ if not is_valid_port(port):
+ raise ValueError("Invalid Port")
+
+ return parsed.scheme, host, port, full_path
+
+
+def get_header_tokens(headers, key):
+ """
+ Retrieve all tokens for a header key. A number of different headers
+ follow a pattern where each header line can containe comma-separated
+ tokens, and headers can be set multiple times.
+ """
+ if key not in headers:
+ return []
+ tokens = headers[key].split(",")
+ return [token.strip() for token in tokens]
+
+
+def hostport(scheme, host, port):
+ """
+ Returns the host component, with a port specifcation if needed.
+ """
+ if (port, scheme) in [(80, "http"), (443, "https"), (80, b"http"), (443, b"https")]:
+ return host
+ else:
+ if isinstance(host, six.binary_type):
+ return b"%s:%d" % (host, port)
+ else:
+ return "%s:%d" % (host, port)
+
+
+def unparse_url(scheme, host, port, path=""):
+ """
+ Returns a URL string, constructed from the specified components.
+
+ Args:
+ All args must be str.
+ """
+ return "%s://%s%s" % (scheme, hostport(scheme, host, port), path)
+
+
+def urlencode(s):
+ """
+ Takes a list of (key, value) tuples and returns a urlencoded string.
+ """
+ s = [tuple(i) for i in s]
+ return urllib.parse.urlencode(s, False)
+
+
+def urldecode(s):
+ """
+ Takes a urlencoded string and returns a list of (key, value) tuples.
+ """
+ return urllib.parse.parse_qsl(s, keep_blank_values=True)
+
+
+def parse_content_type(c):
+ """
+ A simple parser for content-type values. Returns a (type, subtype,
+ parameters) tuple, where type and subtype are strings, and parameters
+ is a dict. If the string could not be parsed, return None.
+
+ E.g. the following string:
+
+ text/html; charset=UTF-8
+
+ Returns:
+
+ ("text", "html", {"charset": "UTF-8"})
+ """
+ parts = c.split(";", 1)
+ ts = parts[0].split("/", 1)
+ if len(ts) != 2:
+ return None
+ d = {}
+ if len(parts) == 2:
+ for i in parts[1].split(";"):
+ clause = i.split("=", 1)
+ if len(clause) == 2:
+ d[clause[0].strip()] = clause[1].strip()
+ return ts[0].lower(), ts[1].lower(), d
+
+
+def multipartdecode(headers, content):
+ """
+ Takes a multipart boundary encoded string and returns list of (key, value) tuples.
+ """
+ v = headers.get("content-type")
+ if v:
+ v = parse_content_type(v)
+ if not v:
+ return []
+ try:
+ boundary = v[2]["boundary"].encode("ascii")
+ except (KeyError, UnicodeError):
+ return []
+
+ rx = re.compile(br'\bname="([^"]+)"')
+ r = []
+
+ for i in content.split(b"--" + boundary):
+ parts = i.splitlines()
+ if len(parts) > 1 and parts[0][0:2] != b"--":
+ match = rx.search(parts[1])
+ if match:
+ key = match.group(1)
+ value = b"".join(parts[3 + parts[2:].index(b""):])
+ r.append((key, value))
+ return r
+ return []