diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/test_contentview.py | 8 | ||||
-rw-r--r-- | test/mitmproxy/test_utils.py | 15 | ||||
-rw-r--r-- | test/netlib/test_human.py | 9 | ||||
-rw-r--r-- | test/pathod/test_language_base.py | 13 | ||||
-rw-r--r-- | test/pathod/test_language_http.py | 146 | ||||
-rw-r--r-- | test/pathod/test_language_websocket.py | 14 | ||||
-rw-r--r-- | test/pathod/test_pathod.py | 28 | ||||
-rw-r--r-- | test/pathod/test_pathod_cmdline.py | 5 | ||||
-rw-r--r-- | test/pathod/test_utils.py | 5 | ||||
-rw-r--r-- | test/pathod/tutils.py | 3 |
10 files changed, 127 insertions, 119 deletions
diff --git a/test/mitmproxy/test_contentview.py b/test/mitmproxy/test_contentview.py index 9142bdad..48d6c307 100644 --- a/test/mitmproxy/test_contentview.py +++ b/test/mitmproxy/test_contentview.py @@ -1,3 +1,5 @@ +import json + from mitmproxy.exceptions import ContentViewException from netlib.http import Headers from netlib.odict import ODict @@ -274,3 +276,9 @@ if cv.ViewProtobuf.is_available(): def test_get_by_shortcut(): assert cv.get_by_shortcut("h") + + +def test_pretty_json(): + s = json.dumps({"foo": 1}) + assert cv.pretty_json(s) + assert not cv.pretty_json("moo") diff --git a/test/mitmproxy/test_utils.py b/test/mitmproxy/test_utils.py index c01b5f2a..79b72eec 100644 --- a/test/mitmproxy/test_utils.py +++ b/test/mitmproxy/test_utils.py @@ -1,29 +1,14 @@ -import json from mitmproxy import utils from . import tutils utils.CERT_SLEEP_TIME = 0 -def test_format_timestamp(): - assert utils.format_timestamp(utils.timestamp()) - - -def test_format_timestamp_with_milli(): - assert utils.format_timestamp_with_milli(utils.timestamp()) - - def test_pkg_data(): assert utils.pkg_data.path("console") tutils.raises("does not exist", utils.pkg_data.path, "nonexistent") -def test_pretty_json(): - s = json.dumps({"foo": 1}) - assert utils.pretty_json(s) - assert not utils.pretty_json("moo") - - def test_LRUCache(): cache = utils.LRUCache(2) diff --git a/test/netlib/test_human.py b/test/netlib/test_human.py index 2a5c2a85..bb97dc54 100644 --- a/test/netlib/test_human.py +++ b/test/netlib/test_human.py @@ -1,6 +1,15 @@ +import time from netlib import human, tutils +def test_format_timestamp(): + assert human.format_timestamp(time.time()) + + +def test_format_timestamp_with_milli(): + assert human.format_timestamp_with_milli(time.time()) + + def test_parse_size(): assert human.parse_size("0") == 0 assert human.parse_size("0b") == 0 diff --git a/test/pathod/test_language_base.py b/test/pathod/test_language_base.py index 075dc2b8..7c7d8cf9 100644 --- a/test/pathod/test_language_base.py +++ b/test/pathod/test_language_base.py @@ -55,8 +55,15 @@ class TestTokValueLiteral: v = base.TokValueLiteral("f\x00oo") assert v.spec() == repr(v) == r"'f\x00oo'" - v = base.TokValueLiteral("\"") - assert v.spec() == repr(v) == '\'"\'' + v = base.TokValueLiteral('"') + assert v.spec() == repr(v) == """ '"' """.strip() + + # While pyparsing has a escChar argument for QuotedString, + # escChar only performs scapes single-character escapes and does not work for e.g. r"\x02". + # Thus, we cannot use that option, which means we cannot have single quotes in strings. + # To fix this, we represent single quotes as r"\x07". + v = base.TokValueLiteral("'") + assert v.spec() == r"'\x27'" def roundtrip(self, spec): e = base.TokValueLiteral.expr() @@ -311,7 +318,7 @@ def test_options_or_value(): def test_integer(): e = base.Integer.expr() v = e.parseString("200")[0] - assert v.string() == "200" + assert v.string() == b"200" assert v.spec() == "200" assert v.freeze({}).value == v.value diff --git a/test/pathod/test_language_http.py b/test/pathod/test_language_http.py index d1870a63..18059e3a 100644 --- a/test/pathod/test_language_http.py +++ b/test/pathod/test_language_http.py @@ -1,15 +1,15 @@ -from six.moves import cStringIO as StringIO +from six import BytesIO from pathod import language from pathod.language import http, base import tutils def parse_request(s): - return language.parse_pathoc(s).next() + return next(language.parse_pathoc(s)) def test_make_error_response(): - d = StringIO() + d = BytesIO() s = http.make_error_response("foo") language.serve(s, d, {}) @@ -24,17 +24,17 @@ class TestRequest: def test_simple(self): r = parse_request('GET:"/foo"') - assert r.method.string() == "GET" - assert r.path.string() == "/foo" + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" r = parse_request('GET:/foo') - assert r.path.string() == "/foo" + assert r.path.string() == b"/foo" r = parse_request('GET:@1k') assert len(r.path.string()) == 1024 def test_multiple(self): r = list(language.parse_pathoc("GET:/ PUT:/")) - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" assert len(r) == 2 l = """ @@ -54,8 +54,8 @@ class TestRequest: """ r = list(language.parse_pathoc(l)) assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" l = """ get:"http://localhost:9999/p/200":ir,@1 @@ -63,8 +63,8 @@ class TestRequest: """ r = list(language.parse_pathoc(l)) assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "GET" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"GET" def test_nested_response(self): l = "get:/p:s'200'" @@ -75,7 +75,7 @@ class TestRequest: assert r[0].values({}) def test_render(self): - s = StringIO() + s = BytesIO() r = parse_request("GET:'/foo'") assert language.serve( r, @@ -90,8 +90,8 @@ class TestRequest: ir,@1 """ r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string() == "/foo" + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" assert r.actions l = """ @@ -106,8 +106,8 @@ class TestRequest: ir,@1 """ r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string().endswith("bar") + assert r.method.string() == b"GET" + assert r.path.string().endswith(b"bar") assert r.actions def test_spec(self): @@ -128,66 +128,66 @@ class TestRequest: def test_websocket(self): r = parse_request('ws:/path/') res = r.resolve(language.Settings()) - assert res.method.string().lower() == "get" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "get" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" + assert res.method.string().lower() == b"get" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"get" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" r = parse_request('ws:put:/path/') res = r.resolve(language.Settings()) - assert r.method.string().lower() == "put" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "put" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" + assert r.method.string().lower() == b"put" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"put" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" class TestResponse: def dummy_response(self): - return language.parse_pathod("400'msg'").next() + return next(language.parse_pathod("400'msg'")) def test_response(self): - r = language.parse_pathod("400:m'msg'").next() - assert r.status_code.string() == "400" - assert r.reason.string() == "msg" + r = next(language.parse_pathod("400:m'msg'")) + assert r.status_code.string() == b"400" + assert r.reason.string() == b"msg" - r = language.parse_pathod("400:m'msg':b@100b").next() - assert r.reason.string() == "msg" + r = next(language.parse_pathod("400:m'msg':b@100b")) + assert r.reason.string() == b"msg" assert r.body.values({}) assert str(r) - r = language.parse_pathod("200").next() - assert r.status_code.string() == "200" + r = next(language.parse_pathod("200")) + assert r.status_code.string() == b"200" assert not r.reason - assert "OK" in [i[:] for i in r.preamble({})] + assert b"OK" in [i[:] for i in r.preamble({})] def test_render(self): - s = StringIO() - r = language.parse_pathod("400:m'msg'").next() + s = BytesIO() + r = next(language.parse_pathod("400:m'msg'")) assert language.serve(r, s, {}) - r = language.parse_pathod("400:p0,100:dr").next() + r = next(language.parse_pathod("400:p0,100:dr")) assert "p0" in r.spec() s = r.preview_safe() assert "p0" not in s.spec() def test_raw(self): - s = StringIO() - r = language.parse_pathod("400:b'foo'").next() + s = BytesIO() + r = next(language.parse_pathod("400:b'foo'")) language.serve(r, s, {}) v = s.getvalue() - assert "Content-Length" in v + assert b"Content-Length" in v - s = StringIO() - r = language.parse_pathod("400:b'foo':r").next() + s = BytesIO() + r = next(language.parse_pathod("400:b'foo':r")) language.serve(r, s, {}) v = s.getvalue() - assert "Content-Length" not in v + assert b"Content-Length" not in v def test_length(self): def testlen(x): - s = StringIO() - x = x.next() + s = BytesIO() + x = next(x) language.serve(x, s, language.Settings()) assert x.length(language.Settings()) == len(s.getvalue()) testlen(language.parse_pathod("400:m'msg':r")) @@ -196,8 +196,8 @@ class TestResponse: def test_maximum_length(self): def testlen(x): - x = x.next() - s = StringIO() + x = next(x) + s = BytesIO() m = x.maximum_length({}) language.serve(x, s, {}) assert m >= len(s.getvalue()) @@ -225,19 +225,19 @@ class TestResponse: tutils.raises("ascii", language.parse_pathod, "foo:b\xf0") def test_parse_header(self): - r = language.parse_pathod('400:h"foo"="bar"').next() - assert http.get_header("foo", r.headers) + r = next(language.parse_pathod('400:h"foo"="bar"')) + assert http.get_header(b"foo", r.headers) def test_parse_pause_before(self): - r = language.parse_pathod("400:p0,10").next() + r = next(language.parse_pathod("400:p0,10")) assert r.actions[0].spec() == "p0,10" def test_parse_pause_after(self): - r = language.parse_pathod("400:pa,10").next() + r = next(language.parse_pathod("400:pa,10")) assert r.actions[0].spec() == "pa,10" def test_parse_pause_random(self): - r = language.parse_pathod("400:pr,10").next() + r = next(language.parse_pathod("400:pr,10")) assert r.actions[0].spec() == "pr,10" def test_parse_stress(self): @@ -245,29 +245,29 @@ class TestResponse: # returns an int and a python 2.7 int on windows has 32bit precision. # Therefore, we should keep the body length < 2147483647 bytes in our # tests. - r = language.parse_pathod("400:b@1g").next() + r = next(language.parse_pathod("400:b@1g")) assert r.length({}) def test_spec(self): def rt(s): - s = language.parse_pathod(s).next().spec() - assert language.parse_pathod(s).next().spec() == s + s = next(language.parse_pathod(s)).spec() + assert next(language.parse_pathod(s)).spec() == s rt("400:b@100g") rt("400") rt("400:da") def test_websockets(self): - r = language.parse_pathod("ws").next() + r = next(language.parse_pathod("ws")) tutils.raises("no websocket key", r.resolve, language.Settings()) - res = r.resolve(language.Settings(websocket_key="foo")) - assert res.status_code.string() == "101" + res = r.resolve(language.Settings(websocket_key=b"foo")) + assert res.status_code.string() == b"101" def test_ctype_shortcut(): e = http.ShortcutContentType.expr() v = e.parseString("c'foo'")[0] - assert v.key.val == "Content-Type" - assert v.value.val == "foo" + assert v.key.val == b"Content-Type" + assert v.value.val == b"foo" s = v.spec() assert s == e.parseString(s)[0].spec() @@ -282,8 +282,8 @@ def test_ctype_shortcut(): def test_location_shortcut(): e = http.ShortcutLocation.expr() v = e.parseString("l'foo'")[0] - assert v.key.val == "Location" - assert v.value.val == "foo" + assert v.key.val == b"Location" + assert v.value.val == b"foo" s = v.spec() assert s == e.parseString(s)[0].spec() @@ -296,23 +296,23 @@ def test_location_shortcut(): def test_shortcuts(): - assert language.parse_pathod( - "400:c'foo'").next().headers[0].key.val == "Content-Type" - assert language.parse_pathod( - "400:l'foo'").next().headers[0].key.val == "Location" + assert next(language.parse_pathod( + "400:c'foo'")).headers[0].key.val == b"Content-Type" + assert next(language.parse_pathod( + "400:l'foo'")).headers[0].key.val == b"Location" - assert "Android" in tutils.render(parse_request("get:/:ua")) - assert "User-Agent" in tutils.render(parse_request("get:/:ua")) + assert b"Android" in tutils.render(parse_request("get:/:ua")) + assert b"User-Agent" in tutils.render(parse_request("get:/:ua")) def test_user_agent(): e = http.ShortcutUserAgent.expr() v = e.parseString("ua")[0] - assert "Android" in v.string() + assert b"Android" in v.string() e = http.ShortcutUserAgent.expr() v = e.parseString("u'a'")[0] - assert "Android" not in v.string() + assert b"Android" not in v.string() v = e.parseString("u@100'")[0] assert len(str(v.freeze({}).value)) > 100 @@ -324,7 +324,7 @@ def test_user_agent(): def test_nested_response(): e = http.NestedResponse.expr() v = e.parseString("s'200'")[0] - assert v.value.val == "200" + assert v.value.val == b"200" tutils.raises( language.ParseException, e.parseString, @@ -340,9 +340,7 @@ def test_nested_response(): def test_nested_response_freeze(): e = http.NestedResponse( base.TokValueLiteral( - "200:b'foo':i10,'\\x27'".encode( - "string_escape" - ) + r"200:b\'foo\':i10,\'\\x27\'" ) ) assert e.freeze({}) diff --git a/test/pathod/test_language_websocket.py b/test/pathod/test_language_websocket.py index 29155dba..58297141 100644 --- a/test/pathod/test_language_websocket.py +++ b/test/pathod/test_language_websocket.py @@ -6,7 +6,7 @@ import tutils def parse_request(s): - return language.parse_pathoc(s).next() + return next(language.parse_pathoc(s)) class TestWebsocketFrame: @@ -93,9 +93,9 @@ class TestWebsocketFrame: def test_rawbody(self): frm = self.fr("wf:mask:r'foo'") assert len(frm.payload) == 3 - assert frm.payload != "foo" + assert frm.payload != b"foo" - assert self.fr("wf:r'foo'").payload == "foo" + assert self.fr("wf:r'foo'").payload == b"foo" def test_construction_2(self): # Simple server frame @@ -109,7 +109,7 @@ class TestWebsocketFrame: assert frm.header.masking_key frm = self.fr("wf:b'foo':k'abcd'", is_client=True) assert frm.header.mask - assert frm.header.masking_key == 'abcd' + assert frm.header.masking_key == b'abcd' # Server frame, mask explicitly set frm = self.fr("wf:b'foo':mask") @@ -117,7 +117,7 @@ class TestWebsocketFrame: assert frm.header.masking_key frm = self.fr("wf:b'foo':k'abcd'") assert frm.header.mask - assert frm.header.masking_key == 'abcd' + assert frm.header.masking_key == b'abcd' # Client frame, mask explicitly unset frm = self.fr("wf:b'foo':-mask", is_client=True) @@ -128,7 +128,7 @@ class TestWebsocketFrame: assert not frm.header.mask # We're reading back a corrupted frame - the first 3 characters of the # mask is mis-interpreted as the payload - assert frm.payload == "abc" + assert frm.payload == b"abc" def test_knone(self): with tutils.raises("expected 4 bytes"): @@ -138,5 +138,5 @@ class TestWebsocketFrame: assert self.fr("wf:l3:b'foo'").header.payload_length == 3 frm = self.fr("wf:l2:b'foo'") assert frm.header.payload_length == 2 - assert frm.payload == "fo" + assert frm.payload == b"fo" tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'") diff --git a/test/pathod/test_pathod.py b/test/pathod/test_pathod.py index 0bbad6c2..ed4ef49f 100644 --- a/test/pathod/test_pathod.py +++ b/test/pathod/test_pathod.py @@ -145,14 +145,14 @@ class CommonTests(tutils.DaemonTests): def test_invalid_first_line(self): c = tcp.TCPClient(("localhost", self.d.port)) - c.connect() - if self.ssl: - c.convert_to_ssl() - c.wfile.write("foo\n\n\n") - c.wfile.flush() - l = self.d.last_log() - assert l["type"] == "error" - assert "foo" in l["msg"] + with c.connect(): + if self.ssl: + c.convert_to_ssl() + c.wfile.write("foo\n\n\n") + c.wfile.flush() + l = self.d.last_log() + assert l["type"] == "error" + assert "foo" in l["msg"] def test_invalid_content_length(self): tutils.raises( @@ -238,12 +238,12 @@ class TestDaemonSSL(CommonTests): c = tcp.TCPClient(("localhost", self.d.port)) c.rbufsize = 0 c.wbufsize = 0 - c.connect() - c.wfile.write("\0\0\0\0") - tutils.raises(TlsException, c.convert_to_ssl) - l = self.d.last_log() - assert l["type"] == "error" - assert "SSL" in l["msg"] + with c.connect(): + c.wfile.write("\0\0\0\0") + tutils.raises(TlsException, c.convert_to_ssl) + l = self.d.last_log() + assert l["type"] == "error" + assert "SSL" in l["msg"] def test_ssl_cipher(self): r, _ = self.pathoc([r"get:/p/202"]) diff --git a/test/pathod/test_pathod_cmdline.py b/test/pathod/test_pathod_cmdline.py index 3c0918ef..18d54c82 100644 --- a/test/pathod/test_pathod_cmdline.py +++ b/test/pathod/test_pathod_cmdline.py @@ -3,6 +3,11 @@ import tutils import mock +def test_parse_anchor_spec(): + assert cmdline.parse_anchor_spec("foo=200") == ("foo", "200") + assert cmdline.parse_anchor_spec("foo") is None + + @mock.patch("argparse.ArgumentParser.error") def test_pathod(perror): assert cmdline.args_pathod(["pathod"]) diff --git a/test/pathod/test_utils.py b/test/pathod/test_utils.py index a46a523a..2bb82fe7 100644 --- a/test/pathod/test_utils.py +++ b/test/pathod/test_utils.py @@ -11,10 +11,5 @@ def test_membool(): assert m.v == 2 -def test_parse_anchor_spec(): - assert utils.parse_anchor_spec("foo=200") == ("foo", "200") - assert utils.parse_anchor_spec("foo") is None - - def test_data_path(): tutils.raises(ValueError, utils.data.path, "nonexistent") diff --git a/test/pathod/tutils.py b/test/pathod/tutils.py index 56cd2002..bf5e3165 100644 --- a/test/pathod/tutils.py +++ b/test/pathod/tutils.py @@ -3,6 +3,7 @@ import re import shutil import requests from six.moves import cStringIO as StringIO +from six import BytesIO import urllib from netlib import tcp @@ -147,6 +148,6 @@ test_data = utils.Data(__name__) def render(r, settings=language.Settings()): r = r.resolve(settings) - s = StringIO() + s = BytesIO() assert language.serve(r, s, settings) return s.getvalue() |