aboutsummaryrefslogtreecommitdiffstats
path: root/test/netlib
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-07-15 23:17:57 -0700
committerMaximilian Hils <git@maximilianhils.com>2016-07-15 23:17:57 -0700
commit3602fd7a36d963311339ab11ed36ff00df860f71 (patch)
tree5834a98b35c02639c876544bc645f205068fac99 /test/netlib
parenta3c7c84d49c3e6563e7f37ef60c989f99ed96788 (diff)
parent17305643bc482c0b185eec5c64d506790cd26587 (diff)
downloadmitmproxy-3602fd7a36d963311339ab11ed36ff00df860f71.tar.gz
mitmproxy-3602fd7a36d963311339ab11ed36ff00df860f71.tar.bz2
mitmproxy-3602fd7a36d963311339ab11ed36ff00df860f71.zip
Merge remote-tracking branch 'origin/master' into message-body-encoding
Diffstat (limited to 'test/netlib')
-rw-r--r--test/netlib/http/http1/test_assemble.py2
-rw-r--r--test/netlib/http/test_cookies.py21
-rw-r--r--test/netlib/http/test_message.py6
-rw-r--r--test/netlib/http/test_request.py8
-rw-r--r--test/netlib/test_strutils.py67
-rw-r--r--test/netlib/test_tcp.py26
-rw-r--r--test/netlib/tservers.py12
7 files changed, 92 insertions, 50 deletions
diff --git a/test/netlib/http/http1/test_assemble.py b/test/netlib/http/http1/test_assemble.py
index 50d29384..841ea58a 100644
--- a/test/netlib/http/http1/test_assemble.py
+++ b/test/netlib/http/http1/test_assemble.py
@@ -24,7 +24,7 @@ def test_assemble_request():
def test_assemble_request_head():
- c = assemble_request_head(treq(content="foo"))
+ c = assemble_request_head(treq(content=b"foo"))
assert b"GET" in c
assert b"qvalue" in c
assert b"content-length" in c
diff --git a/test/netlib/http/test_cookies.py b/test/netlib/http/test_cookies.py
index 83b85656..17e21b94 100644
--- a/test/netlib/http/test_cookies.py
+++ b/test/netlib/http/test_cookies.py
@@ -245,3 +245,24 @@ def test_refresh_cookie():
assert cookies.refresh_set_cookie_header(c, 0)
c = "foo/bar=bla"
assert cookies.refresh_set_cookie_header(c, 0)
+
+
+def test_is_expired():
+ CA = cookies.CookieAttrs
+
+ # A cookie can be expired
+ # by setting the expire time in the past
+ assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT")]))
+
+ # or by setting Max-Age to 0
+ assert cookies.is_expired(CA([("Max-Age", "0")]))
+
+ # or both
+ assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT"), ("Max-Age", "0")]))
+
+ assert not cookies.is_expired(CA([("Expires", "Thu, 24-Aug-2063 00:00:00 GMT")]))
+ assert not cookies.is_expired(CA([("Max-Age", "1")]))
+ assert not cookies.is_expired(CA([("Expires", "Thu, 15-Jul-2068 00:00:00 GMT"), ("Max-Age", "1")]))
+
+ assert not cookies.is_expired(CA([("Max-Age", "nan")]))
+ assert not cookies.is_expired(CA([("Expires", "false")]))
diff --git a/test/netlib/http/test_message.py b/test/netlib/http/test_message.py
index 8b178e04..deebd6f2 100644
--- a/test/netlib/http/test_message.py
+++ b/test/netlib/http/test_message.py
@@ -10,8 +10,8 @@ from netlib import http, tutils
def _test_passthrough_attr(message, attr):
assert getattr(message, attr) == getattr(message.data, attr)
- setattr(message, attr, "foo")
- assert getattr(message.data, attr) == "foo"
+ setattr(message, attr, b"foo")
+ assert getattr(message.data, attr) == b"foo"
def _test_decoded_attr(message, attr):
@@ -233,7 +233,7 @@ class TestMessageText(object):
def test_none(self):
r = tresp(content=None)
assert r.text is None
- r.text = b"foo"
+ r.text = u"foo"
assert r.text is not None
r.text = None
assert r.text is None
diff --git a/test/netlib/http/test_request.py b/test/netlib/http/test_request.py
index c03db339..f3cd8b71 100644
--- a/test/netlib/http/test_request.py
+++ b/test/netlib/http/test_request.py
@@ -248,20 +248,20 @@ class TestRequestUtils(object):
assert "gzip" in request.headers["Accept-Encoding"]
def test_get_urlencoded_form(self):
- request = treq(content="foobar=baz")
+ request = treq(content=b"foobar=baz")
assert not request.urlencoded_form
request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- assert list(request.urlencoded_form.items()) == [("foobar", "baz")]
+ assert list(request.urlencoded_form.items()) == [(b"foobar", b"baz")]
def test_set_urlencoded_form(self):
request = treq()
- request.urlencoded_form = [('foo', 'bar'), ('rab', 'oof')]
+ request.urlencoded_form = [(b'foo', b'bar'), (b'rab', b'oof')]
assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
assert request.content
def test_get_multipart_form(self):
- request = treq(content="foobar")
+ request = treq(content=b"foobar")
assert not request.multipart_form
request.headers["Content-Type"] = "multipart/form-data"
diff --git a/test/netlib/test_strutils.py b/test/netlib/test_strutils.py
index 16e5d0b3..7c3eacc6 100644
--- a/test/netlib/test_strutils.py
+++ b/test/netlib/test_strutils.py
@@ -1,9 +1,15 @@
-# coding=utf-8
import six
from netlib import strutils, tutils
+def test_always_bytes():
+ assert strutils.always_bytes(bytes(bytearray(range(256)))) == bytes(bytearray(range(256)))
+ assert strutils.always_bytes("foo") == b"foo"
+ with tutils.raises(ValueError):
+ strutils.always_bytes(u"\u2605", "ascii")
+
+
def test_native():
with tutils.raises(TypeError):
strutils.native(42)
@@ -15,18 +21,26 @@ def test_native():
assert strutils.native(b"foo") == u"foo"
-def test_clean_bin():
- assert strutils.clean_bin(b"one") == u"one"
- assert strutils.clean_bin(b"\00ne") == u".ne"
- assert strutils.clean_bin(b"\nne") == u"\nne"
- assert strutils.clean_bin(b"\nne", False) == u".ne"
- assert strutils.clean_bin(u"\u2605".encode("utf8")) == u"..."
-
- assert strutils.clean_bin(u"one") == u"one"
- assert strutils.clean_bin(u"\00ne") == u".ne"
- assert strutils.clean_bin(u"\nne") == u"\nne"
- assert strutils.clean_bin(u"\nne", False) == u".ne"
- assert strutils.clean_bin(u"\u2605") == u"\u2605"
+def test_escape_control_characters():
+ assert strutils.escape_control_characters(u"one") == u"one"
+ assert strutils.escape_control_characters(u"\00ne") == u".ne"
+ assert strutils.escape_control_characters(u"\nne") == u"\nne"
+ assert strutils.escape_control_characters(u"\nne", False) == u".ne"
+ assert strutils.escape_control_characters(u"\u2605") == u"\u2605"
+ assert (
+ strutils.escape_control_characters(bytes(bytearray(range(128))).decode()) ==
+ u'.........\t\n..\r.................. !"#$%&\'()*+,-./0123456789:;<'
+ u'=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~.'
+ )
+ assert (
+ strutils.escape_control_characters(bytes(bytearray(range(128))).decode(), False) ==
+ u'................................ !"#$%&\'()*+,-./0123456789:;<'
+ u'=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~.'
+ )
+
+ if not six.PY2:
+ with tutils.raises(ValueError):
+ strutils.escape_control_characters(b"foo")
def test_bytes_to_escaped_str():
@@ -37,6 +51,14 @@ def test_bytes_to_escaped_str():
assert strutils.bytes_to_escaped_str(b"'") == r"\'"
assert strutils.bytes_to_escaped_str(b'"') == r'"'
+ assert strutils.bytes_to_escaped_str(b"\r\n\t") == "\\r\\n\\t"
+ assert strutils.bytes_to_escaped_str(b"\r\n\t", True) == "\r\n\t"
+
+ assert strutils.bytes_to_escaped_str(b"\n", True) == "\n"
+ assert strutils.bytes_to_escaped_str(b"\\n", True) == "\\ \\ n".replace(" ", "")
+ assert strutils.bytes_to_escaped_str(b"\\\n", True) == "\\ \\ \n".replace(" ", "")
+ assert strutils.bytes_to_escaped_str(b"\\\\n", True) == "\\ \\ \\ \\ n".replace(" ", "")
+
with tutils.raises(ValueError):
strutils.bytes_to_escaped_str(u"such unicode")
@@ -45,10 +67,9 @@ def test_escaped_str_to_bytes():
assert strutils.escaped_str_to_bytes("foo") == b"foo"
assert strutils.escaped_str_to_bytes("\x08") == b"\b"
assert strutils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
- assert strutils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
assert strutils.escaped_str_to_bytes(u"\\x08") == b"\b"
assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
- assert strutils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
+ assert strutils.escaped_str_to_bytes(u"\u00fc") == b'\xc3\xbc'
if six.PY2:
with tutils.raises(ValueError):
@@ -58,17 +79,15 @@ def test_escaped_str_to_bytes():
strutils.escaped_str_to_bytes(b"very byte")
-def test_isBin():
- assert not strutils.isBin("testing\n\r")
- assert strutils.isBin("testing\x01")
- assert strutils.isBin("testing\x0e")
- assert strutils.isBin("testing\x7f")
+def test_is_mostly_bin():
+ assert not strutils.is_mostly_bin(b"foo\xFF")
+ assert strutils.is_mostly_bin(b"foo" + b"\xFF" * 10)
-def test_isXml():
- assert not strutils.isXML("foo")
- assert strutils.isXML("<foo")
- assert strutils.isXML(" \n<foo")
+def test_is_xml():
+ assert not strutils.is_xml(b"foo")
+ assert strutils.is_xml(b"<foo")
+ assert strutils.is_xml(b" \n<foo")
def test_clean_hanging_newline():
diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py
index 590bcc01..273427d5 100644
--- a/test/netlib/test_tcp.py
+++ b/test/netlib/test_tcp.py
@@ -169,7 +169,7 @@ class TestServerSSL(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- c.convert_to_ssl(sni=b"foo.com", options=SSL.OP_ALL)
+ c.convert_to_ssl(sni="foo.com", options=SSL.OP_ALL)
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
@@ -179,7 +179,7 @@ class TestServerSSL(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
assert not c.get_current_cipher()
- c.convert_to_ssl(sni=b"foo.com")
+ c.convert_to_ssl(sni="foo.com")
ret = c.get_current_cipher()
assert ret
assert "AES" in ret[0]
@@ -195,7 +195,7 @@ class TestSSLv3Only(tservers.ServerTestBase):
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- tutils.raises(TlsException, c.convert_to_ssl, sni=b"foo.com")
+ tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com")
class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
@@ -238,7 +238,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
with c.connect():
with tutils.raises(InvalidCertificateException):
c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
+ sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
@@ -272,7 +272,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
with c.connect():
with tutils.raises(InvalidCertificateException):
c.convert_to_ssl(
- sni=b"mitmproxy.org",
+ sni="mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
@@ -291,7 +291,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
+ sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
@@ -307,7 +307,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
+ sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_path=tutils.test_data.path("data/verificationcerts/")
)
@@ -371,8 +371,8 @@ class TestSNI(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- c.convert_to_ssl(sni=b"foo.com")
- assert c.sni == b"foo.com"
+ c.convert_to_ssl(sni="foo.com")
+ assert c.sni == "foo.com"
assert c.rfile.readline() == b"foo.com"
@@ -385,7 +385,7 @@ class TestServerCipherList(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- c.convert_to_ssl(sni=b"foo.com")
+ c.convert_to_ssl(sni="foo.com")
assert c.rfile.readline() == b"['RC4-SHA']"
@@ -405,7 +405,7 @@ class TestServerCurrentCipher(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- c.convert_to_ssl(sni=b"foo.com")
+ c.convert_to_ssl(sni="foo.com")
assert b"RC4-SHA" in c.rfile.readline()
@@ -418,7 +418,7 @@ class TestServerCipherListError(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- tutils.raises("handshake error", c.convert_to_ssl, sni=b"foo.com")
+ tutils.raises("handshake error", c.convert_to_ssl, sni="foo.com")
class TestClientCipherListError(tservers.ServerTestBase):
@@ -433,7 +433,7 @@ class TestClientCipherListError(tservers.ServerTestBase):
tutils.raises(
"cipher specification",
c.convert_to_ssl,
- sni=b"foo.com",
+ sni="foo.com",
cipher_list="bogus"
)
diff --git a/test/netlib/tservers.py b/test/netlib/tservers.py
index 803aaa72..666f97ac 100644
--- a/test/netlib/tservers.py
+++ b/test/netlib/tservers.py
@@ -24,7 +24,7 @@ class _ServerThread(threading.Thread):
class _TServer(tcp.TCPServer):
- def __init__(self, ssl, q, handler_klass, addr):
+ def __init__(self, ssl, q, handler_klass, addr, **kwargs):
"""
ssl: A dictionary of SSL parameters:
@@ -42,6 +42,8 @@ class _TServer(tcp.TCPServer):
self.q = q
self.handler_klass = handler_klass
+ if self.handler_klass is not None:
+ self.handler_klass.kwargs = kwargs
self.last_handler = None
def handle_client_connection(self, request, client_address):
@@ -89,16 +91,16 @@ class ServerTestBase(object):
addr = ("localhost", 0)
@classmethod
- def setup_class(cls):
+ def setup_class(cls, **kwargs):
cls.q = queue.Queue()
- s = cls.makeserver()
+ s = cls.makeserver(**kwargs)
cls.port = s.address.port
cls.server = _ServerThread(s)
cls.server.start()
@classmethod
- def makeserver(cls):
- return _TServer(cls.ssl, cls.q, cls.handler, cls.addr)
+ def makeserver(cls, **kwargs):
+ return _TServer(cls.ssl, cls.q, cls.handler, cls.addr, **kwargs)
@classmethod
def teardown_class(cls):