diff options
authorMaximilian Hils <git@maximilianhils.com>2016-07-05 19:25:56 -0700
committerMaximilian Hils <git@maximilianhils.com>2016-07-06 19:55:48 -0700
commit48ee3a553e30b36c16bfbe1674d3313605dff661 (patch)
parent684b4b5130aa9cc75322dd270172b263615d39dc (diff)
add tnetstring unicode type
6 files changed, 251 insertions, 750 deletions
diff --git a/mitmproxy/contrib/py2/__init__.py b/mitmproxy/contrib/py2/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/mitmproxy/contrib/py2/__init__.py
+++ /dev/null
diff --git a/mitmproxy/contrib/py2/tnetstring.py b/mitmproxy/contrib/py2/tnetstring.py
deleted file mode 100644
index 9bf20b09..00000000
--- a/mitmproxy/contrib/py2/tnetstring.py
+++ /dev/null
@@ -1,375 +0,0 @@
-# imported from the tnetstring project: https://github.com/rfk/tnetstring
-# Copyright (c) 2011 Ryan Kelly
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-tnetstring: data serialization using typed netstrings
-This is a data serialization library. It's a lot like JSON but it uses a
-new syntax called "typed netstrings" that Zed has proposed for use in the
-Mongrel2 webserver. It's designed to be simpler and easier to implement
-than JSON, with a happy consequence of also being faster in many cases.
-An ordinary netstring is a blob of data prefixed with its length and postfixed
-with a sanity-checking comma. The string "hello world" encodes like this::
- 11:hello world,
-Typed netstrings add other datatypes by replacing the comma with a type tag.
-Here's the integer 12345 encoded as a tnetstring::
- 5:12345#
-And here's the list [12345,True,0] which mixes integers and bools::
- 19:5:12345#4:true!1:0#]
-Simple enough? This module gives you the following functions:
- :dump: dump an object as a tnetstring to a file
- :dumps: dump an object as a tnetstring to a string
- :load: load a tnetstring-encoded object from a file
- :loads: load a tnetstring-encoded object from a string
- :pop: pop a tnetstring-encoded object from the front of a string
-Note that since parsing a tnetstring requires reading all the data into memory
-at once, there's no efficiency gain from using the file-based versions of these
-functions. They're only here so you can use load() to read precisely one
-item from a file or socket without consuming any extra data.
-By default tnetstrings work only with byte strings, not unicode. If you want
-unicode strings then pass an optional encoding to the various functions,
-like so::
- >>> print(repr(tnetstring.loads("2:\\xce\\xb1,")))
- '\\xce\\xb1'
- >>>
- >>> print(repr(tnetstring.loads("2:\\xce\\xb1,","utf8")))
- u'\u03b1'
-from collections import deque
-import six
-__ver_major__ = 0
-__ver_minor__ = 2
-__ver_patch__ = 0
-__ver_sub__ = ""
-__version__ = "%d.%d.%d%s" % (
- __ver_major__, __ver_minor__, __ver_patch__, __ver_sub__)
-def dumps(value):
- """
- This function dumps a python object as a tnetstring.
- """
- # This uses a deque to collect output fragments in reverse order,
- # then joins them together at the end. It's measurably faster
- # than creating all the intermediate strings.
- # If you're reading this to get a handle on the tnetstring format,
- # consider the _gdumps() function instead; it's a standard top-down
- # generator that's simpler to understand but much less efficient.
- q = deque()
- _rdumpq(q, 0, value)
- return b''.join(q)
-def dump(value, file_handle):
- """
- This function dumps a python object as a tnetstring and
- writes it to the given file.
- """
- file_handle.write(dumps(value))
-def _rdumpq(q, size, value):
- """
- Dump value as a tnetstring, to a deque instance, last chunks first.
- This function generates the tnetstring representation of the given value,
- pushing chunks of the output onto the given deque instance. It pushes
- the last chunk first, then recursively generates more chunks.
- When passed in the current size of the string in the queue, it will return
- the new size of the string in the queue.
- Operating last-chunk-first makes it easy to calculate the size written
- for recursive structures without having to build their representation as
- a string. This is measurably faster than generating the intermediate
- strings, especially on deeply nested structures.
- """
- write = q.appendleft
- if value is None:
- write(b'0:~')
- return size + 3
- elif value is True:
- write(b'4:true!')
- return size + 7
- elif value is False:
- write(b'5:false!')
- return size + 8
- elif isinstance(value, six.integer_types):
- data = str(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'#')
- write(data)
- write(b':')
- write(span)
- return size + 2 + len(span) + ldata
- elif isinstance(value, float):
- # Use repr() for float rather than str().
- # It round-trips more accurately.
- # Probably unnecessary in later python versions that
- # use David Gay's ftoa routines.
- data = repr(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'^')
- write(data)
- write(b':')
- write(span)
- return size + 2 + len(span) + ldata
- elif isinstance(value, bytes):
- lvalue = len(value)
- span = str(lvalue).encode()
- write(b',')
- write(value)
- write(b':')
- write(span)
- return size + 2 + len(span) + lvalue
- elif isinstance(value, (list, tuple)):
- write(b']')
- init_size = size = size + 1
- for item in reversed(value):
- size = _rdumpq(q, size, item)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- elif isinstance(value, dict):
- write(b'}')
- init_size = size = size + 1
- for (k, v) in value.items():
- size = _rdumpq(q, size, v)
- size = _rdumpq(q, size, k)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- else:
- raise ValueError("unserializable object: {} ({})".format(value, type(value)))
-def _gdumps(value):
- """
- Generate fragments of value dumped as a tnetstring.
- This is the naive dumping algorithm, implemented as a generator so that
- it's easy to pass to "".join() without building a new list.
- This is mainly here for comparison purposes; the _rdumpq version is
- measurably faster as it doesn't have to build intermediate strins.
- """
- if value is None:
- yield b'0:~'
- elif value is True:
- yield b'4:true!'
- elif value is False:
- yield b'5:false!'
- elif isinstance(value, six.integer_types):
- data = str(value).encode()
- yield str(len(data)).encode()
- yield b':'
- yield data
- yield b'#'
- elif isinstance(value, float):
- data = repr(value).encode()
- yield str(len(data)).encode()
- yield b':'
- yield data
- yield b'^'
- elif isinstance(value, bytes):
- yield str(len(value)).encode()
- yield b':'
- yield value
- yield b','
- elif isinstance(value, (list, tuple)):
- sub = []
- for item in value:
- sub.extend(_gdumps(item))
- sub = b''.join(sub)
- yield str(len(sub)).encode()
- yield b':'
- yield sub
- yield b']'
- elif isinstance(value, (dict,)):
- sub = []
- for (k, v) in value.items():
- sub.extend(_gdumps(k))
- sub.extend(_gdumps(v))
- sub = b''.join(sub)
- yield str(len(sub)).encode()
- yield b':'
- yield sub
- yield b'}'
- else:
- raise ValueError("unserializable object")
-def loads(string):
- """
- This function parses a tnetstring into a python object.
- """
- # No point duplicating effort here. In the C-extension version,
- # loads() is measurably faster then pop() since it can avoid
- # the overhead of building a second string.
- return pop(string)[0]
-def load(file_handle):
- """load(file) -> object
- This function reads a tnetstring from a file and parses it into a
- python object. The file must support the read() method, and this
- function promises not to read more data than necessary.
- """
- # Read the length prefix one char at a time.
- # Note that the netstring spec explicitly forbids padding zeros.
- c = file_handle.read(1)
- if not c.isdigit():
- raise ValueError("not a tnetstring: missing or invalid length prefix")
- datalen = ord(c) - ord('0')
- c = file_handle.read(1)
- if datalen != 0:
- while c.isdigit():
- datalen = (10 * datalen) + (ord(c) - ord('0'))
- if datalen > 999999999:
- errmsg = "not a tnetstring: absurdly large length prefix"
- raise ValueError(errmsg)
- c = file_handle.read(1)
- if c != b':':
- raise ValueError("not a tnetstring: missing or invalid length prefix")
- # Now we can read and parse the payload.
- # This repeats the dispatch logic of pop() so we can avoid
- # re-constructing the outermost tnetstring.
- data = file_handle.read(datalen)
- if len(data) != datalen:
- raise ValueError("not a tnetstring: length prefix too big")
- tns_type = file_handle.read(1)
- if tns_type == b',':
- return data
- if tns_type == b'#':
- try:
- return int(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid integer literal")
- if tns_type == b'^':
- try:
- return float(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid float literal")
- if tns_type == b'!':
- if data == b'true':
- return True
- elif data == b'false':
- return False
- else:
- raise ValueError("not a tnetstring: invalid boolean literal")
- if tns_type == b'~':
- if data:
- raise ValueError("not a tnetstring: invalid null literal")
- return None
- if tns_type == b']':
- l = []
- while data:
- item, data = pop(data)
- l.append(item)
- return l
- if tns_type == b'}':
- d = {}
- while data:
- key, data = pop(data)
- val, data = pop(data)
- d[key] = val
- return d
- raise ValueError("unknown type tag")
-def pop(string):
- """pop(string,encoding='utf_8') -> (object, remain)
- This function parses a tnetstring into a python object.
- It returns a tuple giving the parsed object and a string
- containing any unparsed data from the end of the string.
- """
- # Parse out data length, type and remaining string.
- try:
- dlen, rest = string.split(b':', 1)
- dlen = int(dlen)
- except ValueError:
- raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(string))
- try:
- data, tns_type, remain = rest[:dlen], rest[dlen:dlen + 1], rest[dlen + 1:]
- except IndexError:
- # This fires if len(rest) < dlen, meaning we don't need
- # to further validate that data is the right length.
- raise ValueError("not a tnetstring: invalid length prefix: {}".format(dlen))
- # Parse the data based on the type tag.
- if tns_type == b',':
- return data, remain
- if tns_type == b'#':
- try:
- return int(data), remain
- except ValueError:
- raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))
- if tns_type == b'^':
- try:
- return float(data), remain
- except ValueError:
- raise ValueError("not a tnetstring: invalid float literal: {}".format(data))
- if tns_type == b'!':
- if data == b'true':
- return True, remain
- elif data == b'false':
- return False, remain
- else:
- raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data))
- if tns_type == b'~':
- if data:
- raise ValueError("not a tnetstring: invalid null literal")
- return None, remain
- if tns_type == b']':
- l = []
- while data:
- item, data = pop(data)
- l.append(item)
- return (l, remain)
- if tns_type == b'}':
- d = {}
- while data:
- key, data = pop(data)
- val, data = pop(data)
- d[key] = val
- return d, remain
- raise ValueError("unknown type tag: {}".format(tns_type))
diff --git a/mitmproxy/contrib/py3/__init__.py b/mitmproxy/contrib/py3/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/mitmproxy/contrib/py3/__init__.py
+++ /dev/null
diff --git a/mitmproxy/contrib/py3/tnetstring.py b/mitmproxy/contrib/py3/tnetstring.py
deleted file mode 100644
index 6998fc82..00000000
--- a/mitmproxy/contrib/py3/tnetstring.py
+++ /dev/null
@@ -1,237 +0,0 @@
-tnetstring: data serialization using typed netstrings
-This is a custom Python 3 implementation of tnetstrings.
-Compared to other implementations, the main difference
-is the conversion of dictionary keys to str.
-An ordinary tnetstring is a blob of data prefixed with its length and postfixed
-with its type. Here are some examples:
- >>> tnetstring.dumps("hello world")
- 11:hello world,
- >>> tnetstring.dumps(12345)
- 5:12345#
- >>> tnetstring.dumps([12345, True, 0])
- 19:5:12345#4:true!1:0#]
-This module gives you the following functions:
- :dump: dump an object as a tnetstring to a file
- :dumps: dump an object as a tnetstring to a string
- :load: load a tnetstring-encoded object from a file
- :loads: load a tnetstring-encoded object from a string
-Note that since parsing a tnetstring requires reading all the data into memory
-at once, there's no efficiency gain from using the file-based versions of these
-functions. They're only here so you can use load() to read precisely one
-item from a file or socket without consuming any extra data.
-The tnetstrings specification explicitly states that strings are binary blobs
-and forbids the use of unicode at the protocol level.
-**This implementation decodes dictionary keys as surrogate-escaped ASCII**,
-all other strings are returned as plain bytes.
-:Copyright: (c) 2012-2013 by Ryan Kelly <ryan@rfk.id.au>.
-:Copyright: (c) 2014 by Carlo Pires <carlopires@gmail.com>.
-:Copyright: (c) 2016 by Maximilian Hils <tnetstring3@maximilianhils.com>.
-:License: MIT
-import collections
-from typing import io, Union, Tuple
-TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict]
-def dumps(value: TSerializable) -> bytes:
- """
- This function dumps a python object as a tnetstring.
- """
- # This uses a deque to collect output fragments in reverse order,
- # then joins them together at the end. It's measurably faster
- # than creating all the intermediate strings.
- q = collections.deque()
- _rdumpq(q, 0, value)
- return b''.join(q)
-def dump(value: TSerializable, file_handle: io.BinaryIO) -> None:
- """
- This function dumps a python object as a tnetstring and
- writes it to the given file.
- """
- file_handle.write(dumps(value))
-def _rdumpq(q: collections.deque, size: int, value: TSerializable) -> int:
- """
- Dump value as a tnetstring, to a deque instance, last chunks first.
- This function generates the tnetstring representation of the given value,
- pushing chunks of the output onto the given deque instance. It pushes
- the last chunk first, then recursively generates more chunks.
- When passed in the current size of the string in the queue, it will return
- the new size of the string in the queue.
- Operating last-chunk-first makes it easy to calculate the size written
- for recursive structures without having to build their representation as
- a string. This is measurably faster than generating the intermediate
- strings, especially on deeply nested structures.
- """
- write = q.appendleft
- if value is None:
- write(b'0:~')
- return size + 3
- elif value is True:
- write(b'4:true!')
- return size + 7
- elif value is False:
- write(b'5:false!')
- return size + 8
- elif isinstance(value, int):
- data = str(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'%s:%s#' % (span, data))
- return size + 2 + len(span) + ldata
- elif isinstance(value, float):
- # Use repr() for float rather than str().
- # It round-trips more accurately.
- # Probably unnecessary in later python versions that
- # use David Gay's ftoa routines.
- data = repr(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'%s:%s^' % (span, data))
- return size + 2 + len(span) + ldata
- elif isinstance(value, bytes):
- lvalue = len(value)
- span = str(lvalue).encode()
- write(b'%s:%s,' % (span, value))
- return size + 2 + len(span) + lvalue
- elif isinstance(value, (list, tuple)):
- write(b']')
- init_size = size = size + 1
- for item in reversed(value):
- size = _rdumpq(q, size, item)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- elif isinstance(value, dict):
- write(b'}')
- init_size = size = size + 1
- for (k, v) in value.items():
- if isinstance(k, str):
- k = k.encode("ascii", "strict")
- size = _rdumpq(q, size, v)
- size = _rdumpq(q, size, k)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- else:
- raise ValueError("unserializable object: {} ({})".format(value, type(value)))
-def loads(string: bytes) -> TSerializable:
- """
- This function parses a tnetstring into a python object.
- """
- return pop(string)[0]
-def load(file_handle: io.BinaryIO) -> TSerializable:
- """load(file) -> object
- This function reads a tnetstring from a file and parses it into a
- python object. The file must support the read() method, and this
- function promises not to read more data than necessary.
- """
- # Read the length prefix one char at a time.
- # Note that the netstring spec explicitly forbids padding zeros.
- c = file_handle.read(1)
- data_length = b""
- while c.isdigit():
- data_length += c
- if len(data_length) > 9:
- raise ValueError("not a tnetstring: absurdly large length prefix")
- c = file_handle.read(1)
- if c != b":":
- raise ValueError("not a tnetstring: missing or invalid length prefix")
- data = file_handle.read(int(data_length))
- data_type = file_handle.read(1)[0]
- return parse(data_type, data)
-def parse(data_type: int, data: bytes) -> TSerializable:
- if data_type == ord(b','):
- return data
- if data_type == ord(b'#'):
- try:
- return int(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))
- if data_type == ord(b'^'):
- try:
- return float(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid float literal: {}".format(data))
- if data_type == ord(b'!'):
- if data == b'true':
- return True
- elif data == b'false':
- return False
- else:
- raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data))
- if data_type == ord(b'~'):
- if data:
- raise ValueError("not a tnetstring: invalid null literal")
- return None
- if data_type == ord(b']'):
- l = []
- while data:
- item, data = pop(data)
- l.append(item)
- return l
- if data_type == ord(b'}'):
- d = {}
- while data:
- key, data = pop(data)
- if isinstance(key, bytes):
- key = key.decode("ascii", "strict")
- val, data = pop(data)
- d[key] = val
- return d
- raise ValueError("unknown type tag: {}".format(data_type))
-def pop(data: bytes) -> Tuple[TSerializable, bytes]:
- """
- This function parses a tnetstring into a python object.
- It returns a tuple giving the parsed object and a string
- containing any unparsed data from the end of the string.
- """
- # Parse out data length, type and remaining string.
- try:
- length, data = data.split(b':', 1)
- length = int(length)
- except ValueError:
- raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data))
- try:
- data, data_type, remain = data[:length], data[length], data[length + 1:]
- except IndexError:
- # This fires if len(data) < dlen, meaning we don't need
- # to further validate that data is the right length.
- raise ValueError("not a tnetstring: invalid length prefix: {}".format(length))
- # Parse the data based on the type tag.
- return parse(data_type, data), remain
-__all__ = ["dump", "dumps", "load", "loads", "pop"]
diff --git a/mitmproxy/contrib/py3/tnetstring_tests.py b/mitmproxy/contrib/py3/tnetstring_tests.py
deleted file mode 100644
index 4ee184d5..00000000
--- a/mitmproxy/contrib/py3/tnetstring_tests.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import unittest
-import random
-import math
-import io
-from . import tnetstring
-import struct
-MAXINT = 2 ** (struct.Struct('i').size * 8 - 1) - 1
- b'0:}': {},
- b'0:]': [],
- b'51:5:hello,39:11:12345678901#4:this,4:true!0:~4:\x00\x00\x00\x00,]}':
- {'hello': [12345678901, b'this', True, None, b'\x00\x00\x00\x00']},
- b'5:12345#': 12345,
- b'12:this is cool,': b'this is cool',
- b'0:,': b'',
- b'0:~': None,
- b'4:true!': True,
- b'5:false!': False,
- b'10:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00,': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
- b'24:5:12345#5:67890#5:xxxxx,]': [12345, 67890, b'xxxxx'],
- b'18:3:0.1^3:0.2^3:0.3^]': [0.1, 0.2, 0.3],
- b'243:238:233:228:223:218:213:208:203:198:193:188:183:178:173:168:163:158:153:148:143:138:133:128:123:118:113:108:103:99:95:91:87:83:79:75:71:67:63:59:55:51:47:43:39:35:31:27:23:19:15:11:hello-there,]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]': [[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[b'hello-there']]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]
-def get_random_object(random=random, depth=0):
- """Generate a random serializable object."""
- # The probability of generating a scalar value increases as the depth increase.
- # This ensures that we bottom out eventually.
- if random.randint(depth,10) <= 4:
- what = random.randint(0,1)
- if what == 0:
- n = random.randint(0,10)
- l = []
- for _ in range(n):
- l.append(get_random_object(random,depth+1))
- return l
- if what == 1:
- n = random.randint(0,10)
- d = {}
- for _ in range(n):
- n = random.randint(0,100)
- k = str([random.randint(32,126) for _ in range(n)])
- d[k] = get_random_object(random,depth+1)
- return d
- else:
- what = random.randint(0,4)
- if what == 0:
- return None
- if what == 1:
- return True
- if what == 2:
- return False
- if what == 3:
- if random.randint(0,1) == 0:
- return random.randint(0,MAXINT)
- else:
- return -1 * random.randint(0,MAXINT)
- n = random.randint(0,100)
- return bytes([random.randint(32,126) for _ in range(n)])
-class Test_Format(unittest.TestCase):
- def test_roundtrip_format_examples(self):
- for data, expect in FORMAT_EXAMPLES.items():
- self.assertEqual(expect,tnetstring.loads(data))
- self.assertEqual(expect,tnetstring.loads(tnetstring.dumps(expect)))
- self.assertEqual((expect,b''),tnetstring.pop(data))
- def test_roundtrip_format_random(self):
- for _ in range(500):
- v = get_random_object()
- self.assertEqual(v,tnetstring.loads(tnetstring.dumps(v)))
- self.assertEqual((v,b""),tnetstring.pop(tnetstring.dumps(v)))
- def test_unicode_handling(self):
- with self.assertRaises(ValueError):
- tnetstring.dumps("hello")
- self.assertEqual(tnetstring.dumps("hello".encode()),b"5:hello,")
- self.assertEqual(type(tnetstring.loads(b"5:hello,")),bytes)
- def test_roundtrip_format_unicode(self):
- for _ in range(500):
- v = get_random_object()
- self.assertEqual(v,tnetstring.loads(tnetstring.dumps(v)))
- self.assertEqual((v,b''),tnetstring.pop(tnetstring.dumps(v)))
- def test_roundtrip_big_integer(self):
- i1 = math.factorial(30000)
- s = tnetstring.dumps(i1)
- i2 = tnetstring.loads(s)
- self.assertEqual(i1, i2)
-class Test_FileLoading(unittest.TestCase):
- def test_roundtrip_file_examples(self):
- for data, expect in FORMAT_EXAMPLES.items():
- s = io.BytesIO()
- s.write(data)
- s.write(b'OK')
- s.seek(0)
- self.assertEqual(expect,tnetstring.load(s))
- self.assertEqual(b'OK',s.read())
- s = io.BytesIO()
- tnetstring.dump(expect,s)
- s.write(b'OK')
- s.seek(0)
- self.assertEqual(expect,tnetstring.load(s))
- self.assertEqual(b'OK',s.read())
- def test_roundtrip_file_random(self):
- for _ in range(500):
- v = get_random_object()
- s = io.BytesIO()
- tnetstring.dump(v,s)
- s.write(b'OK')
- s.seek(0)
- self.assertEqual(v,tnetstring.load(s))
- self.assertEqual(b'OK',s.read())
- def test_error_on_absurd_lengths(self):
- s = io.BytesIO()
- s.write(b'1000000000:pwned!,')
- s.seek(0)
- with self.assertRaises(ValueError):
- tnetstring.load(s)
- self.assertEqual(s.read(1),b':')
-def suite():
- loader = unittest.TestLoader()
- suite = unittest.TestSuite()
- suite.addTest(loader.loadTestsFromTestCase(Test_Format))
- suite.addTest(loader.loadTestsFromTestCase(Test_FileLoading))
- return suite \ No newline at end of file
diff --git a/mitmproxy/contrib/tnetstring.py b/mitmproxy/contrib/tnetstring.py
index 1ebaba21..5fc26b45 100644
--- a/mitmproxy/contrib/tnetstring.py
+++ b/mitmproxy/contrib/tnetstring.py
@@ -1,8 +1,254 @@
+tnetstring: data serialization using typed netstrings
+This is a custom Python 3 implementation of tnetstrings.
+Compared to other implementations, the main difference
+is that this implementation supports a custom unicode datatype.
+An ordinary tnetstring is a blob of data prefixed with its length and postfixed
+with its type. Here are some examples:
+ >>> tnetstring.dumps("hello world")
+ 11:hello world,
+ >>> tnetstring.dumps(12345)
+ 5:12345#
+ >>> tnetstring.dumps([12345, True, 0])
+ 19:5:12345#4:true!1:0#]
+This module gives you the following functions:
+ :dump: dump an object as a tnetstring to a file
+ :dumps: dump an object as a tnetstring to a string
+ :load: load a tnetstring-encoded object from a file
+ :loads: load a tnetstring-encoded object from a string
+Note that since parsing a tnetstring requires reading all the data into memory
+at once, there's no efficiency gain from using the file-based versions of these
+functions. They're only here so you can use load() to read precisely one
+item from a file or socket without consuming any extra data.
+The tnetstrings specification explicitly states that strings are binary blobs
+and forbids the use of unicode at the protocol level.
+**This implementation decodes dictionary keys as surrogate-escaped ASCII**,
+all other strings are returned as plain bytes.
+:Copyright: (c) 2012-2013 by Ryan Kelly <ryan@rfk.id.au>.
+:Copyright: (c) 2014 by Carlo Pires <carlopires@gmail.com>.
+:Copyright: (c) 2016 by Maximilian Hils <tnetstring3@maximilianhils.com>.
+:License: MIT
+import collections
import six
+from typing import io, Union, Tuple # noqa
+TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict]
+def dumps(value):
+ # type: (TSerializable) -> bytes
+ """
+ This function dumps a python object as a tnetstring.
+ """
+ # This uses a deque to collect output fragments in reverse order,
+ # then joins them together at the end. It's measurably faster
+ # than creating all the intermediate strings.
+ q = collections.deque()
+ _rdumpq(q, 0, value)
+ return b''.join(q)
+def dump(value, file_handle):
+ # type: (TSerializable, io.BinaryIO) -> None
+ """
+ This function dumps a python object as a tnetstring and
+ writes it to the given file.
+ """
+ file_handle.write(dumps(value))
+def _rdumpq(q, size, value):
+ # type: (collections.deque, int, TSerializable) -> int
+ """
+ Dump value as a tnetstring, to a deque instance, last chunks first.
+ This function generates the tnetstring representation of the given value,
+ pushing chunks of the output onto the given deque instance. It pushes
+ the last chunk first, then recursively generates more chunks.
+ When passed in the current size of the string in the queue, it will return
+ the new size of the string in the queue.
+ Operating last-chunk-first makes it easy to calculate the size written
+ for recursive structures without having to build their representation as
+ a string. This is measurably faster than generating the intermediate
+ strings, especially on deeply nested structures.
+ """
+ write = q.appendleft
+ if value is None:
+ write(b'0:~')
+ return size + 3
+ elif value is True:
+ write(b'4:true!')
+ return size + 7
+ elif value is False:
+ write(b'5:false!')
+ return size + 8
+ elif isinstance(value, int):
+ data = str(value).encode()
+ ldata = len(data)
+ span = str(ldata).encode()
+ write(b'%s:%s#' % (span, data))
+ return size + 2 + len(span) + ldata
+ elif isinstance(value, float):
+ # Use repr() for float rather than str().
+ # It round-trips more accurately.
+ # Probably unnecessary in later python versions that
+ # use David Gay's ftoa routines.
+ data = repr(value).encode()
+ ldata = len(data)
+ span = str(ldata).encode()
+ write(b'%s:%s^' % (span, data))
+ return size + 2 + len(span) + ldata
+ elif isinstance(value, bytes):
+ data = value
+ ldata = len(data)
+ span = str(ldata).encode()
+ write(b'%s:%s,' % (span, data))
+ return size + 2 + len(span) + ldata
+ elif isinstance(value, six.text_type):
+ data = value.encode()
+ ldata = len(data)
+ span = str(ldata).encode()
+ write(b'%s:%s;' % (span, data))
+ return size + 2 + len(span) + ldata
+ elif isinstance(value, (list, tuple)):
+ write(b']')
+ init_size = size = size + 1
+ for item in reversed(value):
+ size = _rdumpq(q, size, item)
+ span = str(size - init_size).encode()
+ write(b':')
+ write(span)
+ return size + 1 + len(span)
+ elif isinstance(value, dict):
+ write(b'}')
+ init_size = size = size + 1
+ for (k, v) in value.items():
+ if isinstance(k, str):
+ k = k.encode("ascii", "strict")
+ size = _rdumpq(q, size, v)
+ size = _rdumpq(q, size, k)
+ span = str(size - init_size).encode()
+ write(b':')
+ write(span)
+ return size + 1 + len(span)
+ else:
+ raise ValueError("unserializable object: {} ({})".format(value, type(value)))
+def loads(string):
+ # type: (bytes) -> TSerializable
+ """
+ This function parses a tnetstring into a python object.
+ """
+ return pop(string)[0]
+def load(file_handle):
+ # type: (io.BinaryIO) -> TSerializable
+ """load(file) -> object
+ This function reads a tnetstring from a file and parses it into a
+ python object. The file must support the read() method, and this
+ function promises not to read more data than necessary.
+ """
+ # Read the length prefix one char at a time.
+ # Note that the netstring spec explicitly forbids padding zeros.
+ c = file_handle.read(1)
+ data_length = b""
+ while c.isdigit():
+ data_length += c
+ if len(data_length) > 9:
+ raise ValueError("not a tnetstring: absurdly large length prefix")
+ c = file_handle.read(1)
+ if c != b":":
+ raise ValueError("not a tnetstring: missing or invalid length prefix")
+ data = file_handle.read(int(data_length))
+ data_type = file_handle.read(1)[0]
+ return parse(data_type, data)
+def parse(data_type, data):
+ # type: (int, bytes) -> TSerializable
+ if data_type == ord(b','):
+ return data
+ if data_type == ord(b';'):
+ return data.decode()
+ if data_type == ord(b'#'):
+ try:
+ return int(data)
+ except ValueError:
+ raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))
+ if data_type == ord(b'^'):
+ try:
+ return float(data)
+ except ValueError:
+ raise ValueError("not a tnetstring: invalid float literal: {}".format(data))
+ if data_type == ord(b'!'):
+ if data == b'true':
+ return True
+ elif data == b'false':
+ return False
+ else:
+ raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data))
+ if data_type == ord(b'~'):
+ if data:
+ raise ValueError("not a tnetstring: invalid null literal")
+ return None
+ if data_type == ord(b']'):
+ l = []
+ while data:
+ item, data = pop(data)
+ l.append(item)
+ return l
+ if data_type == ord(b'}'):
+ d = {}
+ while data:
+ key, data = pop(data)
+ if isinstance(key, bytes):
+ key = key.decode("ascii", "strict")
+ val, data = pop(data)
+ d[key] = val
+ return d
+ raise ValueError("unknown type tag: {}".format(data_type))
+def pop(data):
+ # type: (bytes) -> Tuple[TSerializable, bytes]
+ """
+ This function parses a tnetstring into a python object.
+ It returns a tuple giving the parsed object and a string
+ containing any unparsed data from the end of the string.
+ """
+ # Parse out data length, type and remaining string.
+ try:
+ length, data = data.split(b':', 1)
+ length = int(length)
+ except ValueError:
+ raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data))
+ try:
+ data, data_type, remain = data[:length], data[length], data[length + 1:]
+ except IndexError:
+ # This fires if len(data) < dlen, meaning we don't need
+ # to further validate that data is the right length.
+ raise ValueError("not a tnetstring: invalid length prefix: {}".format(length))
+ # Parse the data based on the type tag.
+ return parse(data_type, data), remain
-if six.PY2:
- from .py2.tnetstring import load, loads, dump, dumps, pop
- from .py3.tnetstring import load, loads, dump, dumps, pop
-__all__ = ["load", "loads", "dump", "dumps", "pop"]
+__all__ = ["dump", "dumps", "load", "loads", "pop"]