diff options
Diffstat (limited to 'test')
85 files changed, 2132 insertions, 1195 deletions
diff --git a/test/examples/__init__.py b/test/examples/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/examples/__init__.py diff --git a/test/examples/test_examples.py b/test/examples/test_examples.py new file mode 100644 index 00000000..4c1631ce --- /dev/null +++ b/test/examples/test_examples.py @@ -0,0 +1,101 @@ +from mitmproxy import contentviews +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.test import taddons +from mitmproxy.net.http import Headers + +from ..mitmproxy import tservers + +example_dir = tutils.test_data.push("../examples") + + +class TestScripts(tservers.MasterTest): + def test_add_header(self): + with taddons.context() as tctx: + a = tctx.script(example_dir.path("simple/add_header.py")) + f = tflow.tflow(resp=tutils.tresp()) + a.response(f) + assert f.response.headers["newheader"] == "foo" + + def test_custom_contentviews(self): + with taddons.context() as tctx: + tctx.script(example_dir.path("simple/custom_contentview.py")) + swapcase = contentviews.get("swapcase") + _, fmt = swapcase(b"<html>Test!</html>") + assert any(b'tEST!' in val[0][1] for val in fmt) + + def test_iframe_injector(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("simple/modify_body_inject_iframe.py")) + tctx.configure( + sc, + iframe = "http://example.org/evil_iframe" + ) + f = tflow.tflow( + resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>") + ) + tctx.master.addons.invoke_addon(sc, "response", f) + content = f.response.content + assert b'iframe' in content and b'evil_iframe' in content + + def test_modify_form(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("simple/modify_form.py")) + + form_header = Headers(content_type="application/x-www-form-urlencoded") + f = tflow.tflow(req=tutils.treq(headers=form_header)) + sc.request(f) + + assert f.request.urlencoded_form["mitmproxy"] == "rocks" + + f.request.headers["content-type"] = "" + sc.request(f) + assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] + + def test_modify_querystring(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("simple/modify_querystring.py")) + f = tflow.tflow(req=tutils.treq(path="/search?q=term")) + + sc.request(f) + assert f.request.query["mitmproxy"] == "rocks" + + f.request.path = "/" + sc.request(f) + assert f.request.query["mitmproxy"] == "rocks" + + def test_redirect_requests(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("simple/redirect_requests.py")) + f = tflow.tflow(req=tutils.treq(host="example.org")) + sc.request(f) + assert f.request.host == "mitmproxy.org" + + def test_send_reply_from_proxy(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("simple/send_reply_from_proxy.py")) + f = tflow.tflow(req=tutils.treq(host="example.com", port=80)) + sc.request(f) + assert f.response.content == b"Hello World" + + def test_dns_spoofing(self): + with taddons.context() as tctx: + sc = tctx.script(example_dir.path("complex/dns_spoofing.py")) + + original_host = "example.com" + + host_header = Headers(host=original_host) + f = tflow.tflow(req=tutils.treq(headers=host_header, port=80)) + + tctx.master.addons.invoke_addon(sc, "requestheaders", f) + + # Rewrite by reverse proxy mode + f.request.scheme = "https" + f.request.port = 443 + + tctx.master.addons.invoke_addon(sc, "request", f) + + assert f.request.scheme == "http" + assert f.request.port == 80 + + assert f.request.headers["Host"] == original_host diff --git a/test/examples/test_har_dump.py b/test/examples/test_har_dump.py new file mode 100644 index 00000000..11cd5c29 --- /dev/null +++ b/test/examples/test_har_dump.py @@ -0,0 +1,86 @@ +import json + +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.test import taddons +from mitmproxy.net.http import cookies + +example_dir = tutils.test_data.push("../examples") + + +class TestHARDump: + def flow(self, resp_content=b'message'): + times = dict( + timestamp_start=746203272, + timestamp_end=746203272, + ) + + # Create a dummy flow for testing + return tflow.tflow( + req=tutils.treq(method=b'GET', **times), + resp=tutils.tresp(content=resp_content, **times) + ) + + def test_simple(self, tmpdir): + with taddons.context() as tctx: + a = tctx.script(example_dir.path("complex/har_dump.py")) + path = str(tmpdir.join("somefile")) + tctx.configure(a, hardump=path) + tctx.invoke(a, "response", self.flow()) + tctx.invoke(a, "done") + with open(path, "r") as inp: + har = json.load(inp) + assert len(har["log"]["entries"]) == 1 + + def test_base64(self, tmpdir): + with taddons.context() as tctx: + a = tctx.script(example_dir.path("complex/har_dump.py")) + path = str(tmpdir.join("somefile")) + tctx.configure(a, hardump=path) + + tctx.invoke( + a, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10) + ) + tctx.invoke(a, "done") + with open(path, "r") as inp: + har = json.load(inp) + assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" + + def test_format_cookies(self): + with taddons.context() as tctx: + a = tctx.script(example_dir.path("complex/har_dump.py")) + + CA = cookies.CookieAttrs + + f = a.format_cookies([("n", "v", CA([("k", "v")]))])[0] + assert f['name'] == "n" + assert f['value'] == "v" + assert not f['httpOnly'] + assert not f['secure'] + + f = a.format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0] + assert f['httpOnly'] + assert f['secure'] + + f = a.format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] + assert f['expires'] + + def test_binary(self, tmpdir): + with taddons.context() as tctx: + a = tctx.script(example_dir.path("complex/har_dump.py")) + path = str(tmpdir.join("somefile")) + tctx.configure(a, hardump=path) + + f = self.flow() + f.request.method = "POST" + f.request.headers["content-type"] = "application/x-www-form-urlencoded" + f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f" + f.response.headers["random-junk"] = bytes(range(256)) + f.response.content = bytes(range(256)) + + tctx.invoke(a, "response", f) + tctx.invoke(a, "done") + + with open(path, "r") as inp: + har = json.load(inp) + assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/examples/test_xss_scanner.py b/test/examples/test_xss_scanner.py index 14ee6902..14ee6902 100755..100644 --- a/test/mitmproxy/examples/test_xss_scanner.py +++ b/test/examples/test_xss_scanner.py diff --git a/test/full_coverage_plugin.py b/test/full_coverage_plugin.py index d98c29d6..ec30a9f9 100644 --- a/test/full_coverage_plugin.py +++ b/test/full_coverage_plugin.py @@ -1,6 +1,7 @@ import os import configparser import pytest +import sys here = os.path.abspath(os.path.dirname(__file__)) @@ -59,6 +60,12 @@ def pytest_runtestloop(session): if os.name == 'nt': cov.exclude('pragma: windows no cover') + if sys.platform == 'darwin': + cov.exclude('pragma: osx no cover') + + if os.environ.get("OPENSSL") == "old": + cov.exclude('pragma: openssl-old no cover') + yield coverage_values = dict([(name, 0) for name in pytest.config.option.full_cov]) diff --git a/test/helper_tools/ab.exe b/test/helper_tools/ab.exe Binary files differdeleted file mode 100644 index d68ed0f3..00000000 --- a/test/helper_tools/ab.exe +++ /dev/null diff --git a/test/individual_coverage.py b/test/individual_coverage.py index 35bcd27f..c975b4c8 100644 --- a/test/individual_coverage.py +++ b/test/individual_coverage.py @@ -26,20 +26,20 @@ def run_tests(src, test, fail): if e == 0: if fail: - print("SUCCESS but should have FAILED:", src, "Please remove this file from setup.cfg tool:individual_coverage/exclude.") + print("UNEXPECTED SUCCESS:", src, "Please remove this file from setup.cfg tool:individual_coverage/exclude.") e = 42 else: - print("SUCCESS:", src) + print("SUCCESS: ", src) else: if fail: - print("Ignoring fail:", src) + print("IGNORING FAIL: ", src) e = 0 else: cov = [l for l in stdout.getvalue().split("\n") if (src in l) or ("was never imported" in l)] if len(cov) == 1: - print("FAIL:", cov[0]) + print("FAIL: ", cov[0]) else: - print("FAIL:", src, test, stdout.getvalue(), stdout.getvalue()) + print("FAIL: ", src, test, stdout.getvalue(), stdout.getvalue()) print(stderr.getvalue()) print(stdout.getvalue()) diff --git a/test/mitmproxy/addons/onboardingapp/__init__.py b/test/mitmproxy/addons/onboardingapp/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/mitmproxy/addons/onboardingapp/__init__.py diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index f71662f0..7ffda317 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -26,7 +26,7 @@ class TestClientPlayback: with taddons.context() as tctx: assert cp.count() == 0 f = tflow.tflow(resp=True) - cp.load([f]) + cp.start_replay([f]) assert cp.count() == 1 RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread" with mock.patch(RP) as rp: @@ -44,13 +44,30 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None + cp.start_replay([f]) + cp.stop_replay() + assert not cp.flows + + def test_load_file(self, tmpdir): + cp = clientplayback.ClientPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + cp.load_file(fpath) + assert cp.flows + with pytest.raises(exceptions.CommandError): + cp.load_file("/nonexistent") + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: path = str(tmpdir.join("flows")) tdump(path, [tflow.tflow()]) tctx.configure(cp, client_replay=[path]) + cp.configured = False tctx.configure(cp, client_replay=[]) + cp.configured = False tctx.configure(cp) + cp.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py new file mode 100644 index 00000000..c132d80a --- /dev/null +++ b/test/mitmproxy/addons/test_core.py @@ -0,0 +1,165 @@ +from mitmproxy.addons import core +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy import exceptions +import pytest + + +def test_set(): + sa = core.Core() + with taddons.context() as tctx: + tctx.master.addons.add(sa) + + assert not tctx.master.options.anticomp + tctx.command(sa.set, "anticomp") + assert tctx.master.options.anticomp + + with pytest.raises(exceptions.CommandError): + tctx.command(sa.set, "nonexistent") + + +def test_resume(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not sa.resume([f]) + f.intercept() + sa.resume([f]) + assert not f.reply.state == "taken" + + +def test_mark(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not f.marked + sa.mark([f], True) + assert f.marked + + sa.mark_toggle([f]) + assert not f.marked + sa.mark_toggle([f]) + assert f.marked + + +def test_kill(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.intercept() + assert f.killable + sa.kill([f]) + assert not f.killable + + +def test_revert(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.backup() + f.request.content = b"bar" + assert f.modified() + sa.revert([f]) + assert not f.modified() + + +def test_flow_set(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow(resp=True) + assert sa.flow_set_options() + + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "flibble", "post") + + assert f.request.method != "post" + sa.flow_set([f], "method", "post") + assert f.request.method == "POST" + + assert f.request.host != "testhost" + sa.flow_set([f], "host", "testhost") + assert f.request.host == "testhost" + + assert f.request.path != "/test/path" + sa.flow_set([f], "path", "/test/path") + assert f.request.path == "/test/path" + + assert f.request.url != "http://foo.com/bar" + sa.flow_set([f], "url", "http://foo.com/bar") + assert f.request.url == "http://foo.com/bar" + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "url", "oink") + + assert f.response.status_code != 404 + sa.flow_set([f], "status_code", "404") + assert f.response.status_code == 404 + assert f.response.reason == "Not Found" + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "status_code", "oink") + + assert f.response.reason != "foo" + sa.flow_set([f], "reason", "foo") + assert f.response.reason == "foo" + + +def test_encoding(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert sa.encode_options() + sa.encode([f], "request", "deflate") + assert f.request.headers["content-encoding"] == "deflate" + + sa.encode([f], "request", "br") + assert f.request.headers["content-encoding"] == "deflate" + + sa.decode([f], "request") + assert "content-encoding" not in f.request.headers + + sa.encode([f], "request", "br") + assert f.request.headers["content-encoding"] == "br" + + sa.encode_toggle([f], "request") + assert "content-encoding" not in f.request.headers + sa.encode_toggle([f], "request") + assert f.request.headers["content-encoding"] == "deflate" + sa.encode_toggle([f], "request") + assert "content-encoding" not in f.request.headers + + with pytest.raises(exceptions.CommandError): + sa.encode([f], "request", "invalid") + + +def test_options(tmpdir): + p = str(tmpdir.join("path")) + sa = core.Core() + with taddons.context() as tctx: + tctx.options.stickycookie = "foo" + assert tctx.options.stickycookie == "foo" + sa.options_reset() + assert tctx.options.stickycookie is None + + tctx.options.stickycookie = "foo" + tctx.options.stickyauth = "bar" + sa.options_reset_one("stickycookie") + assert tctx.options.stickycookie is None + assert tctx.options.stickyauth == "bar" + + with pytest.raises(exceptions.CommandError): + sa.options_reset_one("unknown") + + sa.options_save(p) + with pytest.raises(exceptions.CommandError): + sa.options_save("/") + + sa.options_reset() + assert tctx.options.stickyauth is None + sa.options_load(p) + assert tctx.options.stickyauth == "bar" + + sa.options_load("/nonexistent") + + with open(p, 'a') as f: + f.write("'''") + with pytest.raises(exceptions.CommandError): + sa.options_load(p) diff --git a/test/mitmproxy/addons/test_core_option_validation.py b/test/mitmproxy/addons/test_core_option_validation.py index 0bb2bb0d..6d6d5ba4 100644 --- a/test/mitmproxy/addons/test_core_option_validation.py +++ b/test/mitmproxy/addons/test_core_option_validation.py @@ -11,7 +11,7 @@ def test_simple(): with pytest.raises(exceptions.OptionsError): tctx.configure(sa, body_size_limit = "invalid") tctx.configure(sa, body_size_limit = "1m") - assert tctx.options._processed["body_size_limit"] + assert tctx.master.options._processed["body_size_limit"] with pytest.raises(exceptions.OptionsError, match="mutually exclusive"): tctx.configure( diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py new file mode 100644 index 00000000..e028331f --- /dev/null +++ b/test/mitmproxy/addons/test_cut.py @@ -0,0 +1,178 @@ + +from mitmproxy.addons import cut +from mitmproxy.addons import view +from mitmproxy import exceptions +from mitmproxy import certs +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy.test import tutils +import pytest +from unittest import mock + + +def test_extract(): + tf = tflow.tflow(resp=True) + tests = [ + ["q.method", "GET"], + ["q.scheme", "http"], + ["q.host", "address"], + ["q.port", "22"], + ["q.path", "/path"], + ["q.url", "http://address:22/path"], + ["q.text", "content"], + ["q.content", b"content"], + ["q.raw_content", b"content"], + ["q.header[header]", "qvalue"], + + ["s.status_code", "200"], + ["s.reason", "OK"], + ["s.text", "message"], + ["s.content", b"message"], + ["s.raw_content", b"message"], + ["s.header[header-response]", "svalue"], + + ["cc.address.port", "22"], + ["cc.address.host", "address"], + ["cc.tls_version", "TLSv1.2"], + ["cc.sni", "address"], + ["cc.ssl_established", "false"], + + ["sc.address.port", "22"], + ["sc.address.host", "address"], + ["sc.ip_address.host", "192.168.0.1"], + ["sc.tls_version", "TLSv1.2"], + ["sc.sni", "address"], + ["sc.ssl_established", "false"], + ] + for t in tests: + ret = cut.extract(t[0], tf) + if ret != t[1]: + raise AssertionError("%s: Expected %s, got %s" % (t[0], t[1], ret)) + + with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: + d = f.read() + c1 = certs.SSLCert.from_pem(d) + tf.server_conn.cert = c1 + assert "CERTIFICATE" in cut.extract("sc.cert", tf) + + +def test_parse_cutspec(): + tests = [ + ("", None, True), + ("req.method", ("@all", ["req.method"]), False), + ( + "req.method,req.host", + ("@all", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo", + ("~b foo", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ( + "req.method, req.host | ~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ] + for cutspec, output, err in tests: + try: + assert cut.parse_cutspec(cutspec) == output + except exceptions.CommandError: + if not err: + raise + else: + if err: + raise AssertionError("Expected error.") + + +def test_headername(): + with pytest.raises(exceptions.CommandError): + cut.headername("header[foo.") + + +def qr(f): + with open(f, "rb") as fp: + return fp.read() + + +def test_cut_clip(): + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + v.add([tflow.tflow(resp=True)]) + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.method|@all") + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.content|@all") + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.method,q.content|@all") + assert pc.called + + +def test_cut_file(tmpdir): + f = str(tmpdir.join("path")) + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + + v.add([tflow.tflow(resp=True)]) + + tctx.command(c.save, "q.method|@all", f) + assert qr(f) == b"GET" + tctx.command(c.save, "q.content|@all", f) + assert qr(f) == b"content" + tctx.command(c.save, "q.content|@all", "+" + f) + assert qr(f) == b"content\ncontent" + + v.add([tflow.tflow(resp=True)]) + tctx.command(c.save, "q.method|@all", f) + assert qr(f).splitlines() == [b"GET", b"GET"] + tctx.command(c.save, "q.method,q.content|@all", f) + assert qr(f).splitlines() == [b"GET,content", b"GET,content"] + + +def test_cut(): + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + v.add([tflow.tflow(resp=True)]) + tctx.master.addons.add(v, c) + assert c.cut("q.method|@all") == [["GET"]] + assert c.cut("q.scheme|@all") == [["http"]] + assert c.cut("q.host|@all") == [["address"]] + assert c.cut("q.port|@all") == [["22"]] + assert c.cut("q.path|@all") == [["/path"]] + assert c.cut("q.url|@all") == [["http://address:22/path"]] + assert c.cut("q.content|@all") == [[b"content"]] + assert c.cut("q.header[header]|@all") == [["qvalue"]] + assert c.cut("q.header[unknown]|@all") == [[""]] + + assert c.cut("s.status_code|@all") == [["200"]] + assert c.cut("s.reason|@all") == [["OK"]] + assert c.cut("s.content|@all") == [[b"message"]] + assert c.cut("s.header[header-response]|@all") == [["svalue"]] + assert c.cut("moo") == [[""]] + with pytest.raises(exceptions.CommandError): + assert c.cut("__dict__") == [[""]] + + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + v.add([tflow.ttcpflow()]) + assert c.cut("q.method|@all") == [[""]] + assert c.cut("s.status|@all") == [[""]] diff --git a/test/mitmproxy/addons/test_defaults.py b/test/mitmproxy/addons/test_defaults.py deleted file mode 100644 index e20466f1..00000000 --- a/test/mitmproxy/addons/test_defaults.py +++ /dev/null @@ -1,5 +0,0 @@ -from mitmproxy import addons - - -def test_defaults(): - assert addons.default_addons() diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py index fbcc4d16..d8aa593b 100644 --- a/test/mitmproxy/addons/test_dumper.py +++ b/test/mitmproxy/addons/test_dumper.py @@ -16,7 +16,7 @@ from mitmproxy import options def test_configure(): d = dumper.Dumper() with taddons.context(options=options.Options()) as ctx: - ctx.configure(d, filtstr="~b foo") + ctx.configure(d, view_filter="~b foo") assert d.filter f = tflow.tflow(resp=True) @@ -24,10 +24,10 @@ def test_configure(): f.response.content = b"foo" assert d.match(f) - ctx.configure(d, filtstr=None) + ctx.configure(d, view_filter=None) assert not d.filter with pytest.raises(exceptions.OptionsError): - ctx.configure(d, filtstr="~~") + ctx.configure(d, view_filter="~~") assert not d.filter @@ -68,7 +68,6 @@ def test_simple(): ctx.configure(d, flow_detail=4) flow = tflow.tflow() flow.request = tutils.treq() - flow.request.stickycookie = True flow.client_conn = mock.MagicMock() flow.client_conn.address[0] = "foo" flow.response = tutils.tresp(content=None) diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py new file mode 100644 index 00000000..233c62d5 --- /dev/null +++ b/test/mitmproxy/addons/test_export.py @@ -0,0 +1,109 @@ +import pytest +import os + +from mitmproxy import exceptions +from mitmproxy.addons import export # heh +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.test import taddons +from unittest import mock + + +@pytest.fixture +def get_request(): + return tflow.tflow( + req=tutils.treq( + method=b'GET', + content=b'', + path=b"/path?a=foo&a=bar&b=baz" + ) + ) + + +@pytest.fixture +def post_request(): + return tflow.tflow( + req=tutils.treq( + method=b'POST', + headers=(), + content=bytes(range(256)) + ) + ) + + +@pytest.fixture +def patch_request(): + return tflow.tflow( + req=tutils.treq(method=b'PATCH', path=b"/path?query=param") + ) + + +@pytest.fixture +def tcp_flow(): + return tflow.ttcpflow() + + +class TestExportCurlCommand: + def test_get(self, get_request): + result = """curl -H 'header:qvalue' -H 'content-length:0' 'http://address:22/path?a=foo&a=bar&b=baz'""" + assert export.curl_command(get_request) == result + + def test_post(self, post_request): + result = "curl -H 'content-length:256' -X POST 'http://address:22/path' --data-binary '{}'".format( + str(bytes(range(256)))[2:-1] + ) + assert export.curl_command(post_request) == result + + def test_patch(self, patch_request): + result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'""" + assert export.curl_command(patch_request) == result + + def test_tcp(self, tcp_flow): + with pytest.raises(exceptions.CommandError): + export.curl_command(tcp_flow) + + +class TestRaw: + def test_get(self, get_request): + assert b"header: qvalue" in export.raw(get_request) + + def test_tcp(self, tcp_flow): + with pytest.raises(exceptions.CommandError): + export.raw(tcp_flow) + + +def qr(f): + with open(f, "rb") as fp: + return fp.read() + + +def test_export(tmpdir): + f = str(tmpdir.join("path")) + e = export.Export() + with taddons.context(): + assert e.formats() == ["curl", "raw"] + with pytest.raises(exceptions.CommandError): + e.file("nonexistent", tflow.tflow(resp=True), f) + + e.file("raw", tflow.tflow(resp=True), f) + assert qr(f) + os.unlink(f) + + e.file("curl", tflow.tflow(resp=True), f) + assert qr(f) + os.unlink(f) + + +def test_clip(tmpdir): + e = export.Export() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + e.clip("nonexistent", tflow.tflow(resp=True)) + + with mock.patch('pyperclip.copy') as pc: + e.clip("raw", tflow.tflow(resp=True)) + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + e.clip("curl", tflow.tflow(resp=True)) + assert pc.called diff --git a/test/mitmproxy/addons/test_onboarding.py b/test/mitmproxy/addons/test_onboarding.py index 63125c23..474e6c3c 100644 --- a/test/mitmproxy/addons/test_onboarding.py +++ b/test/mitmproxy/addons/test_onboarding.py @@ -1,4 +1,8 @@ +import pytest + from mitmproxy.addons import onboarding +from mitmproxy.test import taddons +from mitmproxy import options from .. import tservers @@ -7,10 +11,25 @@ class TestApp(tservers.HTTPProxyTest): return [onboarding.Onboarding()] def test_basic(self): - assert self.app("/").status_code == 200 + with taddons.context() as tctx: + tctx.configure(self.addons()[0]) + assert self.app("/").status_code == 200 - def test_cert(self): - for ext in ["pem", "p12"]: + @pytest.mark.parametrize("ext", ["pem", "p12"]) + def test_cert(self, ext): + with taddons.context() as tctx: + tctx.configure(self.addons()[0]) resp = self.app("/cert/%s" % ext) assert resp.status_code == 200 assert resp.content + + @pytest.mark.parametrize("ext", ["pem", "p12"]) + def test_head(self, ext): + with taddons.context() as tctx: + tctx.configure(self.addons()[0]) + p = self.pathoc() + with p.connect(): + resp = p.request("head:'http://%s/cert/%s'" % (options.APP_HOST, ext)) + assert resp.status_code == 200 + assert "Content-Length" in resp.headers + assert not resp.content diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py index 513f3f08..40044bf0 100644 --- a/test/mitmproxy/addons/test_proxyauth.py +++ b/test/mitmproxy/addons/test_proxyauth.py @@ -2,6 +2,7 @@ import binascii import pytest +from unittest import mock from mitmproxy import exceptions from mitmproxy.addons import proxyauth from mitmproxy.test import taddons @@ -41,6 +42,22 @@ def test_configure(): ctx.configure(up, proxyauth=None) assert not up.nonanonymous + with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"): + with mock.patch('ldap3.Connection', return_value="test"): + ctx.configure(up, proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com") + assert up.ldapserver + ctx.configure(up, proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com") + assert up.ldapserver + + with pytest.raises(exceptions.OptionsError): + ctx.configure(up, proxyauth="ldap:test:test:test") + + with pytest.raises(IndexError): + ctx.configure(up, proxyauth="ldap:fake_serveruid=?dc=example,dc=com:person") + + with pytest.raises(exceptions.OptionsError): + ctx.configure(up, proxyauth="ldapssssssss:fake_server:dn:password:tree") + with pytest.raises(exceptions.OptionsError): ctx.configure( up, @@ -66,11 +83,8 @@ def test_configure(): with pytest.raises(exceptions.OptionsError): ctx.configure(up, proxyauth="any", mode="socks5") - ctx.configure(up, mode="regular") - assert up.mode == "regular" - -def test_check(): +def test_check(monkeypatch): up = proxyauth.ProxyAuth() with taddons.context() as ctx: ctx.configure(up, proxyauth="any", mode="regular") @@ -112,6 +126,22 @@ def test_check(): ) assert not up.check(f) + with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"): + with mock.patch('ldap3.Connection', search="test"): + with mock.patch('ldap3.Connection.search', return_value="test"): + ctx.configure( + up, + proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com" + ) + f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( + "test", "test" + ) + assert up.check(f) + f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( + "", "" + ) + assert not up.check(f) + def test_authenticate(): up = proxyauth.ProxyAuth() diff --git a/test/mitmproxy/addons/test_readfile.py b/test/mitmproxy/addons/test_readfile.py index b30c147b..813aa10e 100644 --- a/test/mitmproxy/addons/test_readfile.py +++ b/test/mitmproxy/addons/test_readfile.py @@ -1,62 +1,104 @@ +import io +from unittest import mock + +import pytest + +import mitmproxy.io +from mitmproxy import exceptions from mitmproxy.addons import readfile from mitmproxy.test import taddons from mitmproxy.test import tflow -from mitmproxy import io -from mitmproxy import exceptions -from unittest import mock -import pytest +@pytest.fixture +def data(): + f = io.BytesIO() + + w = mitmproxy.io.FlowWriter(f) + flows = [ + tflow.tflow(resp=True), + tflow.tflow(err=True), + tflow.ttcpflow(), + tflow.ttcpflow(err=True) + ] + for flow in flows: + w.add(flow) -def write_data(path, corrupt=False): - with open(path, "wb") as tf: - w = io.FlowWriter(tf) - for i in range(3): - f = tflow.tflow(resp=True) - w.add(f) - for i in range(3): - f = tflow.tflow(err=True) - w.add(f) - f = tflow.ttcpflow() - w.add(f) - f = tflow.ttcpflow(err=True) - w.add(f) - if corrupt: - tf.write(b"flibble") - - -@mock.patch('mitmproxy.master.Master.load_flow') -def test_configure(mck, tmpdir): - - rf = readfile.ReadFile() - with taddons.context() as tctx: - tf = str(tmpdir.join("tfile")) - write_data(tf) - tctx.configure(rf, rfile=str(tf)) - assert not mck.called - rf.running() - assert mck.called - - write_data(tf, corrupt=True) - tctx.configure(rf, rfile=str(tf)) - with pytest.raises(exceptions.OptionsError): + f.seek(0) + return f + + +@pytest.fixture +def corrupt_data(): + f = data() + f.seek(0, io.SEEK_END) + f.write(b"qibble") + f.seek(0) + return f + + +class TestReadFile: + @mock.patch('mitmproxy.master.Master.load_flow') + def test_configure(self, mck, tmpdir, data, corrupt_data): + rf = readfile.ReadFile() + with taddons.context() as tctx: + tf = tmpdir.join("tfile") + + tf.write(data.getvalue()) + tctx.configure(rf, rfile=str(tf)) + assert not mck.called rf.running() + assert mck.called + tf.write(corrupt_data.getvalue()) + tctx.configure(rf, rfile=str(tf)) + with pytest.raises(exceptions.OptionsError): + rf.running() -@mock.patch('mitmproxy.master.Master.load_flow') -def test_corruption(mck, tmpdir): + @mock.patch('mitmproxy.master.Master.load_flow') + def test_corrupt(self, mck, corrupt_data): + rf = readfile.ReadFile() + with taddons.context() as tctx: + with pytest.raises(exceptions.FlowReadException): + rf.load_flows(io.BytesIO(b"qibble")) + assert not mck.called + assert len(tctx.master.logs) == 1 - rf = readfile.ReadFile() - with taddons.context() as tctx: - with pytest.raises(exceptions.FlowReadException): - rf.load_flows_file("nonexistent") - assert not mck.called - assert len(tctx.master.logs) == 1 + with pytest.raises(exceptions.FlowReadException): + rf.load_flows(corrupt_data) + assert mck.called + assert len(tctx.master.logs) == 2 + + def test_nonexisting_file(self): + rf = readfile.ReadFile() + with taddons.context() as tctx: + with pytest.raises(exceptions.FlowReadException): + rf.load_flows_from_path("nonexistent") + assert len(tctx.master.logs) == 1 + + +class TestReadFileStdin: + @mock.patch('mitmproxy.master.Master.load_flow') + @mock.patch('sys.stdin') + def test_stdin(self, stdin, load_flow, data, corrupt_data): + rf = readfile.ReadFileStdin() + with taddons.context() as tctx: + stdin.buffer = data + tctx.configure(rf, rfile="-") + assert not load_flow.called + rf.running() + assert load_flow.called - tfc = str(tmpdir.join("tfile")) - write_data(tfc, corrupt=True) + stdin.buffer = corrupt_data + tctx.configure(rf, rfile="-") + with pytest.raises(exceptions.OptionsError): + rf.running() - with pytest.raises(exceptions.FlowReadException): - rf.load_flows_file(tfc) - assert mck.called - assert len(tctx.master.logs) == 2 + @mock.patch('mitmproxy.master.Master.load_flow') + def test_normal(self, load_flow, tmpdir, data): + rf = readfile.ReadFileStdin() + with taddons.context(): + tfile = tmpdir.join("tfile") + tfile.write(data.getvalue()) + rf.load_flows_from_path(str(tfile)) + assert load_flow.called diff --git a/test/mitmproxy/addons/test_readstdin.py b/test/mitmproxy/addons/test_readstdin.py deleted file mode 100644 index 76b01f4f..00000000 --- a/test/mitmproxy/addons/test_readstdin.py +++ /dev/null @@ -1,53 +0,0 @@ - -import io -from mitmproxy.addons import readstdin -from mitmproxy.test import taddons -from mitmproxy.test import tflow -import mitmproxy.io -from unittest import mock - - -def gen_data(corrupt=False): - tf = io.BytesIO() - w = mitmproxy.io.FlowWriter(tf) - for i in range(3): - f = tflow.tflow(resp=True) - w.add(f) - for i in range(3): - f = tflow.tflow(err=True) - w.add(f) - f = tflow.ttcpflow() - w.add(f) - f = tflow.ttcpflow(err=True) - w.add(f) - if corrupt: - tf.write(b"flibble") - tf.seek(0) - return tf - - -class mStdin: - def __init__(self, d): - self.buffer = d - - def isatty(self): - return False - - -@mock.patch('mitmproxy.master.Master.load_flow') -def test_read(m, tmpdir): - rf = readstdin.ReadStdin() - with taddons.context() as tctx: - assert not m.called - rf.running(stdin=mStdin(gen_data())) - assert m.called - - rf.running(stdin=mStdin(None)) - assert tctx.master.logs - tctx.master.clear() - - m.reset_mock() - assert not m.called - rf.running(stdin=mStdin(gen_data(corrupt=True))) - assert m.called - assert tctx.master.logs diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py new file mode 100644 index 00000000..85c2a398 --- /dev/null +++ b/test/mitmproxy/addons/test_save.py @@ -0,0 +1,83 @@ +import pytest + +from mitmproxy.test import taddons +from mitmproxy.test import tflow + +from mitmproxy import io +from mitmproxy import exceptions +from mitmproxy import options +from mitmproxy.addons import save +from mitmproxy.addons import view + + +def test_configure(tmpdir): + sa = save.Save() + with taddons.context(options=options.Options()) as tctx: + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, save_stream_file=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure( + sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~" + ) + tctx.configure(sa, save_stream_filter="foo") + assert sa.filt + tctx.configure(sa, save_stream_filter=None) + assert not sa.filt + + +def rd(p): + x = io.FlowReader(open(p, "rb")) + return list(x.stream()) + + +def test_tcp(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + tctx.configure(sa, save_stream_file=p) + + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, save_stream_file=None) + assert rd(p) + + +def test_save_command(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], "+" + p) + assert len(rd(p)) == 2 + + with pytest.raises(exceptions.CommandError): + sa.save([tflow.tflow(resp=True)], str(tmpdir)) + + v = view.View() + tctx.master.addons.add(v) + tctx.master.addons.add(sa) + tctx.master.commands.call_args("save.file", ["@shown", p]) + + +def test_simple(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + + tctx.configure(sa, save_stream_file=p) + + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, save_stream_file=None) + assert rd(p)[0].response + + tctx.configure(sa, save_stream_file="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, save_stream_file=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 16827488..dd5349cb 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -1,155 +1,103 @@ import traceback import sys import time -import re -import watchdog.events import pytest +from unittest import mock from mitmproxy.test import tflow from mitmproxy.test import tutils from mitmproxy.test import taddons +from mitmproxy import addonmanager from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy import proxy -from mitmproxy import master -from mitmproxy import utils from mitmproxy.addons import script -from ...conftest import skip_not_windows - -def test_scriptenv(): +def test_load_script(): with taddons.context() as tctx: - with script.scriptenv("path", []): - raise SystemExit - assert tctx.master.has_log("exited", "error") - - tctx.master.clear() - with script.scriptenv("path", []): - raise ValueError("fooo") - assert tctx.master.has_log("fooo", "error") - - -class Called: - def __init__(self): - self.called = False - - def __call__(self, *args, **kwargs): - self.called = True - - -def test_reloadhandler(): - rh = script.ReloadHandler(Called()) - assert not rh.filter(watchdog.events.DirCreatedEvent("path")) - assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/.bar")) - assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/bar")) - assert rh.filter(watchdog.events.FileModifiedEvent("/foo/bar.py")) - - assert not rh.callback.called - rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar")) - assert not rh.callback.called - rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar.py")) - assert rh.callback.called - rh.callback.called = False - - rh.on_created(watchdog.events.FileCreatedEvent("foo")) - assert not rh.callback.called - rh.on_created(watchdog.events.FileCreatedEvent("foo.py")) - assert rh.callback.called - - -class TestParseCommand: - def test_empty_command(self): - with pytest.raises(ValueError): - script.parse_command("") - - with pytest.raises(ValueError): - script.parse_command(" ") - - def test_no_script_file(self, tmpdir): - with pytest.raises(Exception, match="not found"): - script.parse_command("notfound") - - with pytest.raises(Exception, match="Not a file"): - script.parse_command(str(tmpdir)) - - def test_parse_args(self): - with utils.chdir(tutils.test_data.dirname): - assert script.parse_command( - "mitmproxy/data/addonscripts/recorder.py" - ) == ("mitmproxy/data/addonscripts/recorder.py", []) - assert script.parse_command( - "mitmproxy/data/addonscripts/recorder.py foo bar" - ) == ("mitmproxy/data/addonscripts/recorder.py", ["foo", "bar"]) - assert script.parse_command( - "mitmproxy/data/addonscripts/recorder.py 'foo bar'" - ) == ("mitmproxy/data/addonscripts/recorder.py", ["foo bar"]) - - @skip_not_windows - def test_parse_windows(self): - with utils.chdir(tutils.test_data.dirname): - assert script.parse_command( - "mitmproxy/data\\addonscripts\\recorder.py" - ) == ("mitmproxy/data\\addonscripts\\recorder.py", []) - assert script.parse_command( - "mitmproxy/data\\addonscripts\\recorder.py 'foo \\ bar'" - ) == ("mitmproxy/data\\addonscripts\\recorder.py", ['foo \\ bar']) + ns = script.load_script( + tctx.ctx(), + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ) + assert ns.addons + ns = script.load_script( + tctx.ctx(), + "nonexistent" + ) + assert not ns -def test_load_script(): - ns = script.load_script( - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder.py" - ), [] - ) - assert ns.start + +def test_script_print_stdout(): + with taddons.context() as tctx: + with mock.patch('mitmproxy.ctx.log.warn') as mock_warn: + with addonmanager.safecall(): + ns = script.load_script( + tctx.ctx(), + tutils.test_data.path( + "mitmproxy/data/addonscripts/print.py" + ) + ) + ns.load(addonmanager.Loader(tctx.master)) + mock_warn.assert_called_once_with("stdoutprint") class TestScript: - def test_simple(self): + def test_notfound(self): with taddons.context(): + with pytest.raises(exceptions.OptionsError): + script.Script("nonexistent") + + def test_simple(self): + with taddons.context() as tctx: sc = script.Script( tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder.py" + "mitmproxy/data/addonscripts/recorder/recorder.py" ) ) - sc.load_script() - assert sc.ns.call_log[0][0:2] == ("solo", "start") + tctx.master.addons.add(sc) + tctx.configure(sc) + sc.tick() + + rec = tctx.master.addons.get("recorder") - sc.ns.call_log = [] + assert rec.call_log[0][0:2] == ("recorder", "load") + + rec.call_log = [] f = tflow.tflow(resp=True) - sc.request(f) + tctx.master.addons.trigger("request", f) - recf = sc.ns.call_log[0] - assert recf[1] == "request" + assert rec.call_log[0][1] == "request" def test_reload(self, tmpdir): with taddons.context() as tctx: f = tmpdir.join("foo.py") f.ensure(file=True) + f.write("\n") sc = script.Script(str(f)) tctx.configure(sc) - for _ in range(100): - f.write(".") + sc.tick() + for _ in range(3): + sc.last_load, sc.last_mtime = 0, 0 sc.tick() time.sleep(0.1) - if tctx.master.logs: - return - raise AssertionError("Change event not detected.") + tctx.master.has_log("Loading") def test_exception(self): with taddons.context() as tctx: sc = script.Script( tutils.test_data.path("mitmproxy/data/addonscripts/error.py") ) - sc.start(tctx.options) + tctx.master.addons.add(sc) + tctx.configure(sc) + sc.tick() + f = tflow.tflow(resp=True) - sc.request(f) - assert tctx.master.logs[0].level == "error" - assert len(tctx.master.logs[0].msg.splitlines()) == 6 - assert re.search(r'addonscripts[\\/]error.py", line \d+, in request', tctx.master.logs[0].msg) - assert re.search(r'addonscripts[\\/]error.py", line \d+, in mkerr', tctx.master.logs[0].msg) - assert tctx.master.logs[0].msg.endswith("ValueError: Error!\n") + tctx.master.addons.trigger("request", f) + + tctx.master.has_log("ValueError: Error!") + tctx.master.has_log("error.py") def test_addon(self): with taddons.context() as tctx: @@ -158,10 +106,11 @@ class TestScript: "mitmproxy/data/addonscripts/addon.py" ) ) - sc.start(tctx.options) + tctx.master.addons.add(sc) tctx.configure(sc) + sc.tick() assert sc.ns.event_log == [ - 'scriptstart', 'addonstart', 'addonconfigure' + 'scriptload', 'addonload', 'scriptconfigure', 'addonconfigure' ] @@ -176,123 +125,131 @@ class TestCutTraceback: self.raise_(4) except RuntimeError: tb = sys.exc_info()[2] - tb_cut = script.cut_traceback(tb, "test_simple") + tb_cut = addonmanager.cut_traceback(tb, "test_simple") assert len(traceback.extract_tb(tb_cut)) == 5 - tb_cut2 = script.cut_traceback(tb, "nonexistent") + tb_cut2 = addonmanager.cut_traceback(tb, "nonexistent") assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb)) class TestScriptLoader: - def test_run_once(self): - o = options.Options(scripts=[]) - m = master.Master(o, proxy.DummyServer()) - sl = script.ScriptLoader() - m.addons.add(sl) - - f = tflow.tflow(resp=True) - with m.handlecontext(): - sc = sl.run_once( - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder.py" - ), [f] - ) - evts = [i[1] for i in sc.ns.call_log] - assert evts == ['start', 'requestheaders', 'request', 'responseheaders', 'response', 'done'] - - f = tflow.tflow(resp=True) - with m.handlecontext(): - with pytest.raises(Exception, match="file not found"): - sl.run_once("nonexistent", [f]) - - def test_simple(self): - o = options.Options(scripts=[]) - m = master.Master(o, proxy.DummyServer()) + def test_script_run(self): + rp = tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) sc = script.ScriptLoader() - m.addons.add(sc) - assert len(m.addons) == 1 - o.update( - scripts = [ - tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") + with taddons.context() as tctx: + sc.script_run([tflow.tflow(resp=True)], rp) + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] + assert debug == [ + 'recorder load', 'recorder running', 'recorder configure', + 'recorder tick', + 'recorder requestheaders', 'recorder request', + 'recorder responseheaders', 'recorder response' ] - ) - assert len(m.addons) == 2 - o.update(scripts = []) - assert len(m.addons) == 1 - def test_dupes(self): + def test_script_run_nonexistent(self): + sc = script.ScriptLoader() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + sc.script_run([tflow.tflow(resp=True)], "/") + + def test_simple(self): sc = script.ScriptLoader() with taddons.context() as tctx: tctx.master.addons.add(sc) - with pytest.raises(exceptions.OptionsError): - tctx.configure( - sc, - scripts = ["one", "one"] - ) + sc.running() + assert len(tctx.master.addons) == 1 + tctx.master.options.update( + scripts = [ + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ] + ) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 1 + tctx.master.options.update(scripts = []) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 0 - def test_nonexistent(self): + def test_dupes(self): sc = script.ScriptLoader() with taddons.context() as tctx: tctx.master.addons.add(sc) with pytest.raises(exceptions.OptionsError): tctx.configure( sc, - scripts = ["nonexistent"] + scripts = ["one", "one"] ) def test_order(self): - rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") + rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") sc = script.ScriptLoader() + sc.is_running = True with taddons.context() as tctx: - tctx.master.addons.add(sc) - sc.running() tctx.configure( sc, scripts = [ - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), - "%s %s" % (rec, "c"), + "%s/a.py" % rec, + "%s/b.py" % rec, + "%s/c.py" % rec, ] ) + tctx.master.addons.invoke_addon(sc, "tick") debug = [i.msg for i in tctx.master.logs if i.level == "debug"] assert debug == [ - 'a start', - 'a configure', + 'a load', 'a running', + 'a configure', + 'a tick', - 'b start', - 'b configure', + 'b load', 'b running', + 'b configure', + 'b tick', - 'c start', - 'c configure', + 'c load', 'c running', + 'c configure', + 'c tick', ] + tctx.master.logs = [] tctx.configure( sc, scripts = [ - "%s %s" % (rec, "c"), - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), + "%s/c.py" % rec, + "%s/a.py" % rec, + "%s/b.py" % rec, ] ) + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] - assert debug == [] + assert debug == [ + 'c configure', + 'a configure', + 'b configure', + ] tctx.master.logs = [] tctx.configure( sc, scripts = [ - "%s %s" % (rec, "x"), - "%s %s" % (rec, "a"), + "%s/e.py" % rec, + "%s/a.py" % rec, ] ) + tctx.master.addons.invoke_addon(sc, "tick") + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] assert debug == [ 'c done', 'b done', - 'x start', - 'x configure', - 'x running', + 'a configure', + 'e load', + 'e running', + 'e configure', + 'e tick', + 'a tick', ] diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index 02642c35..3ceab3fa 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -6,7 +6,6 @@ from mitmproxy.test import tflow import mitmproxy.test.tutils from mitmproxy.addons import serverplayback -from mitmproxy import options from mitmproxy import exceptions from mitmproxy import io @@ -17,12 +16,24 @@ def tdump(path, flows): w.add(i) +def test_load_file(tmpdir): + s = serverplayback.ServerPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + s.load_file(fpath) + assert s.flowmap + with pytest.raises(exceptions.CommandError): + s.load_file("/nonexistent") + + def test_config(tmpdir): s = serverplayback.ServerPlayback() with taddons.context() as tctx: fpath = str(tmpdir.join("flows")) tdump(fpath, [tflow.tflow(resp=True)]) tctx.configure(s, server_replay=[fpath]) + s.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(s, server_replay=[str(tmpdir)]) @@ -39,86 +50,88 @@ def test_tick(): def test_server_playback(): sp = serverplayback.ServerPlayback() - sp.configure(options.Options(), []) - f = tflow.tflow(resp=True) + with taddons.context() as tctx: + tctx.configure(sp) + f = tflow.tflow(resp=True) - assert not sp.flowmap + assert not sp.flowmap - sp.load([f]) - assert sp.flowmap - assert sp.next_flow(f) - assert not sp.flowmap + sp.load_flows([f]) + assert sp.flowmap + assert sp.next_flow(f) + assert not sp.flowmap - sp.load([f]) - assert sp.flowmap - sp.clear() - assert not sp.flowmap + sp.load_flows([f]) + assert sp.flowmap + sp.clear() + assert not sp.flowmap def test_ignore_host(): sp = serverplayback.ServerPlayback() - sp.configure(options.Options(server_replay_ignore_host=True), []) + with taddons.context() as tctx: + tctx.configure(sp, server_replay_ignore_host=True) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) - r.request.host = "address" - r2.request.host = "address" - assert sp._hash(r) == sp._hash(r2) - r2.request.host = "wrong_address" - assert sp._hash(r) == sp._hash(r2) + r.request.host = "address" + r2.request.host = "address" + assert sp._hash(r) == sp._hash(r2) + r2.request.host = "wrong_address" + assert sp._hash(r) == sp._hash(r2) def test_ignore_content(): s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_ignore_content=False), []) + with taddons.context() as tctx: + tctx.configure(s, server_replay_ignore_content=False) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert not s._hash(r) == s._hash(r2) + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert not s._hash(r) == s._hash(r2) - s.configure(options.Options(server_replay_ignore_content=True), []) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"" - assert s._hash(r) == s._hash(r2) - r2.request.content = None - assert s._hash(r) == s._hash(r2) + tctx.configure(s, server_replay_ignore_content=True) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"" + assert s._hash(r) == s._hash(r2) + r2.request.content = None + assert s._hash(r) == s._hash(r2) def test_ignore_content_wins_over_params(): s = serverplayback.ServerPlayback() - s.configure( - options.Options( + with taddons.context() as tctx: + tctx.configure( + s, server_replay_ignore_content=True, server_replay_ignore_payload_params=[ "param1", "param2" ] - ), - [] - ) - # NOTE: parameters are mutually exclusive in options + ) - r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r.request.content = b"paramx=y" + # NOTE: parameters are mutually exclusive in options + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r.request.content = b"paramx=y" - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r2.request.content = b"paramx=x" + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r2.request.content = b"paramx=x" - # same parameters - assert s._hash(r) == s._hash(r2) + # same parameters + assert s._hash(r) == s._hash(r2) def test_ignore_payload_params_other_content_type(): @@ -147,136 +160,139 @@ def test_ignore_payload_params_other_content_type(): def test_hash(): s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) + with taddons.context() as tctx: + tctx.configure(s) - r = tflow.tflow() - r2 = tflow.tflow() + r = tflow.tflow() + r2 = tflow.tflow() - assert s._hash(r) - assert s._hash(r) == s._hash(r2) - r.request.headers["foo"] = "bar" - assert s._hash(r) == s._hash(r2) - r.request.path = "voing" - assert s._hash(r) != s._hash(r2) + assert s._hash(r) + assert s._hash(r) == s._hash(r2) + r.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r.request.path = "voing" + assert s._hash(r) != s._hash(r2) - r.request.path = "path?blank_value" - r2.request.path = "path?" - assert s._hash(r) != s._hash(r2) + r.request.path = "path?blank_value" + r2.request.path = "path?" + assert s._hash(r) != s._hash(r2) def test_headers(): s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_use_headers=["foo"]), []) + with taddons.context() as tctx: + tctx.configure(s, server_replay_use_headers=["foo"]) - r = tflow.tflow(resp=True) - r.request.headers["foo"] = "bar" - r2 = tflow.tflow(resp=True) - assert not s._hash(r) == s._hash(r2) - r2.request.headers["foo"] = "bar" - assert s._hash(r) == s._hash(r2) - r2.request.headers["oink"] = "bar" - assert s._hash(r) == s._hash(r2) + r = tflow.tflow(resp=True) + r.request.headers["foo"] = "bar" + r2 = tflow.tflow(resp=True) + assert not s._hash(r) == s._hash(r2) + r2.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r2.request.headers["oink"] = "bar" + assert s._hash(r) == s._hash(r2) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - assert s._hash(r) == s._hash(r2) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + assert s._hash(r) == s._hash(r2) def test_load(): s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) + with taddons.context() as tctx: + tctx.configure(s) - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" - s.load([r, r2]) + s.load_flows([r, r2]) - assert s.count() == 2 + assert s.count() == 2 - n = s.next_flow(r) - assert n.request.headers["key"] == "one" - assert s.count() == 1 + n = s.next_flow(r) + assert n.request.headers["key"] == "one" + assert s.count() == 1 - n = s.next_flow(r) - assert n.request.headers["key"] == "two" - assert not s.flowmap - assert s.count() == 0 + n = s.next_flow(r) + assert n.request.headers["key"] == "two" + assert not s.flowmap + assert s.count() == 0 - assert not s.next_flow(r) + assert not s.next_flow(r) def test_load_with_server_replay_nopop(): s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_nopop=True), []) + with taddons.context() as tctx: + tctx.configure(s, server_replay_nopop=True) - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" - s.load([r, r2]) + s.load_flows([r, r2]) - assert s.count() == 2 - s.next_flow(r) - assert s.count() == 2 + assert s.count() == 2 + s.next_flow(r) + assert s.count() == 2 def test_ignore_params(): s = serverplayback.ServerPlayback() - s.configure( - options.Options( + with taddons.context() as tctx: + tctx.configure( + s, server_replay_ignore_params=["param1", "param2"] - ), - [] - ) + ) - r = tflow.tflow(resp=True) - r.request.path = "/test?param1=1" - r2 = tflow.tflow(resp=True) - r2.request.path = "/test" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param1=2" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param2=1" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param3=2" - assert not s._hash(r) == s._hash(r2) + r = tflow.tflow(resp=True) + r.request.path = "/test?param1=1" + r2 = tflow.tflow(resp=True) + r2.request.path = "/test" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param1=2" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param2=1" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param3=2" + assert not s._hash(r) == s._hash(r2) def thash(r, r2, setter): s = serverplayback.ServerPlayback() - s.configure( - options.Options( + with taddons.context() as tctx: + s = serverplayback.ServerPlayback() + tctx.configure( + s, server_replay_ignore_payload_params=["param1", "param2"] - ), - [] - ) - - setter(r, paramx="x", param1="1") - - setter(r2, paramx="x", param1="1") - # same parameters - assert s._hash(r) == s._hash(r2) - # ignored parameters != - setter(r2, paramx="x", param1="2") - assert s._hash(r) == s._hash(r2) - # missing parameter - setter(r2, paramx="x") - assert s._hash(r) == s._hash(r2) - # ignorable parameter added - setter(r2, paramx="x", param1="2") - assert s._hash(r) == s._hash(r2) - # not ignorable parameter changed - setter(r2, paramx="y", param1="1") - assert not s._hash(r) == s._hash(r2) - # not ignorable parameter missing - setter(r2, param1="1") - r2.request.content = b"param1=1" - assert not s._hash(r) == s._hash(r2) + ) + + setter(r, paramx="x", param1="1") + + setter(r2, paramx="x", param1="1") + # same parameters + assert s._hash(r) == s._hash(r2) + # ignored parameters != + setter(r2, paramx="x", param1="2") + assert s._hash(r) == s._hash(r2) + # missing parameter + setter(r2, paramx="x") + assert s._hash(r) == s._hash(r2) + # ignorable parameter added + setter(r2, paramx="x", param1="2") + assert s._hash(r) == s._hash(r2) + # not ignorable parameter changed + setter(r2, paramx="y", param1="1") + assert not s._hash(r) == s._hash(r2) + # not ignorable parameter missing + setter(r2, param1="1") + r2.request.content = b"param1=1" + assert not s._hash(r) == s._hash(r2) def test_ignore_payload_params(): @@ -319,7 +335,7 @@ def test_server_playback_full(): f = tflow.tflow() f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f, f]) + s.load_flows([f, f]) tf = tflow.tflow() assert not tf.response @@ -352,7 +368,7 @@ def test_server_playback_kill(): f = tflow.tflow() f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f]) + s.load_flows([f]) f = tflow.tflow() f.request.host = "nonexistent" diff --git a/test/mitmproxy/addons/test_stickycookie.py b/test/mitmproxy/addons/test_stickycookie.py index 9092e09b..f77d019d 100644 --- a/test/mitmproxy/addons/test_stickycookie.py +++ b/test/mitmproxy/addons/test_stickycookie.py @@ -110,8 +110,8 @@ class TestStickyCookie: f.response.headers["Set-Cookie"] = c2 sc.response(f) googlekey = list(sc.jar.keys())[0] - assert len(sc.jar[googlekey].keys()) == 1 - assert list(sc.jar[googlekey]["somecookie"].items())[0][1] == "newvalue" + assert len(sc.jar[googlekey]) == 1 + assert sc.jar[googlekey]["somecookie"] == "newvalue" def test_response_delete(self): sc = stickycookie.StickyCookie() diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py deleted file mode 100644 index 3f78521c..00000000 --- a/test/mitmproxy/addons/test_streamfile.py +++ /dev/null @@ -1,60 +0,0 @@ -import pytest - -from mitmproxy.test import taddons -from mitmproxy.test import tflow - -from mitmproxy import io -from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy.addons import streamfile - - -def test_configure(tmpdir): - sa = streamfile.StreamFile() - with taddons.context(options=options.Options()) as tctx: - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=str(tmpdir)) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") - tctx.configure(sa, filtstr="foo") - assert sa.filt - tctx.configure(sa, filtstr=None) - assert not sa.filt - - -def rd(p): - x = io.FlowReader(open(p, "rb")) - return list(x.stream()) - - -def test_tcp(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) - - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) - - -def test_simple(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - - tctx.configure(sa, streamfile=p) - - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response - - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py index 7becc857..2debaff5 100644 --- a/test/mitmproxy/addons/test_termstatus.py +++ b/test/mitmproxy/addons/test_termstatus.py @@ -5,6 +5,7 @@ from mitmproxy.test import taddons def test_configure(): ts = termstatus.TermStatus() with taddons.context() as ctx: + ctx.configure(ts, server=False) ts.running() assert not ctx.master.logs ctx.configure(ts, server=True) diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index b7842314..6da13650 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -4,7 +4,8 @@ from mitmproxy.test import tflow from mitmproxy.addons import view from mitmproxy import flowfilter -from mitmproxy import options +from mitmproxy import exceptions +from mitmproxy import io from mitmproxy.test import taddons @@ -25,12 +26,12 @@ def test_order_refresh(): v.sig_view_refresh.connect(save) tf = tflow.tflow(resp=True) - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, console_order="time") - v.add(tf) + v.add([tf]) tf.request.timestamp_start = 1 assert not sargs - v.update(tf) + v.update([tf]) assert sargs @@ -130,9 +131,154 @@ def test_filter(): assert len(v) == 4 +def tdump(path, flows): + w = io.FlowWriter(open(path, "wb")) + for i in flows: + w.add(i) + + +def test_create(): + v = view.View() + with taddons.context(): + v.create("get", "http://foo.com") + assert len(v) == 1 + assert v[0].request.url == "http://foo.com/" + v.create("get", "http://foo.com") + assert len(v) == 2 + + +def test_orders(): + v = view.View() + with taddons.context(): + assert v.order_options() + + +def test_load(tmpdir): + path = str(tmpdir.join("path")) + v = view.View() + with taddons.context() as tctx: + tctx.master.addons.add(v) + tdump( + path, + [ + tflow.tflow(resp=True), + tflow.tflow(resp=True) + ] + ) + v.load_file(path) + assert len(v) == 2 + v.load_file(path) + assert len(v) == 4 + + +def test_resolve(): + v = view.View() + with taddons.context() as tctx: + assert tctx.command(v.resolve, "@all") == [] + assert tctx.command(v.resolve, "@focus") == [] + assert tctx.command(v.resolve, "@shown") == [] + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + assert tctx.command(v.resolve, "@unmarked") == [] + assert tctx.command(v.resolve, "~m get") == [] + v.request(tft(method="get")) + assert len(tctx.command(v.resolve, "~m get")) == 1 + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@all")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 1 + assert len(tctx.command(v.resolve, "@unmarked")) == 1 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + v.request(tft(method="put")) + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 2 + assert len(tctx.command(v.resolve, "@all")) == 2 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + + v.request(tft(method="get")) + v.request(tft(method="put")) + + f = flowfilter.parse("~m get") + v.set_filter(f) + v[0].marked = True + + def m(l): + return [i.request.method for i in l] + + assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@marked")) == ["GET"] + assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] + assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"] + + with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): + tctx.command(v.resolve, "~") + + +def test_movement(): + v = view.View() + with taddons.context(): + v.go(0) + v.add([ + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + ]) + assert v.focus.index == 0 + v.go(-1) + assert v.focus.index == 4 + v.go(0) + assert v.focus.index == 0 + v.go(1) + assert v.focus.index == 1 + v.go(999) + assert v.focus.index == 4 + v.go(-999) + assert v.focus.index == 0 + + v.focus_next() + assert v.focus.index == 1 + v.focus_prev() + assert v.focus.index == 0 + + +def test_duplicate(): + v = view.View() + with taddons.context(): + f = [ + tflow.tflow(), + tflow.tflow(), + ] + v.add(f) + assert len(v) == 2 + v.duplicate(f) + assert len(v) == 4 + assert v.focus.index == 2 + + +def test_setgetval(): + v = view.View() + with taddons.context(): + f = tflow.tflow() + v.add([f]) + v.setvalue([f], "key", "value") + assert v.getvalue(f, "key", "default") == "value" + assert v.getvalue(f, "unknow", "default") == "default" + + v.setvalue_toggle([f], "key") + assert v.getvalue(f, "key", "default") == "true" + v.setvalue_toggle([f], "key") + assert v.getvalue(f, "key", "default") == "false" + + def test_order(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: v.request(tft(method="get", start=1)) v.request(tft(method="put", start=2)) v.request(tft(method="get", start=3)) @@ -180,14 +326,14 @@ def test_update(): assert f in v f.request.method = "put" - v.update(f) + v.update([f]) assert f not in v f.request.method = "get" - v.update(f) + v.update([f]) assert f in v - v.update(f) + v.update([f]) assert f in v @@ -226,7 +372,7 @@ def test_signals(): assert not any([rec_add, rec_update, rec_remove, rec_refresh]) # Simple add - v.add(tft()) + v.add([tft()]) assert rec_add assert not any([rec_update, rec_remove, rec_refresh]) @@ -241,14 +387,14 @@ def test_signals(): # An update that results in a flow being added to the view clearrec() v[0].request.method = "PUT" - v.update(v[0]) + v.update([v[0]]) assert rec_remove assert not any([rec_update, rec_refresh, rec_add]) # An update that does not affect the view just sends update v.set_filter(flowfilter.parse("~m put")) clearrec() - v.update(v[0]) + v.update([v[0]]) assert rec_update assert not any([rec_remove, rec_refresh, rec_add]) @@ -257,33 +403,33 @@ def test_signals(): v.set_filter(flowfilter.parse("~m get")) assert not len(v) clearrec() - v.update(f) + v.update([f]) assert not any([rec_add, rec_update, rec_remove, rec_refresh]) def test_focus_follow(): v = view.View() - with taddons.context(options=options.Options()) as tctx: - tctx.configure(v, console_focus_follow=True, filter="~m get") + with taddons.context() as tctx: + tctx.configure(v, console_focus_follow=True, view_filter="~m get") - v.add(tft(start=5)) + v.add([tft(start=5)]) assert v.focus.index == 0 - v.add(tft(start=4)) + v.add([tft(start=4)]) assert v.focus.index == 0 assert v.focus.flow.request.timestamp_start == 4 - v.add(tft(start=7)) + v.add([tft(start=7)]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod = tft(method="put", start=6) - v.add(mod) + v.add([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod.request.method = "GET" - v.update(mod) + v.update([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 6 @@ -291,7 +437,7 @@ def test_focus_follow(): def test_focus(): # Special case - initialising with a view that already contains data v = view.View() - v.add(tft()) + v.add([tft()]) f = view.Focus(v) assert f.index is 0 assert f.flow is v[0] @@ -302,7 +448,7 @@ def test_focus(): assert f.index is None assert f.flow is None - v.add(tft(start=1)) + v.add([tft(start=1)]) assert f.index == 0 assert f.flow is v[0] @@ -312,11 +458,11 @@ def test_focus(): with pytest.raises(ValueError): f.__setattr__("index", 99) - v.add(tft(start=0)) + v.add([tft(start=0)]) assert f.index == 1 assert f.flow is v[1] - v.add(tft(start=2)) + v.add([tft(start=2)]) assert f.index == 1 assert f.flow is v[1] @@ -324,22 +470,25 @@ def test_focus(): assert f.index == 0 f.index = 1 - v.remove(v[1]) + v.remove([v[1]]) + v[1].intercept() assert f.index == 1 assert f.flow is v[1] - v.remove(v[1]) + v.remove([v[1]]) assert f.index == 0 assert f.flow is v[0] - v.remove(v[0]) + v.remove([v[0]]) assert f.index is None assert f.flow is None - v.add(tft(method="get", start=0)) - v.add(tft(method="get", start=1)) - v.add(tft(method="put", start=2)) - v.add(tft(method="get", start=3)) + v.add([ + tft(method="get", start=0), + tft(method="get", start=1), + tft(method="put", start=2), + tft(method="get", start=3), + ]) f.flow = v[2] assert f.flow.request.method == "PUT" @@ -359,16 +508,16 @@ def test_settings(): with pytest.raises(KeyError): v.settings[f] - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings[f]["foo"] == "bar" assert len(list(v.settings)) == 1 - v.remove(f) + v.remove([f]) with pytest.raises(KeyError): v.settings[f] assert not v.settings.keys() - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings.keys() v.clear() @@ -377,10 +526,10 @@ def test_settings(): def test_configure(): v = view.View() - with taddons.context(options=options.Options()) as tctx: - tctx.configure(v, filter="~q") + with taddons.context() as tctx: + tctx.configure(v, view_filter="~q") with pytest.raises(Exception, match="Invalid interception filter"): - tctx.configure(v, filter="~~") + tctx.configure(v, view_filter="~~") tctx.configure(v, console_order="method") with pytest.raises(Exception, match="Unknown flow order"): @@ -388,7 +537,5 @@ def test_configure(): tctx.configure(v, console_order_reversed=True) - tctx.configure(v, console_order=None) - tctx.configure(v, console_focus_follow=True) assert v.focus_follow diff --git a/test/mitmproxy/console/test_flowlist.py b/test/mitmproxy/console/test_flowlist.py deleted file mode 100644 index 7c442b63..00000000 --- a/test/mitmproxy/console/test_flowlist.py +++ /dev/null @@ -1,21 +0,0 @@ -from unittest import mock - -import mitmproxy.tools.console.flowlist as flowlist -from mitmproxy.tools import console -from mitmproxy import proxy -from mitmproxy import options - - -class TestFlowlist: - def mkmaster(self, **opts): - if "verbosity" not in opts: - opts["verbosity"] = 1 - o = options.Options(**opts) - return console.master.ConsoleMaster(o, proxy.DummyServer()) - - def test_new_request(self): - m = self.mkmaster() - x = flowlist.FlowListBox(m) - with mock.patch('mitmproxy.tools.console.signals.status_message.send') as mock_thing: - x.new_request("nonexistent url", "GET") - mock_thing.assert_called_once_with(message="Invalid URL: No hostname given") diff --git a/test/mitmproxy/contentviews/test_api.py b/test/mitmproxy/contentviews/test_api.py index 95d83af9..c072c86f 100644 --- a/test/mitmproxy/contentviews/test_api.py +++ b/test/mitmproxy/contentviews/test_api.py @@ -9,23 +9,28 @@ from mitmproxy.test import tutils class TestContentView(contentviews.View): name = "test" - prompt = ("t", "test") + prompt = ("test", "t") content_types = ["test/123"] def test_add_remove(): tcv = TestContentView() contentviews.add(tcv) + assert tcv in contentviews.views # repeated addition causes exception - with pytest.raises(ContentViewException): + with pytest.raises(ContentViewException, match="Duplicate view"): contentviews.add(tcv) + tcv2 = TestContentView() + tcv2.name = "test2" + tcv2.prompt = ("test2", "t") # Same shortcut doesn't work either. - with pytest.raises(ContentViewException): - contentviews.add(TestContentView()) + with pytest.raises(ContentViewException, match="Duplicate view shortcut"): + contentviews.add(tcv2) contentviews.remove(tcv) + assert tcv not in contentviews.views def test_get_content_view(): @@ -43,6 +48,7 @@ def test_get_content_view(): headers=Headers(content_type="application/json") ) assert desc == "JSON" + assert list(lines) desc, lines, err = contentviews.get_content_view( contentviews.get("JSON"), @@ -84,3 +90,4 @@ def test_get_message_content_view(): def test_get_by_shortcut(): assert contentviews.get_by_shortcut("s") + assert not contentviews.get_by_shortcut("b") diff --git a/test/mitmproxy/contentviews/test_xml_html.py b/test/mitmproxy/contentviews/test_xml_html.py index 2b0aee4d..8148fd4c 100644 --- a/test/mitmproxy/contentviews/test_xml_html.py +++ b/test/mitmproxy/contentviews/test_xml_html.py @@ -11,6 +11,13 @@ def test_simple(): v = full_eval(xml_html.ViewXmlHtml()) assert v(b"foo") == ('XML', [[('text', 'foo')]]) assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]]) + assert v(b"<>") == ('XML', [[('text', '<>')]]) + assert v(b"<p") == ('XML', [[('text', '<p')]]) + + with open(data.path("simple.html")) as f: + input = f.read() + tokens = xml_html.tokenize(input) + assert str(next(tokens)) == "Tag(<!DOCTYPE html>)" @pytest.mark.parametrize("filename", [ @@ -18,6 +25,7 @@ def test_simple(): "cdata.xml", "comment.xml", "inline.html", + "test.html" ]) def test_format_xml(filename): path = data.path(filename) diff --git a/test/mitmproxy/contentviews/test_xml_html_data/test-formatted.html b/test/mitmproxy/contentviews/test_xml_html_data/test-formatted.html new file mode 100644 index 00000000..0eb60004 --- /dev/null +++ b/test/mitmproxy/contentviews/test_xml_html_data/test-formatted.html @@ -0,0 +1,44 @@ +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <title>Title</title> +</head> +<body> + <p> + Lorem ipsum dolor + <p> + sit amet, consectetur + <p> + adipiscing elit, sed + <p> + do eiusmod tempor + <p> + incididunt ut + <p> + labore et dolore + <p> + magna aliqua. + <p> + Ut enim ad minim + <p> + veniam, quis nostrud + <p> + exercitation + <p> + ullamco laboris + <p> + nisi ut aliquip ex ea + <p> + commodo consequat. + <p> + Duis aute irure + <p> + dolor in reprehenderit + <p> + in voluptate velit + <p> + esse cillum dolore + <p>eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.</p> +</body> +</html> diff --git a/test/mitmproxy/contentviews/test_xml_html_data/test.html b/test/mitmproxy/contentviews/test_xml_html_data/test.html new file mode 100644 index 00000000..e74ac314 --- /dev/null +++ b/test/mitmproxy/contentviews/test_xml_html_data/test.html @@ -0,0 +1,14 @@ +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <title>Title</title> +</head> +<body> +<p>Lorem ipsum dolor<p>sit amet, consectetur <p>adipiscing elit, sed<p>do eiusmod tempor<p> incididunt ut<p> labore et dolore<p> magna aliqua. + <p>Ut enim ad minim <p>veniam, quis nostrud <p>exercitation <p>ullamco laboris <p> + nisi ut aliquip ex ea <p>commodo consequat.<p>Duis aute irure <p>dolor in reprehenderit <p>in voluptate velit<p> esse cillum dolore <p>eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.</p> + + +</body> +</html>
\ No newline at end of file diff --git a/test/mitmproxy/data/addonscripts/addon.py b/test/mitmproxy/data/addonscripts/addon.py index f34f41cb..8c834d82 100644 --- a/test/mitmproxy/data/addonscripts/addon.py +++ b/test/mitmproxy/data/addonscripts/addon.py @@ -6,17 +6,19 @@ class Addon: def event_log(self): return event_log - def start(self, opts): - event_log.append("addonstart") + def load(self, opts): + event_log.append("addonload") - def configure(self, options, updated): + def configure(self, updated): event_log.append("addonconfigure") -def configure(options, updated): - event_log.append("addonconfigure") +def configure(updated): + event_log.append("scriptconfigure") -def start(opts): - event_log.append("scriptstart") - return Addon() +def load(l): + event_log.append("scriptload") + + +addons = [Addon()] diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator.py b/test/mitmproxy/data/addonscripts/concurrent_decorator.py index 162c00f4..d1ab6c6c 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator.py @@ -1,4 +1,5 @@ import time +import sys from mitmproxy.script import concurrent diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py index 10ba24cd..2a7d300c 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py @@ -9,5 +9,4 @@ class ConcurrentClass: time.sleep(0.1) -def start(opts): - return ConcurrentClass() +addons = [ConcurrentClass()] diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py index 7bc28182..4f80e98a 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py @@ -2,5 +2,5 @@ from mitmproxy.script import concurrent @concurrent -def start(opts): +def load(v): pass diff --git a/test/mitmproxy/data/addonscripts/print.py b/test/mitmproxy/data/addonscripts/print.py new file mode 100644 index 00000000..93b65a64 --- /dev/null +++ b/test/mitmproxy/data/addonscripts/print.py @@ -0,0 +1,2 @@ +def load(l): + print("stdoutprint") diff --git a/test/mitmproxy/data/addonscripts/recorder/a.py b/test/mitmproxy/data/addonscripts/recorder/a.py new file mode 100644 index 00000000..df81d86b --- /dev/null +++ b/test/mitmproxy/data/addonscripts/recorder/a.py @@ -0,0 +1,3 @@ +import recorder + +addons = [recorder.Recorder("a")] diff --git a/test/mitmproxy/data/addonscripts/recorder/b.py b/test/mitmproxy/data/addonscripts/recorder/b.py new file mode 100644 index 00000000..ccbae705 --- /dev/null +++ b/test/mitmproxy/data/addonscripts/recorder/b.py @@ -0,0 +1,3 @@ +import recorder + +addons = [recorder.Recorder("b")] diff --git a/test/mitmproxy/data/addonscripts/recorder/c.py b/test/mitmproxy/data/addonscripts/recorder/c.py new file mode 100644 index 00000000..b8b0915e --- /dev/null +++ b/test/mitmproxy/data/addonscripts/recorder/c.py @@ -0,0 +1,3 @@ +import recorder + +addons = [recorder.Recorder("c")] diff --git a/test/mitmproxy/data/addonscripts/recorder/e.py b/test/mitmproxy/data/addonscripts/recorder/e.py new file mode 100644 index 00000000..eb5eff5e --- /dev/null +++ b/test/mitmproxy/data/addonscripts/recorder/e.py @@ -0,0 +1,3 @@ +import recorder + +addons = [recorder.Recorder("e")] diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder/recorder.py index aff524a8..a962d3df 100644 --- a/test/mitmproxy/data/addonscripts/recorder.py +++ b/test/mitmproxy/data/addonscripts/recorder/recorder.py @@ -1,13 +1,12 @@ from mitmproxy import controller from mitmproxy import eventsequence from mitmproxy import ctx -import sys -class CallLogger: +class Recorder: call_log = [] - def __init__(self, name = "solo"): + def __init__(self, name = "recorder"): self.name = name def __getattr__(self, attr): @@ -22,5 +21,4 @@ class CallLogger: raise AttributeError -def start(opts): - return CallLogger(*sys.argv[1:]) +addons = [Recorder()] diff --git a/test/mitmproxy/data/dumpfile-018 b/test/mitmproxy/data/dumpfile-018 index abe8b0b1..6a27b5a6 100644 --- a/test/mitmproxy/data/dumpfile-018 +++ b/test/mitmproxy/data/dumpfile-018 @@ -1,4 +1,4 @@ -5243:5:error;0:~11:intercepted;5:false!6:marked;5:false!2:id;36:55367415-10f5-4938-b69f-8a523394f947;7:request;396:10:stickyauth;5:false!7:content;0:,15:timestamp_start;18:1482157523.9086578^9:is_replay;5:false!4:path;1:/,4:host;15:www.example.com,17:first_line_format;8:relative;12:stickycookie;5:false!12:http_version;8:HTTP/1.1,6:method;3:GET,4:port;3:443#13:timestamp_end;18:1482157523.9086578^6:scheme;5:https,7:headers;82:29:10:User-Agent,11:curl/7.35.0,]26:4:Host,15:www.example.com,]15:6:Accept,3:*/*,]]}8:response;1851:6:reason;2:OK,12:http_version;8:HTTP/1.1,13:timestamp_end;17:1482157524.361187^11:status_code;3:200#7:content;1270:<!doctype html> +7816:4:type;4:http;2:id;36:55367415-10f5-4938-b69f-8a523394f947;8:response;1851:15:timestamp_start;17:1482157524.361187^12:http_version;8:HTTP/1.1,7:content;1270:<!doctype html> <html> <head> <title>Example Domain</title> @@ -48,7 +48,7 @@ </div> </body> </html> -,7:headers;410:25:13:Accept-Ranges,5:bytes,]35:13:Cache-Control,14:max-age=604800,]28:12:Content-Type,9:text/html,]40:4:Date,29:Mon, 19 Dec 2016 14:25:24 GMT,]22:4:Etag,11:"359670651",]43:7:Expires,29:Mon, 26 Dec 2016 14:25:24 GMT,]50:13:Last-Modified,29:Fri, 09 Aug 2013 23:54:35 GMT,]27:6:Server,14:ECS (iad/18CB),]26:4:Vary,15:Accept-Encoding,]16:7:X-Cache,3:HIT,]25:17:x-ec-custom-error,1:1,]25:14:Content-Length,4:1270,]]15:timestamp_start;17:1482157524.361187^}4:type;4:http;11:server_conn;2570:15:ssl_established;4:true!7:address;58:7:address;25:15:www.example.com;3:443#]8:use_ipv6;5:false!}10:ip_address;56:7:address;23:13:93.184.216.34;3:443#]8:use_ipv6;5:false!}3:via;0:~14:source_address;57:7:address;24:12:10.67.53.133;5:52775#]8:use_ipv6;5:false!}13:timestamp_end;0:~4:cert;2122:-----BEGIN CERTIFICATE----- +,13:timestamp_end;17:1482157524.361187^11:status_code;3:200#6:reason;2:OK,7:headers;410:25:13:Accept-Ranges,5:bytes,]35:13:Cache-Control,14:max-age=604800,]28:12:Content-Type,9:text/html,]40:4:Date,29:Mon, 19 Dec 2016 14:25:24 GMT,]22:4:Etag,11:"359670651",]43:7:Expires,29:Mon, 26 Dec 2016 14:25:24 GMT,]50:13:Last-Modified,29:Fri, 09 Aug 2013 23:54:35 GMT,]27:6:Server,14:ECS (iad/18CB),]26:4:Vary,15:Accept-Encoding,]16:7:X-Cache,3:HIT,]25:17:x-ec-custom-error,1:1,]25:14:Content-Length,4:1270,]]}7:request;396:9:is_replay;5:false!17:first_line_format;8:relative;4:port;3:443#7:content;0:,12:stickycookie;5:false!6:method;3:GET,7:headers;82:29:10:User-Agent,11:curl/7.35.0,]26:4:Host,15:www.example.com,]15:6:Accept,3:*/*,]]15:timestamp_start;18:1482157523.9086578^12:http_version;8:HTTP/1.1,13:timestamp_end;18:1482157523.9086578^4:path;1:/,10:stickyauth;5:false!4:host;15:www.example.com,6:scheme;5:https,}7:version;13:1:0#2:18#1:2#]5:error;0:~11:intercepted;5:false!11:server_conn;5143:10:ip_address;56:8:use_ipv6;5:false!7:address;23:13:93.184.216.34;3:443#]}15:timestamp_start;18:1482157523.9086578^19:timestamp_tcp_setup;18:1482157524.0081189^15:ssl_established;4:true!14:source_address;57:8:use_ipv6;5:false!7:address;24:12:10.67.53.133;5:52775#]}19:timestamp_ssl_setup;17:1482157524.260993^4:cert;2122:-----BEGIN CERTIFICATE----- MIIF8jCCBNqgAwIBAgIQDmTF+8I2reFLFyrrQceMsDANBgkqhkiG9w0BAQsFADBw MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 d3cuZGlnaWNlcnQuY29tMS8wLQYDVQQDEyZEaWdpQ2VydCBTSEEyIEhpZ2ggQXNz @@ -82,4 +82,38 @@ ieqRbcuFjmqfyPmUv1U9QoI4TQikpw7TZU0zYZANP4C/gj4Ry48/znmUaRvy2kvI l7gRQ21qJTK5suoiYoYNo3J9T+pXPGU7Lydz/HwW+w0DpArtAaukI8aNX4ohFUKS wDSiIIWIWJiJGbEeIO0TIFwEVWTOnbNl/faPXpk5IRXicapqiII= -----END CERTIFICATE----- -,15:timestamp_start;18:1482157523.9086578^3:sni;15:www.example.com;19:timestamp_ssl_setup;17:1482157524.260993^19:timestamp_tcp_setup;18:1482157524.0081189^}11:client_conn;216:15:ssl_established;4:true!7:address;53:7:address;20:9:127.0.0.1;5:52774#]8:use_ipv6;5:false!}10:clientcert;0:~13:timestamp_end;0:~15:timestamp_start;18:1482157522.8949482^19:timestamp_ssl_setup;18:1482157523.9086578^}7:version;13:1:0#2:18#1:2#]}
\ No newline at end of file +,13:timestamp_end;0:~3:via;2570:15:ssl_established;4:true!19:timestamp_tcp_setup;18:1482157524.0081189^19:timestamp_ssl_setup;17:1482157524.260993^3:via;0:~3:sni;15:www.example.com;10:ip_address;56:8:use_ipv6;5:false!7:address;23:13:93.184.216.34;3:443#]}15:timestamp_start;18:1482157523.9086578^14:source_address;57:8:use_ipv6;5:false!7:address;24:12:10.67.53.133;5:52775#]}4:cert;2122:-----BEGIN CERTIFICATE----- +MIIF8jCCBNqgAwIBAgIQDmTF+8I2reFLFyrrQceMsDANBgkqhkiG9w0BAQsFADBw +MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 +d3cuZGlnaWNlcnQuY29tMS8wLQYDVQQDEyZEaWdpQ2VydCBTSEEyIEhpZ2ggQXNz +dXJhbmNlIFNlcnZlciBDQTAeFw0xNTExMDMwMDAwMDBaFw0xODExMjgxMjAwMDBa +MIGlMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEUMBIGA1UEBxML +TG9zIEFuZ2VsZXMxPDA6BgNVBAoTM0ludGVybmV0IENvcnBvcmF0aW9uIGZvciBB +c3NpZ25lZCBOYW1lcyBhbmQgTnVtYmVyczETMBEGA1UECxMKVGVjaG5vbG9neTEY +MBYGA1UEAxMPd3d3LmV4YW1wbGUub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAs0CWL2FjPiXBl61lRfvvE0KzLJmG9LWAC3bcBjgsH6NiVVo2dt6u +Xfzi5bTm7F3K7srfUBYkLO78mraM9qizrHoIeyofrV/n+pZZJauQsPjCPxMEJnRo +D8Z4KpWKX0LyDu1SputoI4nlQ/htEhtiQnuoBfNZxF7WxcxGwEsZuS1KcXIkHl5V +RJOreKFHTaXcB1qcZ/QRaBIv0yhxvK1yBTwWddT4cli6GfHcCe3xGMaSL328Fgs3 +jYrvG29PueB6VJi/tbbPu6qTfwp/H1brqdjh29U52Bhb0fJkM9DWxCP/Cattcc7a +z8EXnCO+LK8vkhw/kAiJWPKx4RBvgy73nwIDAQABo4ICUDCCAkwwHwYDVR0jBBgw +FoAUUWj/kK8CB3U8zNllZGKiErhZcjswHQYDVR0OBBYEFKZPYB4fLdHn8SOgKpUW +5Oia6m5IMIGBBgNVHREEejB4gg93d3cuZXhhbXBsZS5vcmeCC2V4YW1wbGUuY29t +ggtleGFtcGxlLmVkdYILZXhhbXBsZS5uZXSCC2V4YW1wbGUub3Jngg93d3cuZXhh +bXBsZS5jb22CD3d3dy5leGFtcGxlLmVkdYIPd3d3LmV4YW1wbGUubmV0MA4GA1Ud +DwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwdQYDVR0f +BG4wbDA0oDKgMIYuaHR0cDovL2NybDMuZGlnaWNlcnQuY29tL3NoYTItaGEtc2Vy +dmVyLWc0LmNybDA0oDKgMIYuaHR0cDovL2NybDQuZGlnaWNlcnQuY29tL3NoYTIt +aGEtc2VydmVyLWc0LmNybDBMBgNVHSAERTBDMDcGCWCGSAGG/WwBATAqMCgGCCsG +AQUFBwIBFhxodHRwczovL3d3dy5kaWdpY2VydC5jb20vQ1BTMAgGBmeBDAECAjCB +gwYIKwYBBQUHAQEEdzB1MCQGCCsGAQUFBzABhhhodHRwOi8vb2NzcC5kaWdpY2Vy +dC5jb20wTQYIKwYBBQUHMAKGQWh0dHA6Ly9jYWNlcnRzLmRpZ2ljZXJ0LmNvbS9E +aWdpQ2VydFNIQTJIaWdoQXNzdXJhbmNlU2VydmVyQ0EuY3J0MAwGA1UdEwEB/wQC +MAAwDQYJKoZIhvcNAQELBQADggEBAISomhGn2L0LJn5SJHuyVZ3qMIlRCIdvqe0Q +6ls+C8ctRwRO3UU3x8q8OH+2ahxlQmpzdC5al4XQzJLiLjiJ2Q1p+hub8MFiMmVP +PZjb2tZm2ipWVuMRM+zgpRVM6nVJ9F3vFfUSHOb4/JsEIUvPY+d8/Krc+kPQwLvy +ieqRbcuFjmqfyPmUv1U9QoI4TQikpw7TZU0zYZANP4C/gj4Ry48/znmUaRvy2kvI +l7gRQ21qJTK5suoiYoYNo3J9T+pXPGU7Lydz/HwW+w0DpArtAaukI8aNX4ohFUKS +wDSiIIWIWJiJGbEeIO0TIFwEVWTOnbNl/faPXpk5IRXicapqiII= +-----END CERTIFICATE----- +,13:timestamp_end;0:~7:address;58:8:use_ipv6;5:false!7:address;25:15:www.example.com;3:443#]}}7:address;58:8:use_ipv6;5:false!7:address;25:15:www.example.com;3:443#]}3:sni;15:www.example.com;}11:client_conn;216:15:timestamp_start;18:1482157522.8949482^15:ssl_established;4:true!13:timestamp_end;0:~10:clientcert;0:~7:address;53:8:use_ipv6;5:false!7:address;20:9:127.0.0.1;5:52774#]}19:timestamp_ssl_setup;18:1482157523.9086578^}6:marked;5:false!}
\ No newline at end of file diff --git a/test/mitmproxy/data/test_flow_export/locust_get.py b/test/mitmproxy/data/test_flow_export/locust_get.py deleted file mode 100644 index 632d5d53..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_get.py +++ /dev/null @@ -1,35 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'a': ['foo', 'bar'], - 'b': 'baz', - } - - self.response = self.client.request( - method='GET', - url=url, - headers=headers, - params=params, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_patch.py b/test/mitmproxy/data/test_flow_export/locust_patch.py deleted file mode 100644 index f64e0857..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_patch.py +++ /dev/null @@ -1,37 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'query': 'param', - } - - data = '''content''' - - self.response = self.client.request( - method='PATCH', - url=url, - headers=headers, - params=params, - data=data, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_post.py b/test/mitmproxy/data/test_flow_export/locust_post.py deleted file mode 100644 index df23476a..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_post.py +++ /dev/null @@ -1,26 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - data = '''content''' - - self.response = self.client.request( - method='POST', - url=url, - data=data, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_task_get.py b/test/mitmproxy/data/test_flow_export/locust_task_get.py deleted file mode 100644 index 03821cd8..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_get.py +++ /dev/null @@ -1,20 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'a': ['foo', 'bar'], - 'b': 'baz', - } - - self.response = self.client.request( - method='GET', - url=url, - headers=headers, - params=params, - ) diff --git a/test/mitmproxy/data/test_flow_export/locust_task_patch.py b/test/mitmproxy/data/test_flow_export/locust_task_patch.py deleted file mode 100644 index d425209c..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_patch.py +++ /dev/null @@ -1,22 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'query': 'param', - } - - data = '''content''' - - self.response = self.client.request( - method='PATCH', - url=url, - headers=headers, - params=params, - data=data, - ) diff --git a/test/mitmproxy/data/test_flow_export/locust_task_post.py b/test/mitmproxy/data/test_flow_export/locust_task_post.py deleted file mode 100644 index 989df455..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_post.py +++ /dev/null @@ -1,11 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - data = '''content''' - - self.response = self.client.request( - method='POST', - url=url, - data=data, - ) diff --git a/test/mitmproxy/data/test_flow_export/python_get.py b/test/mitmproxy/data/test_flow_export/python_get.py deleted file mode 100644 index e9ed072a..00000000 --- a/test/mitmproxy/data/test_flow_export/python_get.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests - -response = requests.get( - 'http://address:22/path', - params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')], - headers={'header': 'qvalue'} -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/data/test_flow_export/python_patch.py b/test/mitmproxy/data/test_flow_export/python_patch.py deleted file mode 100644 index d83a57b9..00000000 --- a/test/mitmproxy/data/test_flow_export/python_patch.py +++ /dev/null @@ -1,10 +0,0 @@ -import requests - -response = requests.patch( - 'http://address:22/path', - params=[('query', 'param')], - headers={'header': 'qvalue'}, - data=b'content' -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/data/test_flow_export/python_post.py b/test/mitmproxy/data/test_flow_export/python_post.py deleted file mode 100644 index 6254adfb..00000000 --- a/test/mitmproxy/data/test_flow_export/python_post.py +++ /dev/null @@ -1,8 +0,0 @@ -import requests - -response = requests.post( - 'http://address:22/path', - data=b'content' -) - -print(response.text) diff --git a/test/mitmproxy/data/test_flow_export/python_post_json.py b/test/mitmproxy/data/test_flow_export/python_post_json.py deleted file mode 100644 index d6ae6357..00000000 --- a/test/mitmproxy/data/test_flow_export/python_post_json.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests - -response = requests.post( - 'http://address:22/path', - headers={'content-type': 'application/json'}, - json={'email': 'example@example.com', 'name': 'example'} -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/test_io_compat.py b/test/mitmproxy/io/test_compat.py index 288de4fc..288de4fc 100644 --- a/test/mitmproxy/test_io_compat.py +++ b/test/mitmproxy/io/test_compat.py diff --git a/test/mitmproxy/test_io.py b/test/mitmproxy/io/test_io.py index 777ab4dd..777ab4dd 100644 --- a/test/mitmproxy/test_io.py +++ b/test/mitmproxy/io/test_io.py diff --git a/test/mitmproxy/contrib/test_tnetstring.py b/test/mitmproxy/io/test_tnetstring.py index 05c4a7c9..f7141de0 100644 --- a/test/mitmproxy/contrib/test_tnetstring.py +++ b/test/mitmproxy/io/test_tnetstring.py @@ -4,7 +4,7 @@ import math import io import struct -from mitmproxy.contrib import tnetstring +from mitmproxy.io import tnetstring MAXINT = 2 ** (struct.Struct('i').size * 8 - 1) - 1 diff --git a/test/mitmproxy/net/http/http1/test_read.py b/test/mitmproxy/net/http/http1/test_read.py index 642b91c0..b3589c92 100644 --- a/test/mitmproxy/net/http/http1/test_read.py +++ b/test/mitmproxy/net/http/http1/test_read.py @@ -243,6 +243,7 @@ def test_read_request_line(): def test_parse_authority_form(): assert _parse_authority_form(b"foo:42") == (b"foo", 42) + assert _parse_authority_form(b"[2001:db8:42::]:443") == (b"2001:db8:42::", 443) with pytest.raises(exceptions.HttpSyntaxException): _parse_authority_form(b"foo") with pytest.raises(exceptions.HttpSyntaxException): diff --git a/test/mitmproxy/net/http/test_cookies.py b/test/mitmproxy/net/http/test_cookies.py index 5c30dbdb..77549d9e 100644 --- a/test/mitmproxy/net/http/test_cookies.py +++ b/test/mitmproxy/net/http/test_cookies.py @@ -269,6 +269,9 @@ def test_refresh_cookie(): c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" assert "00:21:38" in cookies.refresh_set_cookie_header(c, 60) + c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037; Path=/" + assert "expires" not in cookies.refresh_set_cookie_header(c, 60) + c = "foo,bar" with pytest.raises(ValueError): cookies.refresh_set_cookie_header(c, 60) @@ -283,6 +286,10 @@ def test_refresh_cookie(): c = "foo/bar=bla" assert cookies.refresh_set_cookie_header(c, 0) + # https://github.com/mitmproxy/mitmproxy/issues/2250 + c = "" + assert cookies.refresh_set_cookie_header(c, 60) == "" + @mock.patch('time.time') def test_get_expiration_ts(*args): diff --git a/test/mitmproxy/net/http/test_message.py b/test/mitmproxy/net/http/test_message.py index b75bc7c2..512f3199 100644 --- a/test/mitmproxy/net/http/test_message.py +++ b/test/mitmproxy/net/http/test_message.py @@ -48,6 +48,12 @@ class TestMessageData: assert data != 0 + def test_serializable(self): + data1 = tutils.tresp(timestamp_start=42, timestamp_end=42).data + data2 = tutils.tresp().data.from_state(data1.get_state()) # ResponseData.from_state() + + assert data1 == data2 + class TestMessage: @@ -117,6 +123,14 @@ class TestMessageContentEncoding: assert r.content == b"message" assert r.raw_content != b"message" + def test_update_content_length_header(self): + r = tutils.tresp() + assert int(r.headers["content-length"]) == 7 + r.encode("gzip") + assert int(r.headers["content-length"]) == 27 + r.decode() + assert int(r.headers["content-length"]) == 7 + def test_modify(self): r = tutils.tresp() assert "content-encoding" not in r.headers diff --git a/test/mitmproxy/net/test_imports.py b/test/mitmproxy/net/test_imports.py deleted file mode 100644 index b88ef26d..00000000 --- a/test/mitmproxy/net/test_imports.py +++ /dev/null @@ -1 +0,0 @@ -# These are actually tests! diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index 8b26784a..81d51888 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -529,10 +529,10 @@ class TestTimeOut(tservers.ServerTestBase): class TestCryptographyALPN: def test_has_alpn(self): - if 'OPENSSL_ALPN' in os.environ: + if os.environ.get("OPENSSL") == "with-alpn": assert tcp.HAS_ALPN assert SSL._lib.Cryptography_HAS_ALPN - elif 'OPENSSL_OLD' in os.environ: + elif os.environ.get("OPENSSL") == "old": assert not tcp.HAS_ALPN assert not SSL._lib.Cryptography_HAS_ALPN @@ -603,13 +603,36 @@ class TestDHParams(tservers.ServerTestBase): assert ret[0] == "DHE-RSA-AES256-SHA" -class TestTCPClient: +class TestTCPClient(tservers.ServerTestBase): def test_conerr(self): c = tcp.TCPClient(("127.0.0.1", 0)) - with pytest.raises(exceptions.TcpException): + with pytest.raises(exceptions.TcpException, match="Error connecting"): c.connect() + def test_timeout(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.create_connection(timeout=20) as conn: + assert conn.gettimeout() == 20 + + def test_spoof_address(self): + c = tcp.TCPClient(("127.0.0.1", self.port), spoof_source_address=("127.0.0.1", 0)) + with pytest.raises(exceptions.TcpException, match="Failed to spoof"): + c.connect() + + +class TestTCPServer: + + def test_binderr(self): + with pytest.raises(socket.error, match="prohibited"): + tcp.TCPServer(("localhost", 8080)) + + def test_wait_for_silence(self): + s = tcp.TCPServer(("127.0.0.1", 0)) + with s.handler_counter: + with pytest.raises(exceptions.Timeout): + s.wait_for_silence() + class TestFileLike: @@ -811,7 +834,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase): assert not tcp.SSLKeyLogger.create_logfun(False) -class TestSSLInvalidMethod(tservers.ServerTestBase): +class TestSSLInvalid(tservers.ServerTestBase): handler = EchoHandler ssl = True @@ -821,3 +844,13 @@ class TestSSLInvalidMethod(tservers.ServerTestBase): with c.connect(): with pytest.raises(exceptions.TlsException): c.convert_to_ssl(method=fake_ssl_method) + + def test_alpn_error(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.connect(): + if tcp.HAS_ALPN: + with pytest.raises(exceptions.TlsException, match="must be a function"): + c.create_ssl_context(alpn_select_callback="foo") + + with pytest.raises(exceptions.TlsException, match="ALPN error"): + c.create_ssl_context(alpn_select="foo", alpn_select_callback="bar") diff --git a/test/mitmproxy/platform/__init__.py b/test/mitmproxy/platform/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/mitmproxy/platform/__init__.py diff --git a/test/mitmproxy/proxy/protocol/test_http2.py b/test/mitmproxy/proxy/protocol/test_http2.py index 1f695cc5..b07257b3 100644 --- a/test/mitmproxy/proxy/protocol/test_http2.py +++ b/test/mitmproxy/proxy/protocol/test_http2.py @@ -11,7 +11,7 @@ from mitmproxy import options from mitmproxy.proxy.config import ProxyConfig import mitmproxy.net -from ....mitmproxy.net import tservers as net_tservers +from ...net import tservers as net_tservers from mitmproxy import exceptions from mitmproxy.net.http import http1, http2 @@ -36,7 +36,11 @@ class _Http2ServerBase(net_tservers.ServerTestBase): class handler(mitmproxy.net.tcp.BaseHandler): def handle(self): - h2_conn = h2.connection.H2Connection(client_side=False, header_encoding=False) + config = h2.config.H2Configuration( + client_side=False, + validate_outbound_headers=False, + validate_inbound_headers=False) + h2_conn = h2.connection.H2Connection(config) preamble = self.rfile.read(24) h2_conn.initiate_connection() @@ -138,7 +142,11 @@ class _Http2TestBase: client.convert_to_ssl(alpn_protos=[b'h2']) - h2_conn = h2.connection.H2Connection(client_side=True, header_encoding=False) + config = h2.config.H2Configuration( + client_side=True, + validate_outbound_headers=False, + validate_inbound_headers=False) + h2_conn = h2.connection.H2Connection(config) h2_conn.initiate_connection() client.wfile.write(h2_conn.data_to_send()) client.wfile.flush() @@ -756,7 +764,7 @@ class TestMaxConcurrentStreams(_Http2Test): @classmethod def setup_class(cls): _Http2TestBase.setup_class() - _Http2ServerBase.setup_class(h2_server_settings={h2.settings.MAX_CONCURRENT_STREAMS: 2}) + _Http2ServerBase.setup_class(h2_server_settings={h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 2}) @classmethod def handle_server_event(cls, event, h2_conn, rfile, wfile): diff --git a/test/mitmproxy/proxy/protocol/test_websocket.py b/test/mitmproxy/proxy/protocol/test_websocket.py index 486e9d64..8dfc4f2b 100644 --- a/test/mitmproxy/proxy/protocol/test_websocket.py +++ b/test/mitmproxy/proxy/protocol/test_websocket.py @@ -11,7 +11,7 @@ from mitmproxy.proxy.config import ProxyConfig from mitmproxy.net import tcp from mitmproxy.net import http -from ....mitmproxy.net import tservers as net_tservers +from ...net import tservers as net_tservers from ... import tservers from mitmproxy.net import websockets diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py index 447b15a5..b4bb46bb 100644 --- a/test/mitmproxy/proxy/test_server.py +++ b/test/mitmproxy/proxy/test_server.py @@ -296,8 +296,8 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin): class TestHTTPAuth(tservers.HTTPProxyTest): def test_auth(self): self.master.addons.add(proxyauth.ProxyAuth()) - self.master.addons.configure_all( - self.master.options, self.master.options.keys() + self.master.addons.trigger( + "configure", self.master.options.keys() ) self.master.options.proxyauth = "test:test" assert self.pathod("202").status_code == 407 diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py index 0e397b8f..ceff9fb9 100644 --- a/test/mitmproxy/script/test_concurrent.py +++ b/test/mitmproxy/script/test_concurrent.py @@ -3,8 +3,6 @@ from mitmproxy.test import tutils from mitmproxy.test import taddons from mitmproxy import controller -from mitmproxy.addons import script - import time from .. import tservers @@ -19,13 +17,11 @@ class Thing: class TestConcurrent(tservers.MasterTest): def test_concurrent(self): with taddons.context() as tctx: - sc = script.Script( + sc = tctx.script( tutils.test_data.path( "mitmproxy/data/addonscripts/concurrent_decorator.py" ) ) - sc.start(tctx.options) - f1, f2 = tflow.tflow(), tflow.tflow() tctx.cycle(sc, f1) tctx.cycle(sc, f2) @@ -37,23 +33,20 @@ class TestConcurrent(tservers.MasterTest): def test_concurrent_err(self): with taddons.context() as tctx: - sc = script.Script( + tctx.script( tutils.test_data.path( "mitmproxy/data/addonscripts/concurrent_decorator_err.py" ) ) - sc.start(tctx.options) assert tctx.master.has_log("decorator not supported") def test_concurrent_class(self): with taddons.context() as tctx: - sc = script.Script( + sc = tctx.script( tutils.test_data.path( "mitmproxy/data/addonscripts/concurrent_decorator_class.py" ) ) - sc.start(tctx.options) - f1, f2 = tflow.tflow(), tflow.tflow() tctx.cycle(sc, f1) tctx.cycle(sc, f2) diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index e7be25b8..678bc1b7 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -1,17 +1,27 @@ import pytest +from mitmproxy import addons from mitmproxy import addonmanager from mitmproxy import exceptions from mitmproxy import options +from mitmproxy import command from mitmproxy import master from mitmproxy import proxy +from mitmproxy.test import taddons +from mitmproxy.test import tflow class TAddon: - def __init__(self, name): + def __init__(self, name, addons=None): self.name = name self.tick = True self.custom_called = False + if addons: + self.addons = addons + + @command.command("test.command") + def testcommand(self) -> str: + return "here" def __repr__(self): return "Addon(%s)" % self.name @@ -23,25 +33,148 @@ class TAddon: self.custom_called = True -def test_simple(): +class THalt: + def event_custom(self): + raise exceptions.AddonHalt + + +class AOption: + def load(self, l): + l.add_option("custom_option", bool, False, "help") + + +def test_command(): + with taddons.context() as tctx: + tctx.master.addons.add(TAddon("test")) + assert tctx.master.commands.call("test.command") == "here" + + +def test_halt(): o = options.Options() m = master.Master(o, proxy.DummyServer(o)) a = addonmanager.AddonManager(m) - with pytest.raises(exceptions.AddonError): - a.invoke_addon(TAddon("one"), "done") + halt = THalt() + end = TAddon("end") + a.add(halt) + a.add(end) + + a.trigger("custom") + assert not end.custom_called + + a.remove(halt) + a.trigger("custom") + assert end.custom_called - a.add(TAddon("one")) - assert a.get("one") - assert not a.get("two") - a.clear() - assert not a.chain +def test_lifecycle(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + a = addonmanager.AddonManager(m) a.add(TAddon("one")) - a.trigger("done") - with pytest.raises(exceptions.AddonError): + + with pytest.raises(exceptions.AddonManagerError): + a.add(TAddon("one")) + with pytest.raises(exceptions.AddonManagerError): + a.remove(TAddon("nonexistent")) + + f = tflow.tflow() + a.handle_lifecycle("request", f) + + a._configure_all(o, o.keys()) + + +def test_defaults(): + assert addons.default_addons() + + +def test_loader(): + with taddons.context() as tctx: + l = addonmanager.Loader(tctx.master) + l.add_option("custom_option", bool, False, "help") + l.add_option("custom_option", bool, False, "help") + + def cmd(a: str) -> str: + return "foo" + + l.add_command("test.command", cmd) + + +def test_simple(): + with taddons.context() as tctx: + a = tctx.master.addons + + assert len(a) == 0 + a.add(TAddon("one")) + assert a.get("one") + assert not a.get("two") + assert len(a) == 1 + a.clear() + assert len(a) == 0 + assert not a.chain + + a.add(TAddon("one")) + a.trigger("done") a.trigger("tick") + tctx.master.has_log("not callable") + + a.remove(a.get("one")) + assert not a.get("one") + + ta = TAddon("one") + a.add(ta) + a.trigger("custom") + assert ta.custom_called + + +def test_load_option(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + a = addonmanager.AddonManager(m) + a.add(AOption()) + assert "custom_option" in m.options._options + + +def test_nesting(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + a = addonmanager.AddonManager(m) + + a.add( + TAddon( + "one", + addons=[ + TAddon("two"), + TAddon("three", addons=[TAddon("four")]) + ] + ) + ) + assert len(a.chain) == 1 + assert a.get("one") + assert a.get("two") + assert a.get("three") + assert a.get("four") - ta = TAddon("one") - a.add(ta) a.trigger("custom") - assert ta.custom_called + assert a.get("one").custom_called + assert a.get("two").custom_called + assert a.get("three").custom_called + assert a.get("four").custom_called + + a.remove(a.get("three")) + assert not a.get("three") + assert not a.get("four") + + +class D: + def __init__(self): + self.w = None + + def log(self, x): + self.w = x + + +def test_streamlog(): + dummy = D() + s = addonmanager.StreamLog(dummy.log) + s.write("foo") + assert dummy.w == "foo" diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py new file mode 100644 index 00000000..958328b2 --- /dev/null +++ b/test/mitmproxy/test_command.py @@ -0,0 +1,165 @@ +import typing +from mitmproxy import command +from mitmproxy import flow +from mitmproxy import exceptions +from mitmproxy.test import tflow +from mitmproxy.test import taddons +import io +import pytest + + +class TAddon: + def cmd1(self, foo: str) -> str: + """cmd1 help""" + return "ret " + foo + + def cmd2(self, foo: str) -> str: + return 99 + + def cmd3(self, foo: int) -> int: + return foo + + def empty(self) -> None: + pass + + def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]: + return list(var) + + +class TestCommand: + def test_varargs(self): + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "varargs", a.varargs) + assert c.signature_help() == "varargs str *str -> [str]" + assert c.call(["one", "two", "three"]) == ["two", "three"] + with pytest.raises(exceptions.CommandError): + c.call(["one", "two", 3]) + + def test_call(self): + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" + + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) + + c = command.Command(cm, "cmd.three", a.cmd3) + assert c.call(["1"]) == 1 + + +def test_simple(): + with taddons.context() as tctx: + c = command.CommandManager(tctx.master) + a = TAddon() + c.add("one.two", a.cmd1) + assert c.commands["one.two"].help == "cmd1 help" + assert(c.call("one.two foo") == "ret foo") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.call("nonexistent") + with pytest.raises(exceptions.CommandError, match="Invalid"): + c.call("") + with pytest.raises(exceptions.CommandError, match="Usage"): + c.call("one.two too many args") + + c.add("empty", a.empty) + c.call("empty") + + fp = io.StringIO() + c.dump(fp) + assert fp.getvalue() + + +def test_typename(): + assert command.typename(str, True) == "str" + assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + + assert command.typename(command.Cuts, False) == "cutspec" + assert command.typename(command.Cuts, True) == "[cuts]" + + assert command.typename(flow.Flow, False) == "flow" + assert command.typename(typing.Sequence[str], False) == "[str]" + + +class DummyConsole: + @command.command("view.resolve") + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + n = int(spec) + return [tflow.tflow(resp=True)] * n + + @command.command("cut") + def cut(self, spec: str) -> command.Cuts: + return [["test"]] + + +def test_parsearg(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + assert command.parsearg(tctx.master.commands, "foo", str) == "foo" + + assert command.parsearg(tctx.master.commands, "1", int) == 1 + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", int) + + assert command.parsearg(tctx.master.commands, "true", bool) is True + assert command.parsearg(tctx.master.commands, "false", bool) is False + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "flobble", bool) + + assert len(command.parsearg( + tctx.master.commands, "2", typing.Sequence[flow.Flow] + )) == 2 + assert command.parsearg(tctx.master.commands, "1", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "2", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "0", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", Exception) + + assert command.parsearg( + tctx.master.commands, "foo", command.Cuts + ) == [["test"]] + + assert command.parsearg( + tctx.master.commands, "foo", typing.Sequence[str] + ) == ["foo"] + assert command.parsearg( + tctx.master.commands, "foo, bar", typing.Sequence[str] + ) == ["foo", "bar"] + + +class TDec: + @command.command("cmd1") + def cmd1(self, foo: str) -> str: + """cmd1 help""" + return "ret " + foo + + @command.command("cmd2") + def cmd2(self, foo: str) -> str: + return 99 + + @command.command("empty") + def empty(self) -> None: + pass + + +def test_decorator(): + with taddons.context() as tctx: + c = command.CommandManager(tctx.master) + a = TDec() + c.collect_commands(a) + assert "cmd1" in c.commands + assert c.call("cmd1 bar") == "ret bar" + assert "empty" in c.commands + assert c.call("empty") is None + + with taddons.context() as tctx: + tctx.master.addons.add(a) + assert tctx.master.commands.call("cmd1 bar") == "ret bar" diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py index 67a6552f..e320885d 100644 --- a/test/mitmproxy/test_connections.py +++ b/test/mitmproxy/test_connections.py @@ -99,7 +99,7 @@ class TestServerConnection: c.alpn_proto_negotiated = b'h2' assert 'address:22' in repr(c) assert 'ALPN' in repr(c) - assert 'TLS: foobar' in repr(c) + assert 'TLSv1.2: foobar' in repr(c) c.sni = None c.tls_established = True diff --git a/test/mitmproxy/test_controller.py b/test/mitmproxy/test_controller.py index ccc8bf35..2e13d298 100644 --- a/test/mitmproxy/test_controller.py +++ b/test/mitmproxy/test_controller.py @@ -176,6 +176,8 @@ class TestDummyReply: reply = controller.DummyReply() reply.ack() reply.take() + with pytest.raises(ControlException): + reply.mark_reset() reply.commit() reply.mark_reset() assert reply.state == "committed" diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py deleted file mode 100644 index 030f2c4e..00000000 --- a/test/mitmproxy/test_examples.py +++ /dev/null @@ -1,204 +0,0 @@ -import json -import shlex -import pytest - -from mitmproxy import options -from mitmproxy import contentviews -from mitmproxy import proxy -from mitmproxy import master -from mitmproxy.addons import script - -from mitmproxy.test import tflow -from mitmproxy.test import tutils -from mitmproxy.net.http import Headers -from mitmproxy.net.http import cookies - -from . import tservers - -example_dir = tutils.test_data.push("../examples") - - -class ScriptError(Exception): - pass - - -class RaiseMaster(master.Master): - def add_log(self, e, level): - if level in ("warn", "error"): - raise ScriptError(e) - - -def tscript(cmd, args=""): - o = options.Options() - cmd = example_dir.path(cmd) + " " + args - m = RaiseMaster(o, proxy.DummyServer()) - sc = script.Script(cmd) - m.addons.add(sc) - return m, sc - - -class TestScripts(tservers.MasterTest): - def test_add_header(self): - m, _ = tscript("simple/add_header.py") - f = tflow.tflow(resp=tutils.tresp()) - m.addons.handle_lifecycle("response", f) - assert f.response.headers["newheader"] == "foo" - - def test_custom_contentviews(self): - m, sc = tscript("simple/custom_contentview.py") - swapcase = contentviews.get("swapcase") - _, fmt = swapcase(b"<html>Test!</html>") - assert any(b'tEST!' in val[0][1] for val in fmt) - - def test_iframe_injector(self): - with pytest.raises(ScriptError): - tscript("simple/modify_body_inject_iframe.py") - - m, sc = tscript("simple/modify_body_inject_iframe.py", "http://example.org/evil_iframe") - f = tflow.tflow(resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>")) - m.addons.handle_lifecycle("response", f) - content = f.response.content - assert b'iframe' in content and b'evil_iframe' in content - - def test_modify_form(self): - m, sc = tscript("simple/modify_form.py") - - form_header = Headers(content_type="application/x-www-form-urlencoded") - f = tflow.tflow(req=tutils.treq(headers=form_header)) - m.addons.handle_lifecycle("request", f) - - assert f.request.urlencoded_form["mitmproxy"] == "rocks" - - f.request.headers["content-type"] = "" - m.addons.handle_lifecycle("request", f) - assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] - - def test_modify_querystring(self): - m, sc = tscript("simple/modify_querystring.py") - f = tflow.tflow(req=tutils.treq(path="/search?q=term")) - - m.addons.handle_lifecycle("request", f) - assert f.request.query["mitmproxy"] == "rocks" - - f.request.path = "/" - m.addons.handle_lifecycle("request", f) - assert f.request.query["mitmproxy"] == "rocks" - - def test_arguments(self): - m, sc = tscript("simple/script_arguments.py", "mitmproxy rocks") - f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy")) - m.addons.handle_lifecycle("response", f) - assert f.response.content == b"I <3 rocks" - - def test_redirect_requests(self): - m, sc = tscript("simple/redirect_requests.py") - f = tflow.tflow(req=tutils.treq(host="example.org")) - m.addons.handle_lifecycle("request", f) - assert f.request.host == "mitmproxy.org" - - def test_send_reply_from_proxy(self): - m, sc = tscript("simple/send_reply_from_proxy.py") - f = tflow.tflow(req=tutils.treq(host="example.com", port=80)) - m.addons.handle_lifecycle("request", f) - assert f.response.content == b"Hello World" - - def test_dns_spoofing(self): - m, sc = tscript("complex/dns_spoofing.py") - original_host = "example.com" - - host_header = Headers(host=original_host) - f = tflow.tflow(req=tutils.treq(headers=host_header, port=80)) - - m.addons.handle_lifecycle("requestheaders", f) - - # Rewrite by reverse proxy mode - f.request.scheme = "https" - f.request.port = 443 - - m.addons.handle_lifecycle("request", f) - - assert f.request.scheme == "http" - assert f.request.port == 80 - - assert f.request.headers["Host"] == original_host - - -class TestHARDump: - - def flow(self, resp_content=b'message'): - times = dict( - timestamp_start=746203272, - timestamp_end=746203272, - ) - - # Create a dummy flow for testing - return tflow.tflow( - req=tutils.treq(method=b'GET', **times), - resp=tutils.tresp(content=resp_content, **times) - ) - - def test_no_file_arg(self): - with pytest.raises(ScriptError): - tscript("complex/har_dump.py") - - def test_simple(self, tmpdir): - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger("response", self.flow()) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert len(har["log"]["entries"]) == 1 - - def test_base64(self, tmpdir): - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger( - "response", self.flow(resp_content=b"foo" + b"\xFF" * 10) - ) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" - - def test_format_cookies(self): - m, sc = tscript("complex/har_dump.py", "-") - format_cookies = sc.ns.format_cookies - - CA = cookies.CookieAttrs - - f = format_cookies([("n", "v", CA([("k", "v")]))])[0] - assert f['name'] == "n" - assert f['value'] == "v" - assert not f['httpOnly'] - assert not f['secure'] - - f = format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0] - assert f['httpOnly'] - assert f['secure'] - - f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] - assert f['expires'] - - def test_binary(self, tmpdir): - - f = self.flow() - f.request.method = "POST" - f.request.headers["content-type"] = "application/x-www-form-urlencoded" - f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f" - f.response.headers["random-junk"] = bytes(range(256)) - f.response.content = bytes(range(256)) - - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger("response", f) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_export.py b/test/mitmproxy/test_export.py deleted file mode 100644 index 457d8836..00000000 --- a/test/mitmproxy/test_export.py +++ /dev/null @@ -1,106 +0,0 @@ -from mitmproxy.test import tflow -import re - -from mitmproxy.net.http import Headers -from mitmproxy import export # heh -from mitmproxy.test import tutils - - -def clean_blanks(s): - return re.sub(r"^(\s+)$", "", s, flags=re.MULTILINE) - - -def python_equals(testdata, text): - """ - Compare two bits of Python code, disregarding non-significant differences - like whitespace on blank lines and trailing space. - """ - d = open(tutils.test_data.path(testdata)).read() - assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip() - - -def req_get(): - return tutils.treq(method=b'GET', content=b'', path=b"/path?a=foo&a=bar&b=baz") - - -def req_post(): - return tutils.treq(method=b'POST', headers=()) - - -def req_patch(): - return tutils.treq(method=b'PATCH', path=b"/path?query=param") - - -class TestExportCurlCommand: - def test_get(self): - flow = tflow.tflow(req=req_get()) - result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'""" - assert export.curl_command(flow) == result - - def test_post(self): - flow = tflow.tflow(req=req_post()) - result = """curl -X POST 'http://address:22/path' --data-binary 'content'""" - assert export.curl_command(flow) == result - - def test_patch(self): - flow = tflow.tflow(req=req_patch()) - result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'""" - assert export.curl_command(flow) == result - - -class TestExportPythonCode: - def test_get(self): - flow = tflow.tflow(req=req_get()) - python_equals("mitmproxy/data/test_flow_export/python_get.py", export.python_code(flow)) - - def test_post(self): - flow = tflow.tflow(req=req_post()) - python_equals("mitmproxy/data/test_flow_export/python_post.py", export.python_code(flow)) - - def test_post_json(self): - p = req_post() - p.content = b'{"name": "example", "email": "example@example.com"}' - p.headers = Headers(content_type="application/json") - flow = tflow.tflow(req=p) - python_equals("mitmproxy/data/test_flow_export/python_post_json.py", export.python_code(flow)) - - def test_patch(self): - flow = tflow.tflow(req=req_patch()) - python_equals("mitmproxy/data/test_flow_export/python_patch.py", export.python_code(flow)) - - -class TestExportLocustCode: - def test_get(self): - flow = tflow.tflow(req=req_get()) - python_equals("mitmproxy/data/test_flow_export/locust_get.py", export.locust_code(flow)) - - def test_post(self): - p = req_post() - p.content = b'content' - p.headers = '' - flow = tflow.tflow(req=p) - python_equals("mitmproxy/data/test_flow_export/locust_post.py", export.locust_code(flow)) - - def test_patch(self): - flow = tflow.tflow(req=req_patch()) - python_equals("mitmproxy/data/test_flow_export/locust_patch.py", export.locust_code(flow)) - - -class TestExportLocustTask: - def test_get(self): - flow = tflow.tflow(req=req_get()) - python_equals("mitmproxy/data/test_flow_export/locust_task_get.py", export.locust_task(flow)) - - def test_post(self): - flow = tflow.tflow(req=req_post()) - python_equals("mitmproxy/data/test_flow_export/locust_task_post.py", export.locust_task(flow)) - - def test_patch(self): - flow = tflow.tflow(req=req_patch()) - python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py", export.locust_task(flow)) - - -class TestURL: - def test_url(self): - flow = tflow.tflow() - assert export.url(flow) == "http://address:22/path" diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index 630fc7e4..19f0e7d9 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -6,7 +6,7 @@ import mitmproxy.io from mitmproxy import flowfilter from mitmproxy import options from mitmproxy.proxy import config -from mitmproxy.contrib import tnetstring +from mitmproxy.io import tnetstring from mitmproxy.exceptions import FlowReadException from mitmproxy import flow from mitmproxy import http @@ -113,10 +113,6 @@ class TestFlowMaster: with pytest.raises(Exception, match="live"): fm.replay_request(f) - def test_create_flow(self): - fm = master.Master(None, DummyServer()) - assert fm.create_request("GET", "http://example.com/") - def test_all(self): s = tservers.TestState() fm = master.Master(None, DummyServer()) diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index df392829..04ec7ded 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -14,6 +14,7 @@ class TO(optmanager.OptManager): self.add_option("one", typing.Optional[int], None, "help") self.add_option("two", typing.Optional[int], 2, "help") self.add_option("bool", bool, False, "help") + self.add_option("required_int", int, 2, "help") class TD(optmanager.OptManager): @@ -37,12 +38,6 @@ class TM(optmanager.OptManager): self.add_option("one", typing.Optional[str], None, "help") -def test_add_option(): - o = TO() - with pytest.raises(ValueError, match="already exists"): - o.add_option("one", typing.Optional[int], None, "help") - - def test_defaults(): o = TD2() defaults = { @@ -72,9 +67,15 @@ def test_defaults(): assert not o.has_changed(k) +def test_required_int(): + o = TO() + with pytest.raises(exceptions.OptionsError): + o.parse_setval("required_int", None) + + def test_options(): o = TO() - assert o.keys() == {"bool", "one", "two"} + assert o.keys() == {"bool", "one", "two", "required_int"} assert o.one is None assert o.two == 2 @@ -140,6 +141,18 @@ class Rec(): def test_subscribe(): o = TO() r = Rec() + + # pytest.raises keeps a reference here that interferes with the cleanup test + # further down. + try: + o.subscribe(r, ["unknown"]) + except exceptions.OptionsError: + pass + else: + raise AssertionError + + assert len(o.changed.receivers) == 0 + o.subscribe(r, ["two"]) o.one = 2 assert not r.called @@ -151,6 +164,21 @@ def test_subscribe(): o.two = 4 assert len(o.changed.receivers) == 0 + class binder: + def __init__(self): + self.o = TO() + self.called = False + self.o.subscribe(self.bound, ["two"]) + + def bound(self, *args, **kwargs): + self.called = True + + t = binder() + t.o.one = 3 + assert not t.called + t.o.two = 3 + assert t.called + def test_rollback(): o = TO() @@ -176,8 +204,12 @@ def test_rollback(): o.errored.connect(errsub) assert o.one is None - o.one = 10 - o.bool = True + with pytest.raises(exceptions.OptionsError): + o.one = 10 + assert o.one is None + with pytest.raises(exceptions.OptionsError): + o.bool = True + assert o.bool is False assert isinstance(recerr[0]["exc"], exceptions.OptionsError) assert o.one is None assert o.bool is False @@ -258,6 +290,20 @@ def test_saving(tmpdir): with pytest.raises(exceptions.OptionsError): optmanager.load_paths(o, dst) + with open(dst, 'wb') as f: + f.write(b"\x01\x02\x03") + with pytest.raises(exceptions.OptionsError): + optmanager.load_paths(o, dst) + with pytest.raises(exceptions.OptionsError): + optmanager.save(o, dst) + + with open(dst, 'wb') as f: + f.write(b"\xff\xff\xff") + with pytest.raises(exceptions.OptionsError): + optmanager.load_paths(o, dst) + with pytest.raises(exceptions.OptionsError): + optmanager.save(o, dst) + def test_merge(): m = TM() @@ -270,14 +316,14 @@ def test_merge(): def test_option(): - o = optmanager._Option("test", int, 1, None, None) + o = optmanager._Option("test", int, 1, "help", None) assert o.current() == 1 with pytest.raises(TypeError): o.set("foo") with pytest.raises(TypeError): - optmanager._Option("test", str, 1, None, None) + optmanager._Option("test", str, 1, "help", None) - o2 = optmanager._Option("test", int, 1, None, None) + o2 = optmanager._Option("test", int, 1, "help", None) assert o2 == o o2.set(5) assert o2 != o @@ -335,6 +381,11 @@ def test_set(): with pytest.raises(exceptions.OptionsError): opts.set("bool=wobble") + opts.set("bool=toggle") + assert opts.bool is False + opts.set("bool=toggle") + assert opts.bool is True + opts.set("int=1") assert opts.int == 1 with pytest.raises(exceptions.OptionsError): diff --git a/test/mitmproxy/test_proxy.py b/test/mitmproxy/test_proxy.py index 7a49c530..e1d0da00 100644 --- a/test/mitmproxy/test_proxy.py +++ b/test/mitmproxy/test_proxy.py @@ -3,7 +3,6 @@ from unittest import mock from OpenSSL import SSL import pytest - from mitmproxy.tools import cmdline from mitmproxy.tools import main from mitmproxy import options diff --git a/test/mitmproxy/test_taddons.py b/test/mitmproxy/test_taddons.py index 42371cfe..5a4c99fc 100644 --- a/test/mitmproxy/test_taddons.py +++ b/test/mitmproxy/test_taddons.py @@ -1,4 +1,6 @@ +import io from mitmproxy.test import taddons +from mitmproxy.test import tutils from mitmproxy import ctx @@ -9,3 +11,21 @@ def test_recordingmaster(): ctx.log.error("foo") assert not tctx.master.has_log("foo", level="debug") assert tctx.master.has_log("foo", level="error") + + +def test_dumplog(): + with taddons.context() as tctx: + ctx.log.info("testing") + s = io.StringIO() + tctx.master.dump_log(s) + assert s.getvalue() + + +def test_load_script(): + with taddons.context() as tctx: + s = tctx.script( + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ) + assert s diff --git a/test/mitmproxy/test_websocket.py b/test/mitmproxy/test_websocket.py index 62f69e2d..7c53a4b0 100644 --- a/test/mitmproxy/test_websocket.py +++ b/test/mitmproxy/test_websocket.py @@ -1,7 +1,7 @@ import io import pytest -from mitmproxy.contrib import tnetstring +from mitmproxy.io import tnetstring from mitmproxy import flowfilter from mitmproxy.test import tflow diff --git a/test/mitmproxy/tools/console/test_help.py b/test/mitmproxy/tools/console/test_help.py index ac3011e6..0ebc2d6a 100644 --- a/test/mitmproxy/tools/console/test_help.py +++ b/test/mitmproxy/tools/console/test_help.py @@ -9,9 +9,3 @@ class TestHelp: def test_helptext(self): h = help.HelpView(None) assert h.helptext() - - def test_keypress(self): - h = help.HelpView([1, 2, 3]) - assert not h.keypress((0, 0), "q") - assert not h.keypress((0, 0), "?") - assert h.keypress((0, 0), "o") == "o" diff --git a/test/mitmproxy/tools/console/test_keymap.py b/test/mitmproxy/tools/console/test_keymap.py new file mode 100644 index 00000000..6a75800e --- /dev/null +++ b/test/mitmproxy/tools/console/test_keymap.py @@ -0,0 +1,29 @@ +from mitmproxy.tools.console import keymap +from mitmproxy.test import taddons +from unittest import mock +import pytest + + +def test_bind(): + with taddons.context() as tctx: + km = keymap.Keymap(tctx.master) + km.executor = mock.Mock() + + with pytest.raises(ValueError): + km.add("foo", "bar", ["unsupported"]) + + km.add("key", "str", ["options", "commands"]) + assert km.get("options", "key") + assert km.get("commands", "key") + assert not km.get("flowlist", "key") + + km.handle("unknown", "unknown") + assert not km.executor.called + + km.handle("options", "key") + assert km.executor.called + + km.add("glob", "str", ["global"]) + km.executor = mock.Mock() + km.handle("options", "glob") + assert km.executor.called diff --git a/test/mitmproxy/tools/console/test_master.py b/test/mitmproxy/tools/console/test_master.py index 44b9ff3f..c87c9e83 100644 --- a/test/mitmproxy/tools/console/test_master.py +++ b/test/mitmproxy/tools/console/test_master.py @@ -30,7 +30,7 @@ class TestMaster(tservers.MasterTest): opts["verbosity"] = 1 o = options.Options(**opts) m = console.master.ConsoleMaster(o, proxy.DummyServer()) - m.addons.configure_all(o, o.keys()) + m.addons.trigger("configure", o.keys()) return m def test_basic(self): @@ -42,12 +42,6 @@ class TestMaster(tservers.MasterTest): pass assert len(m.view) == i - def test_run_script_once(self): - m = self.mkmaster() - f = tflow.tflow(resp=True) - m.run_script_once("nonexistent", [f]) - assert any("Input error" in str(l) for l in m.logbuffer) - def test_intercept(self): """regression test for https://github.com/mitmproxy/mitmproxy/issues/1605""" m = self.mkmaster(intercept="~b bar") diff --git a/test/mitmproxy/tools/console/test_pathedit.py b/test/mitmproxy/tools/console/test_pathedit.py index bd064e5f..b9f51f5a 100644 --- a/test/mitmproxy/tools/console/test_pathedit.py +++ b/test/mitmproxy/tools/console/test_pathedit.py @@ -1,10 +1,10 @@ import os from os.path import normpath +from unittest import mock + from mitmproxy.tools.console import pathedit from mitmproxy.test import tutils -from unittest.mock import patch - class TestPathCompleter: @@ -56,8 +56,8 @@ class TestPathEdit: pe = pathedit.PathEdit("", "") - with patch('urwid.widget.Edit.get_edit_text') as get_text, \ - patch('urwid.widget.Edit.set_edit_text') as set_text: + with mock.patch('urwid.widget.Edit.get_edit_text') as get_text, \ + mock.patch('urwid.widget.Edit.set_edit_text') as set_text: cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) get_text.return_value = os.path.join(cd, "a") diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 8e2fa5b2..69a76d2e 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -12,7 +12,7 @@ from .. import tservers class TestDumpMaster(tservers.MasterTest): def mkmaster(self, flt, **opts): - o = options.Options(filtstr=flt, verbosity=-1, flow_detail=0, **opts) + o = options.Options(view_filter=flt, verbosity=-1, flow_detail=0, **opts) m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False) return m diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py index e3d5dc44..5427b995 100644 --- a/test/mitmproxy/tools/web/test_app.py +++ b/test/mitmproxy/tools/web/test_app.py @@ -1,5 +1,6 @@ import json as _json from unittest import mock +import os import tornado.testing from tornado import httpclient @@ -23,8 +24,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False) f = tflow.tflow(resp=True) f.id = "42" - m.view.add(f) - m.view.add(tflow.tflow(err=True)) + m.view.add([f]) + m.view.add([tflow.tflow(err=True)]) m.add_log("test log", "info") self.master = m self.view = m.view @@ -78,7 +79,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): # restore for f in flows: - self.view.add(f) + self.view.add([f]) self.events.data = events def test_resume(self): @@ -110,7 +111,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): assert self.fetch("/flows/42", method="DELETE").code == 200 assert not self.view.get_by_id("42") - self.view.add(f) + self.view.add([f]) assert self.fetch("/flows/1234", method="DELETE").code == 404 @@ -162,7 +163,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): f = self.view.get_by_id(resp.body.decode()) assert f assert f.id != "42" - self.view.remove(f) + self.view.remove([f]) def test_flow_revert(self): f = self.view.get_by_id("42") @@ -275,3 +276,20 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): # trigger on_close by opening a second connection. ws_client2 = yield websocket.websocket_connect(ws_url) ws_client2.close() + + def test_generate_tflow_js(self): + _tflow = app.flow_to_json(tflow.tflow(resp=True, err=True)) + # Set some value as constant, so that _tflow.js would not change every time. + _tflow['client_conn']['id'] = "4a18d1a0-50a1-48dd-9aa6-d45d74282939" + _tflow['id'] = "d91165be-ca1f-4612-88a9-c0f8696f3e29" + _tflow['error']['timestamp'] = 1495370312.4814785 + _tflow['response']['timestamp_end'] = 1495370312.4814625 + _tflow['response']['timestamp_start'] = 1495370312.481462 + _tflow['server_conn']['id'] = "f087e7b2-6d0a-41a8-a8f0-e1a4761395f8" + tflow_json = _json.dumps(_tflow, indent=4, sort_keys=True) + here = os.path.abspath(os.path.dirname(__file__)) + web_root = os.path.join(here, os.pardir, os.pardir, os.pardir, os.pardir, 'web') + tflow_path = os.path.join(web_root, 'src/js/__tests__/ducks/_tflow.js') + content = """export default function(){{\n return {tflow_json}\n}}""".format(tflow_json=tflow_json) + with open(tflow_path, 'w') as f: + f.write(content) diff --git a/test/mitmproxy/tservers.py b/test/mitmproxy/tservers.py index b737b82a..b8005529 100644 --- a/test/mitmproxy/tservers.py +++ b/test/mitmproxy/tservers.py @@ -74,7 +74,7 @@ class TestMaster(taddons.RecordingMaster): self.state = TestState() self.addons.add(self.state) self.addons.add(*addons) - self.addons.configure_all(self.options, self.options.keys()) + self.addons.trigger("configure", self.options.keys()) self.addons.trigger("running") def reset(self, addons): diff --git a/test/mitmproxy/utils/test_human.py b/test/mitmproxy/utils/test_human.py index 3d65dfd1..76dc2f88 100644 --- a/test/mitmproxy/utils/test_human.py +++ b/test/mitmproxy/utils/test_human.py @@ -46,3 +46,10 @@ def test_pretty_duration(): assert human.pretty_duration(10000) == "10000s" assert human.pretty_duration(1.123) == "1.12s" assert human.pretty_duration(0.123) == "123ms" + + +def test_format_address(): + assert human.format_address(("::1", "54010", "0", "0")) == "[::1]:54010" + assert human.format_address(("::ffff:127.0.0.1", "54010", "0", "0")) == "127.0.0.1:54010" + assert human.format_address(("127.0.0.1", "54010")) == "127.0.0.1:54010" + assert human.format_address(("example.com", "54010")) == "example.com:54010" diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index d99a914f..fe33070e 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,6 +4,7 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck +from mitmproxy import command class TBase: @@ -16,66 +17,97 @@ class T(TBase): super(T, self).__init__(42) -def test_check_type(): - typecheck.check_type("foo", 42, int) +def test_check_option_type(): + typecheck.check_option_type("foo", 42, int) with pytest.raises(TypeError): - typecheck.check_type("foo", 42, str) + typecheck.check_option_type("foo", 42, str) with pytest.raises(TypeError): - typecheck.check_type("foo", None, str) + typecheck.check_option_type("foo", None, str) with pytest.raises(TypeError): - typecheck.check_type("foo", b"foo", str) + typecheck.check_option_type("foo", b"foo", str) def test_check_union(): - typecheck.check_type("foo", 42, typing.Union[int, str]) - typecheck.check_type("foo", "42", typing.Union[int, str]) + typecheck.check_option_type("foo", 42, typing.Union[int, str]) + typecheck.check_option_type("foo", "42", typing.Union[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", [], typing.Union[int, str]) + typecheck.check_option_type("foo", [], typing.Union[int, str]) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - typecheck.check_type("foo", 42, m) + typecheck.check_option_type("foo", 42, m) def test_check_tuple(): - typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", None, typing.Tuple[int, str]) + typecheck.check_option_type("foo", None, typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (42, 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str]) # Python 3.5 only defines __tuple_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Tuple" m.__tuple_params__ = (int, str) - typecheck.check_type("foo", (42, "42"), m) + typecheck.check_option_type("foo", (42, "42"), m) def test_check_sequence(): - typecheck.check_type("foo", [10], typing.Sequence[int]) + typecheck.check_option_type("foo", [10], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", ["foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", ["foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [10, "foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [b"foo"], typing.Sequence[str]) + typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.Sequence[str]) + typecheck.check_option_type("foo", "foo", typing.Sequence[str]) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_type("foo", [10], m) + typecheck.check_option_type("foo", [10], m) def test_check_io(): - typecheck.check_type("foo", io.StringIO(), typing.IO[str]) + typecheck.check_option_type("foo", io.StringIO(), typing.IO[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.IO[str]) + typecheck.check_option_type("foo", "foo", typing.IO[str]) + + +def test_check_any(): + typecheck.check_option_type("foo", 42, typing.Any) + typecheck.check_option_type("foo", object(), typing.Any) + typecheck.check_option_type("foo", None, typing.Any) + + +def test_check_command_type(): + assert(typecheck.check_command_type("foo", str)) + assert(typecheck.check_command_type(["foo"], typing.Sequence[str])) + assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str])) + assert(typecheck.check_command_type(None, None)) + assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_type("foo", typing.Sequence[int])) + assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) + assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) + assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) + + # Python 3.5 only defines __parameters__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Sequence" + m.__parameters__ = (int,) + + typecheck.check_command_type([10], m) + + # Python 3.5 only defines __union_params__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Union" + m.__union_params__ = (int,) + assert not typecheck.check_command_type([22], m) |