diff options
Diffstat (limited to 'test')
24 files changed, 605 insertions, 437 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index f71662f0..7ffda317 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -26,7 +26,7 @@ class TestClientPlayback: with taddons.context() as tctx: assert cp.count() == 0 f = tflow.tflow(resp=True) - cp.load([f]) + cp.start_replay([f]) assert cp.count() == 1 RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread" with mock.patch(RP) as rp: @@ -44,13 +44,30 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None + cp.start_replay([f]) + cp.stop_replay() + assert not cp.flows + + def test_load_file(self, tmpdir): + cp = clientplayback.ClientPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + cp.load_file(fpath) + assert cp.flows + with pytest.raises(exceptions.CommandError): + cp.load_file("/nonexistent") + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: path = str(tmpdir.join("flows")) tdump(path, [tflow.tflow()]) tctx.configure(cp, client_replay=[path]) + cp.configured = False tctx.configure(cp, client_replay=[]) + cp.configured = False tctx.configure(cp) + cp.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py index 6ebf4ba9..64d0fa19 100644 --- a/test/mitmproxy/addons/test_core.py +++ b/test/mitmproxy/addons/test_core.py @@ -1,5 +1,6 @@ from mitmproxy.addons import core from mitmproxy.test import taddons +from mitmproxy.test import tflow from mitmproxy import exceptions import pytest @@ -15,3 +16,48 @@ def test_set(): with pytest.raises(exceptions.CommandError): tctx.command(sa.set, "nonexistent") + + +def test_resume(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not sa.resume([f]) + f.intercept() + sa.resume([f]) + assert not f.reply.state == "taken" + + +def test_mark(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not f.marked + sa.mark([f], True) + assert f.marked + + sa.mark_toggle([f]) + assert not f.marked + sa.mark_toggle([f]) + assert f.marked + + +def test_kill(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.intercept() + assert f.killable + sa.kill([f]) + assert not f.killable + + +def test_revert(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.backup() + f.request.content = b"bar" + assert f.modified() + sa.revert([f]) + assert not f.modified() diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py new file mode 100644 index 00000000..e028331f --- /dev/null +++ b/test/mitmproxy/addons/test_cut.py @@ -0,0 +1,178 @@ + +from mitmproxy.addons import cut +from mitmproxy.addons import view +from mitmproxy import exceptions +from mitmproxy import certs +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy.test import tutils +import pytest +from unittest import mock + + +def test_extract(): + tf = tflow.tflow(resp=True) + tests = [ + ["q.method", "GET"], + ["q.scheme", "http"], + ["q.host", "address"], + ["q.port", "22"], + ["q.path", "/path"], + ["q.url", "http://address:22/path"], + ["q.text", "content"], + ["q.content", b"content"], + ["q.raw_content", b"content"], + ["q.header[header]", "qvalue"], + + ["s.status_code", "200"], + ["s.reason", "OK"], + ["s.text", "message"], + ["s.content", b"message"], + ["s.raw_content", b"message"], + ["s.header[header-response]", "svalue"], + + ["cc.address.port", "22"], + ["cc.address.host", "address"], + ["cc.tls_version", "TLSv1.2"], + ["cc.sni", "address"], + ["cc.ssl_established", "false"], + + ["sc.address.port", "22"], + ["sc.address.host", "address"], + ["sc.ip_address.host", "192.168.0.1"], + ["sc.tls_version", "TLSv1.2"], + ["sc.sni", "address"], + ["sc.ssl_established", "false"], + ] + for t in tests: + ret = cut.extract(t[0], tf) + if ret != t[1]: + raise AssertionError("%s: Expected %s, got %s" % (t[0], t[1], ret)) + + with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: + d = f.read() + c1 = certs.SSLCert.from_pem(d) + tf.server_conn.cert = c1 + assert "CERTIFICATE" in cut.extract("sc.cert", tf) + + +def test_parse_cutspec(): + tests = [ + ("", None, True), + ("req.method", ("@all", ["req.method"]), False), + ( + "req.method,req.host", + ("@all", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo", + ("~b foo", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ( + "req.method, req.host | ~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ] + for cutspec, output, err in tests: + try: + assert cut.parse_cutspec(cutspec) == output + except exceptions.CommandError: + if not err: + raise + else: + if err: + raise AssertionError("Expected error.") + + +def test_headername(): + with pytest.raises(exceptions.CommandError): + cut.headername("header[foo.") + + +def qr(f): + with open(f, "rb") as fp: + return fp.read() + + +def test_cut_clip(): + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + v.add([tflow.tflow(resp=True)]) + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.method|@all") + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.content|@all") + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + tctx.command(c.clip, "q.method,q.content|@all") + assert pc.called + + +def test_cut_file(tmpdir): + f = str(tmpdir.join("path")) + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + + v.add([tflow.tflow(resp=True)]) + + tctx.command(c.save, "q.method|@all", f) + assert qr(f) == b"GET" + tctx.command(c.save, "q.content|@all", f) + assert qr(f) == b"content" + tctx.command(c.save, "q.content|@all", "+" + f) + assert qr(f) == b"content\ncontent" + + v.add([tflow.tflow(resp=True)]) + tctx.command(c.save, "q.method|@all", f) + assert qr(f).splitlines() == [b"GET", b"GET"] + tctx.command(c.save, "q.method,q.content|@all", f) + assert qr(f).splitlines() == [b"GET,content", b"GET,content"] + + +def test_cut(): + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + v.add([tflow.tflow(resp=True)]) + tctx.master.addons.add(v, c) + assert c.cut("q.method|@all") == [["GET"]] + assert c.cut("q.scheme|@all") == [["http"]] + assert c.cut("q.host|@all") == [["address"]] + assert c.cut("q.port|@all") == [["22"]] + assert c.cut("q.path|@all") == [["/path"]] + assert c.cut("q.url|@all") == [["http://address:22/path"]] + assert c.cut("q.content|@all") == [[b"content"]] + assert c.cut("q.header[header]|@all") == [["qvalue"]] + assert c.cut("q.header[unknown]|@all") == [[""]] + + assert c.cut("s.status_code|@all") == [["200"]] + assert c.cut("s.reason|@all") == [["OK"]] + assert c.cut("s.content|@all") == [[b"message"]] + assert c.cut("s.header[header-response]|@all") == [["svalue"]] + assert c.cut("moo") == [[""]] + with pytest.raises(exceptions.CommandError): + assert c.cut("__dict__") == [[""]] + + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + v.add([tflow.ttcpflow()]) + assert c.cut("q.method|@all") == [[""]] + assert c.cut("s.status|@all") == [[""]] diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py new file mode 100644 index 00000000..5c7c4976 --- /dev/null +++ b/test/mitmproxy/addons/test_export.py @@ -0,0 +1,109 @@ +import pytest +import os + +from mitmproxy import exceptions +from mitmproxy.addons import export # heh +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.test import taddons +from unittest import mock + + +@pytest.fixture +def get_request(): + return tflow.tflow( + req=tutils.treq( + method=b'GET', + content=b'', + path=b"/path?a=foo&a=bar&b=baz" + ) + ) + + +@pytest.fixture +def post_request(): + return tflow.tflow( + req=tutils.treq( + method=b'POST', + headers=(), + content=bytes(range(256)) + ) + ) + + +@pytest.fixture +def patch_request(): + return tflow.tflow( + req=tutils.treq(method=b'PATCH', path=b"/path?query=param") + ) + + +@pytest.fixture +def tcp_flow(): + return tflow.ttcpflow() + + +class TestExportCurlCommand: + def test_get(self, get_request): + result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'""" + assert export.curl_command(get_request) == result + + def test_post(self, post_request): + result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format( + str(bytes(range(256)))[2:-1] + ) + assert export.curl_command(post_request) == result + + def test_patch(self, patch_request): + result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'""" + assert export.curl_command(patch_request) == result + + def test_tcp(self, tcp_flow): + with pytest.raises(exceptions.CommandError): + export.curl_command(tcp_flow) + + +class TestRaw: + def test_get(self, get_request): + assert b"header: qvalue" in export.raw(get_request) + + def test_tcp(self, tcp_flow): + with pytest.raises(exceptions.CommandError): + export.raw(tcp_flow) + + +def qr(f): + with open(f, "rb") as fp: + return fp.read() + + +def test_export(tmpdir): + f = str(tmpdir.join("path")) + e = export.Export() + with taddons.context(): + assert e.formats() == ["curl", "raw"] + with pytest.raises(exceptions.CommandError): + e.file("nonexistent", tflow.tflow(resp=True), f) + + e.file("raw", tflow.tflow(resp=True), f) + assert qr(f) + os.unlink(f) + + e.file("curl", tflow.tflow(resp=True), f) + assert qr(f) + os.unlink(f) + + +def test_clip(tmpdir): + e = export.Export() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + e.clip("nonexistent", tflow.tflow(resp=True)) + + with mock.patch('pyperclip.copy') as pc: + e.clip("raw", tflow.tflow(resp=True)) + assert pc.called + + with mock.patch('pyperclip.copy') as pc: + e.clip("curl", tflow.tflow(resp=True)) + assert pc.called diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 859d99f9..a3df1fcf 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -9,9 +9,6 @@ from mitmproxy.test import tutils from mitmproxy.test import taddons from mitmproxy import addonmanager from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy import proxy -from mitmproxy import master from mitmproxy.addons import script @@ -48,9 +45,9 @@ def test_script_print_stdout(): class TestScript: def test_notfound(self): - with taddons.context() as tctx: - sc = script.Script("nonexistent") - tctx.master.addons.add(sc) + with taddons.context(): + with pytest.raises(exceptions.OptionsError): + script.Script("nonexistent") def test_simple(self): with taddons.context() as tctx: @@ -136,25 +133,45 @@ class TestCutTraceback: class TestScriptLoader: - def test_simple(self): - o = options.Options(scripts=[]) - m = master.Master(o, proxy.DummyServer()) + def test_script_run(self): + rp = tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) sc = script.ScriptLoader() - sc.running() - m.addons.add(sc) - assert len(m.addons) == 1 - o.update( - scripts = [ - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder/recorder.py" - ) + with taddons.context() as tctx: + sc.script_run([tflow.tflow(resp=True)], rp) + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] + assert debug == [ + 'recorder load', 'recorder running', 'recorder configure', + 'recorder tick', + 'recorder requestheaders', 'recorder request', + 'recorder responseheaders', 'recorder response' ] - ) - assert len(m.addons) == 1 - assert len(sc.addons) == 1 - o.update(scripts = []) - assert len(m.addons) == 1 - assert len(sc.addons) == 0 + + def test_script_run_nonexistent(self): + sc = script.ScriptLoader() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + sc.script_run([tflow.tflow(resp=True)], "/nonexistent") + + def test_simple(self): + sc = script.ScriptLoader() + with taddons.context() as tctx: + tctx.master.addons.add(sc) + sc.running() + assert len(tctx.master.addons) == 1 + tctx.master.options.update( + scripts = [ + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ] + ) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 1 + tctx.master.options.update(scripts = []) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 0 def test_dupes(self): sc = script.ScriptLoader() @@ -166,13 +183,6 @@ class TestScriptLoader: scripts = ["one", "one"] ) - def test_nonexistent(self): - sc = script.ScriptLoader() - with taddons.context() as tctx: - tctx.master.addons.add(sc) - tctx.configure(sc, scripts = ["nonexistent"]) - tctx.master.has_log("nonexistent: file not found") - def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") sc = script.ScriptLoader() diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index 29de48a0..3ceab3fa 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -16,12 +16,24 @@ def tdump(path, flows): w.add(i) +def test_load_file(tmpdir): + s = serverplayback.ServerPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + s.load_file(fpath) + assert s.flowmap + with pytest.raises(exceptions.CommandError): + s.load_file("/nonexistent") + + def test_config(tmpdir): s = serverplayback.ServerPlayback() with taddons.context() as tctx: fpath = str(tmpdir.join("flows")) tdump(fpath, [tflow.tflow(resp=True)]) tctx.configure(s, server_replay=[fpath]) + s.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(s, server_replay=[str(tmpdir)]) diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 05d4af30..1724da49 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -4,8 +4,8 @@ from mitmproxy.test import tflow from mitmproxy.addons import view from mitmproxy import flowfilter -from mitmproxy import options from mitmproxy import exceptions +from mitmproxy import io from mitmproxy.test import taddons @@ -26,12 +26,12 @@ def test_order_refresh(): v.sig_view_refresh.connect(save) tf = tflow.tflow(resp=True) - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, console_order="time") - v.add(tf) + v.add([tf]) tf.request.timestamp_start = 1 assert not sargs - v.update(tf) + v.update([tf]) assert sargs @@ -131,15 +131,50 @@ def test_filter(): assert len(v) == 4 -def test_load(): +def tdump(path, flows): + w = io.FlowWriter(open(path, "wb")) + for i in flows: + w.add(i) + + +def test_create(): + v = view.View() + with taddons.context(): + v.create("get", "http://foo.com") + assert len(v) == 1 + assert v[0].request.url == "http://foo.com/" + v.create("get", "http://foo.com") + assert len(v) == 2 + + +def test_orders(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context(): + assert v.order_options() + + +def test_load(tmpdir): + path = str(tmpdir.join("path")) + v = view.View() + with taddons.context() as tctx: tctx.master.addons.add(v) + tdump( + path, + [ + tflow.tflow(resp=True), + tflow.tflow(resp=True) + ] + ) + v.load_file(path) + assert len(v) == 2 + v.load_file(path) + assert len(v) == 4 def test_resolve(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: + assert tctx.command(v.resolve, "@all") == [] assert tctx.command(v.resolve, "@focus") == [] assert tctx.command(v.resolve, "@shown") == [] assert tctx.command(v.resolve, "@hidden") == [] @@ -149,6 +184,7 @@ def test_resolve(): v.request(tft(method="get")) assert len(tctx.command(v.resolve, "~m get")) == 1 assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@all")) == 1 assert len(tctx.command(v.resolve, "@shown")) == 1 assert len(tctx.command(v.resolve, "@unmarked")) == 1 assert tctx.command(v.resolve, "@hidden") == [] @@ -156,6 +192,7 @@ def test_resolve(): v.request(tft(method="put")) assert len(tctx.command(v.resolve, "@focus")) == 1 assert len(tctx.command(v.resolve, "@shown")) == 2 + assert len(tctx.command(v.resolve, "@all")) == 2 assert tctx.command(v.resolve, "@hidden") == [] assert tctx.command(v.resolve, "@marked") == [] @@ -175,14 +212,52 @@ def test_resolve(): assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] assert m(tctx.command(v.resolve, "@marked")) == ["GET"] assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] + assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"] with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): tctx.command(v.resolve, "~") +def test_go(): + v = view.View() + with taddons.context(): + v.add([ + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + ]) + assert v.focus.index == 0 + v.go(-1) + assert v.focus.index == 4 + v.go(0) + assert v.focus.index == 0 + v.go(1) + assert v.focus.index == 1 + v.go(999) + assert v.focus.index == 4 + v.go(-999) + assert v.focus.index == 0 + + +def test_duplicate(): + v = view.View() + with taddons.context(): + f = [ + tflow.tflow(), + tflow.tflow(), + ] + v.add(f) + assert len(v) == 2 + v.duplicate(f) + assert len(v) == 4 + assert v.focus.index == 2 + + def test_order(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: v.request(tft(method="get", start=1)) v.request(tft(method="put", start=2)) v.request(tft(method="get", start=3)) @@ -230,14 +305,14 @@ def test_update(): assert f in v f.request.method = "put" - v.update(f) + v.update([f]) assert f not in v f.request.method = "get" - v.update(f) + v.update([f]) assert f in v - v.update(f) + v.update([f]) assert f in v @@ -276,7 +351,7 @@ def test_signals(): assert not any([rec_add, rec_update, rec_remove, rec_refresh]) # Simple add - v.add(tft()) + v.add([tft()]) assert rec_add assert not any([rec_update, rec_remove, rec_refresh]) @@ -291,14 +366,14 @@ def test_signals(): # An update that results in a flow being added to the view clearrec() v[0].request.method = "PUT" - v.update(v[0]) + v.update([v[0]]) assert rec_remove assert not any([rec_update, rec_refresh, rec_add]) # An update that does not affect the view just sends update v.set_filter(flowfilter.parse("~m put")) clearrec() - v.update(v[0]) + v.update([v[0]]) assert rec_update assert not any([rec_remove, rec_refresh, rec_add]) @@ -307,33 +382,33 @@ def test_signals(): v.set_filter(flowfilter.parse("~m get")) assert not len(v) clearrec() - v.update(f) + v.update([f]) assert not any([rec_add, rec_update, rec_remove, rec_refresh]) def test_focus_follow(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, console_focus_follow=True, view_filter="~m get") - v.add(tft(start=5)) + v.add([tft(start=5)]) assert v.focus.index == 0 - v.add(tft(start=4)) + v.add([tft(start=4)]) assert v.focus.index == 0 assert v.focus.flow.request.timestamp_start == 4 - v.add(tft(start=7)) + v.add([tft(start=7)]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod = tft(method="put", start=6) - v.add(mod) + v.add([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod.request.method = "GET" - v.update(mod) + v.update([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 6 @@ -341,7 +416,7 @@ def test_focus_follow(): def test_focus(): # Special case - initialising with a view that already contains data v = view.View() - v.add(tft()) + v.add([tft()]) f = view.Focus(v) assert f.index is 0 assert f.flow is v[0] @@ -352,7 +427,7 @@ def test_focus(): assert f.index is None assert f.flow is None - v.add(tft(start=1)) + v.add([tft(start=1)]) assert f.index == 0 assert f.flow is v[0] @@ -362,11 +437,11 @@ def test_focus(): with pytest.raises(ValueError): f.__setattr__("index", 99) - v.add(tft(start=0)) + v.add([tft(start=0)]) assert f.index == 1 assert f.flow is v[1] - v.add(tft(start=2)) + v.add([tft(start=2)]) assert f.index == 1 assert f.flow is v[1] @@ -374,22 +449,25 @@ def test_focus(): assert f.index == 0 f.index = 1 - v.remove(v[1]) + v.remove([v[1]]) + v[1].intercept() assert f.index == 1 assert f.flow is v[1] - v.remove(v[1]) + v.remove([v[1]]) assert f.index == 0 assert f.flow is v[0] - v.remove(v[0]) + v.remove([v[0]]) assert f.index is None assert f.flow is None - v.add(tft(method="get", start=0)) - v.add(tft(method="get", start=1)) - v.add(tft(method="put", start=2)) - v.add(tft(method="get", start=3)) + v.add([ + tft(method="get", start=0), + tft(method="get", start=1), + tft(method="put", start=2), + tft(method="get", start=3), + ]) f.flow = v[2] assert f.flow.request.method == "PUT" @@ -409,16 +487,16 @@ def test_settings(): with pytest.raises(KeyError): v.settings[f] - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings[f]["foo"] == "bar" assert len(list(v.settings)) == 1 - v.remove(f) + v.remove([f]) with pytest.raises(KeyError): v.settings[f] assert not v.settings.keys() - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings.keys() v.clear() @@ -427,7 +505,7 @@ def test_settings(): def test_configure(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, view_filter="~q") with pytest.raises(Exception, match="Invalid interception filter"): tctx.configure(v, view_filter="~~") diff --git a/test/mitmproxy/console/test_flowlist.py b/test/mitmproxy/console/test_flowlist.py index d63dab1c..6d82749d 100644 --- a/test/mitmproxy/console/test_flowlist.py +++ b/test/mitmproxy/console/test_flowlist.py @@ -1,4 +1,3 @@ -from unittest import mock import urwid import mitmproxy.tools.console.flowlist as flowlist @@ -14,13 +13,6 @@ class TestFlowlist: o = options.Options(**opts) return console.master.ConsoleMaster(o, proxy.DummyServer()) - def test_new_request(self): - m = self.mkmaster() - x = flowlist.FlowListBox(m) - with mock.patch('mitmproxy.tools.console.signals.status_message.send') as mock_thing: - x.new_request("nonexistent url", "GET") - mock_thing.assert_called_once_with(message="Invalid URL: No hostname given") - def test_logbuffer_set_focus(self): m = self.mkmaster() b = flowlist.LogBufferBox(m) diff --git a/test/mitmproxy/data/test_flow_export/locust_get.py b/test/mitmproxy/data/test_flow_export/locust_get.py deleted file mode 100644 index 632d5d53..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_get.py +++ /dev/null @@ -1,35 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'a': ['foo', 'bar'], - 'b': 'baz', - } - - self.response = self.client.request( - method='GET', - url=url, - headers=headers, - params=params, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_patch.py b/test/mitmproxy/data/test_flow_export/locust_patch.py deleted file mode 100644 index f64e0857..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_patch.py +++ /dev/null @@ -1,37 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'query': 'param', - } - - data = '''content''' - - self.response = self.client.request( - method='PATCH', - url=url, - headers=headers, - params=params, - data=data, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_post.py b/test/mitmproxy/data/test_flow_export/locust_post.py deleted file mode 100644 index df23476a..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_post.py +++ /dev/null @@ -1,26 +0,0 @@ -from locust import HttpLocust, TaskSet, task - -class UserBehavior(TaskSet): - def on_start(self): - ''' on_start is called when a Locust start before any task is scheduled ''' - self.path() - - @task() - def path(self): - url = self.locust.host + '/path' - - data = '''content''' - - self.response = self.client.request( - method='POST', - url=url, - data=data, - ) - - ### Additional tasks can go here ### - - -class WebsiteUser(HttpLocust): - task_set = UserBehavior - min_wait = 1000 - max_wait = 3000 diff --git a/test/mitmproxy/data/test_flow_export/locust_task_get.py b/test/mitmproxy/data/test_flow_export/locust_task_get.py deleted file mode 100644 index 03821cd8..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_get.py +++ /dev/null @@ -1,20 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'a': ['foo', 'bar'], - 'b': 'baz', - } - - self.response = self.client.request( - method='GET', - url=url, - headers=headers, - params=params, - ) diff --git a/test/mitmproxy/data/test_flow_export/locust_task_patch.py b/test/mitmproxy/data/test_flow_export/locust_task_patch.py deleted file mode 100644 index d425209c..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_patch.py +++ /dev/null @@ -1,22 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - headers = { - 'header': 'qvalue', - 'content-length': '7', - } - - params = { - 'query': 'param', - } - - data = '''content''' - - self.response = self.client.request( - method='PATCH', - url=url, - headers=headers, - params=params, - data=data, - ) diff --git a/test/mitmproxy/data/test_flow_export/locust_task_post.py b/test/mitmproxy/data/test_flow_export/locust_task_post.py deleted file mode 100644 index a5f307ee..00000000 --- a/test/mitmproxy/data/test_flow_export/locust_task_post.py +++ /dev/null @@ -1,11 +0,0 @@ - @task() - def path(self): - url = self.locust.host + '/path' - - data = '''\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff''' - - self.response = self.client.request( - method='POST', - url=url, - data=data, - ) diff --git a/test/mitmproxy/data/test_flow_export/python_get.py b/test/mitmproxy/data/test_flow_export/python_get.py deleted file mode 100644 index e9ed072a..00000000 --- a/test/mitmproxy/data/test_flow_export/python_get.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests - -response = requests.get( - 'http://address:22/path', - params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')], - headers={'header': 'qvalue'} -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/data/test_flow_export/python_patch.py b/test/mitmproxy/data/test_flow_export/python_patch.py deleted file mode 100644 index d83a57b9..00000000 --- a/test/mitmproxy/data/test_flow_export/python_patch.py +++ /dev/null @@ -1,10 +0,0 @@ -import requests - -response = requests.patch( - 'http://address:22/path', - params=[('query', 'param')], - headers={'header': 'qvalue'}, - data=b'content' -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/data/test_flow_export/python_post.py b/test/mitmproxy/data/test_flow_export/python_post.py deleted file mode 100644 index 42f1af9a..00000000 --- a/test/mitmproxy/data/test_flow_export/python_post.py +++ /dev/null @@ -1,17 +0,0 @@ -import requests - -response = requests.post( - 'http://address:22/path', - data=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13' - b'\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567' - b'89:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f' - b'\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f' - b'\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f' - b'\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf' - b'\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf' - b'\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf' - b'\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf' - b'\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef' - b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff') -) -print(response.text) diff --git a/test/mitmproxy/data/test_flow_export/python_post_json.py b/test/mitmproxy/data/test_flow_export/python_post_json.py deleted file mode 100644 index d6ae6357..00000000 --- a/test/mitmproxy/data/test_flow_export/python_post_json.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests - -response = requests.post( - 'http://address:22/path', - headers={'content-type': 'application/json'}, - json={'email': 'example@example.com', 'name': 'example'} -) - -print(response.text)
\ No newline at end of file diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 64928dbf..958328b2 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -1,12 +1,10 @@ import typing from mitmproxy import command from mitmproxy import flow -from mitmproxy import master -from mitmproxy import options -from mitmproxy import proxy from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons +import io import pytest @@ -18,24 +16,41 @@ class TAddon: def cmd2(self, foo: str) -> str: return 99 + def cmd3(self, foo: int) -> int: + return foo + def empty(self) -> None: pass + def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]: + return list(var) + class TestCommand: + def test_varargs(self): + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "varargs", a.varargs) + assert c.signature_help() == "varargs str *str -> [str]" + assert c.call(["one", "two", "three"]) == ["two", "three"] + with pytest.raises(exceptions.CommandError): + c.call(["one", "two", 3]) + def test_call(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer(o)) - cm = command.CommandManager(m) + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" - a = TAddon() - c = command.Command(cm, "cmd.path", a.cmd1) - assert c.call(["foo"]) == "ret foo" - assert c.signature_help() == "cmd.path str -> str" + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) - c = command.Command(cm, "cmd.two", a.cmd2) - with pytest.raises(exceptions.CommandError): - c.call(["foo"]) + c = command.Command(cm, "cmd.three", a.cmd3) + assert c.call(["1"]) == 1 def test_simple(): @@ -55,27 +70,48 @@ def test_simple(): c.add("empty", a.empty) c.call("empty") + fp = io.StringIO() + c.dump(fp) + assert fp.getvalue() + def test_typename(): assert command.typename(str, True) == "str" assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + + assert command.typename(command.Cuts, False) == "cutspec" + assert command.typename(command.Cuts, True) == "[cuts]" + assert command.typename(flow.Flow, False) == "flow" + assert command.typename(typing.Sequence[str], False) == "[str]" class DummyConsole: - def load(self, l): - l.add_command("console.resolve", self.resolve) - + @command.command("view.resolve") def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: n = int(spec) return [tflow.tflow(resp=True)] * n + @command.command("cut") + def cut(self, spec: str) -> command.Cuts: + return [["test"]] + def test_parsearg(): with taddons.context() as tctx: tctx.master.addons.add(DummyConsole()) assert command.parsearg(tctx.master.commands, "foo", str) == "foo" + + assert command.parsearg(tctx.master.commands, "1", int) == 1 + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", int) + + assert command.parsearg(tctx.master.commands, "true", bool) is True + assert command.parsearg(tctx.master.commands, "false", bool) is False + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "flobble", bool) + assert len(command.parsearg( tctx.master.commands, "2", typing.Sequence[flow.Flow] )) == 2 @@ -87,6 +123,17 @@ def test_parsearg(): with pytest.raises(exceptions.CommandError): command.parsearg(tctx.master.commands, "foo", Exception) + assert command.parsearg( + tctx.master.commands, "foo", command.Cuts + ) == [["test"]] + + assert command.parsearg( + tctx.master.commands, "foo", typing.Sequence[str] + ) == ["foo"] + assert command.parsearg( + tctx.master.commands, "foo, bar", typing.Sequence[str] + ) == ["foo", "bar"] + class TDec: @command.command("cmd1") diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py index 67a6552f..e320885d 100644 --- a/test/mitmproxy/test_connections.py +++ b/test/mitmproxy/test_connections.py @@ -99,7 +99,7 @@ class TestServerConnection: c.alpn_proto_negotiated = b'h2' assert 'address:22' in repr(c) assert 'ALPN' in repr(c) - assert 'TLS: foobar' in repr(c) + assert 'TLSv1.2: foobar' in repr(c) c.sni = None c.tls_established = True diff --git a/test/mitmproxy/test_export.py b/test/mitmproxy/test_export.py deleted file mode 100644 index b789e6b5..00000000 --- a/test/mitmproxy/test_export.py +++ /dev/null @@ -1,133 +0,0 @@ -import re - -import pytest - -from mitmproxy import export # heh -from mitmproxy.net.http import Headers -from mitmproxy.test import tflow -from mitmproxy.test import tutils - - -def clean_blanks(s): - return re.sub(r"^\s+", "", s, flags=re.MULTILINE) - - -def python_equals(testdata, text): - """ - Compare two bits of Python code, disregarding non-significant differences - like whitespace on blank lines and trailing space. - """ - d = open(tutils.test_data.path(testdata)).read() - assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip() - - -@pytest.fixture -def get_request(): - return tflow.tflow( - req=tutils.treq( - method=b'GET', - content=b'', - path=b"/path?a=foo&a=bar&b=baz" - ) - ) - - -@pytest.fixture -def post_request(): - return tflow.tflow( - req=tutils.treq( - method=b'POST', - headers=(), - content=bytes(range(256)) - ) - ) - - -@pytest.fixture -def patch_request(): - return tflow.tflow( - req=tutils.treq(method=b'PATCH', path=b"/path?query=param") - ) - - -class TExport: - def test_get(self, get_request): - raise NotImplementedError() - - def test_post(self, post_request): - raise NotImplementedError() - - def test_patch(self, patch_request): - raise NotImplementedError() - - -class TestExportCurlCommand(TExport): - def test_get(self, get_request): - result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'""" - assert export.curl_command(get_request) == result - - def test_post(self, post_request): - result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format( - str(bytes(range(256)))[2:-1] - ) - assert export.curl_command(post_request) == result - - def test_patch(self, patch_request): - result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'""" - assert export.curl_command(patch_request) == result - - -class TestExportPythonCode(TExport): - def test_get(self, get_request): - python_equals("mitmproxy/data/test_flow_export/python_get.py", - export.python_code(get_request)) - - def test_post(self, post_request): - python_equals("mitmproxy/data/test_flow_export/python_post.py", - export.python_code(post_request)) - - def test_post_json(self, post_request): - post_request.request.content = b'{"name": "example", "email": "example@example.com"}' - post_request.request.headers = Headers(content_type="application/json") - python_equals("mitmproxy/data/test_flow_export/python_post_json.py", - export.python_code(post_request)) - - def test_patch(self, patch_request): - python_equals("mitmproxy/data/test_flow_export/python_patch.py", - export.python_code(patch_request)) - - -class TestExportLocustCode(TExport): - def test_get(self, get_request): - python_equals("mitmproxy/data/test_flow_export/locust_get.py", - export.locust_code(get_request)) - - def test_post(self, post_request): - post_request.request.content = b'content' - post_request.request.headers.clear() - python_equals("mitmproxy/data/test_flow_export/locust_post.py", - export.locust_code(post_request)) - - def test_patch(self, patch_request): - python_equals("mitmproxy/data/test_flow_export/locust_patch.py", - export.locust_code(patch_request)) - - -class TestExportLocustTask(TExport): - def test_get(self, get_request): - python_equals("mitmproxy/data/test_flow_export/locust_task_get.py", - export.locust_task(get_request)) - - def test_post(self, post_request): - python_equals("mitmproxy/data/test_flow_export/locust_task_post.py", - export.locust_task(post_request)) - - def test_patch(self, patch_request): - python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py", - export.locust_task(patch_request)) - - -class TestURL: - def test_url(self): - flow = tflow.tflow() - assert export.url(flow) == "http://address:22/path" diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index 78f893c0..19f0e7d9 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -113,10 +113,6 @@ class TestFlowMaster: with pytest.raises(Exception, match="live"): fm.replay_request(f) - def test_create_flow(self): - fm = master.Master(None, DummyServer()) - assert fm.create_request("GET", "http://example.com/") - def test_all(self): s = tservers.TestState() fm = master.Master(None, DummyServer()) diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py index e3d5dc44..2b6181d3 100644 --- a/test/mitmproxy/tools/web/test_app.py +++ b/test/mitmproxy/tools/web/test_app.py @@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False) f = tflow.tflow(resp=True) f.id = "42" - m.view.add(f) - m.view.add(tflow.tflow(err=True)) + m.view.add([f]) + m.view.add([tflow.tflow(err=True)]) m.add_log("test log", "info") self.master = m self.view = m.view @@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): # restore for f in flows: - self.view.add(f) + self.view.add([f]) self.events.data = events def test_resume(self): @@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): assert self.fetch("/flows/42", method="DELETE").code == 200 assert not self.view.get_by_id("42") - self.view.add(f) + self.view.add([f]) assert self.fetch("/flows/1234", method="DELETE").code == 404 @@ -162,7 +162,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): f = self.view.get_by_id(resp.body.decode()) assert f assert f.id != "42" - self.view.remove(f) + self.view.remove([f]) def test_flow_revert(self): f = self.view.get_by_id("42") diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 22bd7c34..fe33070e 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,6 +4,7 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck +from mitmproxy import command class TBase: @@ -87,15 +88,26 @@ def test_check_any(): typecheck.check_option_type("foo", None, typing.Any) -def test_check_command_return_type(): - assert(typecheck.check_command_return_type("foo", str)) - assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str])) - assert(typecheck.check_command_return_type(None, None)) - assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) - assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) +def test_check_command_type(): + assert(typecheck.check_command_type("foo", str)) + assert(typecheck.check_command_type(["foo"], typing.Sequence[str])) + assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str])) + assert(typecheck.check_command_type(None, None)) + assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_type("foo", typing.Sequence[int])) + assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) + assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) + assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_command_return_type([10], m) + + typecheck.check_command_type([10], m) + + # Python 3.5 only defines __union_params__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Union" + m.__union_params__ = (int,) + assert not typecheck.check_command_type([22], m) |