diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_clientplayback.py | 19 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_core.py | 63 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_save.py | 83 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_script.py | 70 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_serverplayback.py | 12 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_streamfile.py | 62 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_view.py | 159 | ||||
-rw-r--r-- | test/mitmproxy/test_addonmanager.py | 20 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 128 | ||||
-rw-r--r-- | test/mitmproxy/test_optmanager.py | 5 | ||||
-rw-r--r-- | test/mitmproxy/tools/web/test_app.py | 10 | ||||
-rw-r--r-- | test/mitmproxy/utils/test_typecheck.py | 66 |
12 files changed, 538 insertions, 159 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index f71662f0..7ffda317 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -26,7 +26,7 @@ class TestClientPlayback: with taddons.context() as tctx: assert cp.count() == 0 f = tflow.tflow(resp=True) - cp.load([f]) + cp.start_replay([f]) assert cp.count() == 1 RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread" with mock.patch(RP) as rp: @@ -44,13 +44,30 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None + cp.start_replay([f]) + cp.stop_replay() + assert not cp.flows + + def test_load_file(self, tmpdir): + cp = clientplayback.ClientPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + cp.load_file(fpath) + assert cp.flows + with pytest.raises(exceptions.CommandError): + cp.load_file("/nonexistent") + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: path = str(tmpdir.join("flows")) tdump(path, [tflow.tflow()]) tctx.configure(cp, client_replay=[path]) + cp.configured = False tctx.configure(cp, client_replay=[]) + cp.configured = False tctx.configure(cp) + cp.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py new file mode 100644 index 00000000..64d0fa19 --- /dev/null +++ b/test/mitmproxy/addons/test_core.py @@ -0,0 +1,63 @@ +from mitmproxy.addons import core +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy import exceptions +import pytest + + +def test_set(): + sa = core.Core() + with taddons.context() as tctx: + tctx.master.addons.add(sa) + + assert not tctx.master.options.anticomp + tctx.command(sa.set, "anticomp") + assert tctx.master.options.anticomp + + with pytest.raises(exceptions.CommandError): + tctx.command(sa.set, "nonexistent") + + +def test_resume(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not sa.resume([f]) + f.intercept() + sa.resume([f]) + assert not f.reply.state == "taken" + + +def test_mark(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert not f.marked + sa.mark([f], True) + assert f.marked + + sa.mark_toggle([f]) + assert not f.marked + sa.mark_toggle([f]) + assert f.marked + + +def test_kill(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.intercept() + assert f.killable + sa.kill([f]) + assert not f.killable + + +def test_revert(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + f.backup() + f.request.content = b"bar" + assert f.modified() + sa.revert([f]) + assert not f.modified() diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py new file mode 100644 index 00000000..85c2a398 --- /dev/null +++ b/test/mitmproxy/addons/test_save.py @@ -0,0 +1,83 @@ +import pytest + +from mitmproxy.test import taddons +from mitmproxy.test import tflow + +from mitmproxy import io +from mitmproxy import exceptions +from mitmproxy import options +from mitmproxy.addons import save +from mitmproxy.addons import view + + +def test_configure(tmpdir): + sa = save.Save() + with taddons.context(options=options.Options()) as tctx: + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, save_stream_file=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure( + sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~" + ) + tctx.configure(sa, save_stream_filter="foo") + assert sa.filt + tctx.configure(sa, save_stream_filter=None) + assert not sa.filt + + +def rd(p): + x = io.FlowReader(open(p, "rb")) + return list(x.stream()) + + +def test_tcp(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + tctx.configure(sa, save_stream_file=p) + + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, save_stream_file=None) + assert rd(p) + + +def test_save_command(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], "+" + p) + assert len(rd(p)) == 2 + + with pytest.raises(exceptions.CommandError): + sa.save([tflow.tflow(resp=True)], str(tmpdir)) + + v = view.View() + tctx.master.addons.add(v) + tctx.master.addons.add(sa) + tctx.master.commands.call_args("save.file", ["@shown", p]) + + +def test_simple(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + + tctx.configure(sa, save_stream_file=p) + + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, save_stream_file=None) + assert rd(p)[0].response + + tctx.configure(sa, save_stream_file="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, save_stream_file=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 859d99f9..a3df1fcf 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -9,9 +9,6 @@ from mitmproxy.test import tutils from mitmproxy.test import taddons from mitmproxy import addonmanager from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy import proxy -from mitmproxy import master from mitmproxy.addons import script @@ -48,9 +45,9 @@ def test_script_print_stdout(): class TestScript: def test_notfound(self): - with taddons.context() as tctx: - sc = script.Script("nonexistent") - tctx.master.addons.add(sc) + with taddons.context(): + with pytest.raises(exceptions.OptionsError): + script.Script("nonexistent") def test_simple(self): with taddons.context() as tctx: @@ -136,25 +133,45 @@ class TestCutTraceback: class TestScriptLoader: - def test_simple(self): - o = options.Options(scripts=[]) - m = master.Master(o, proxy.DummyServer()) + def test_script_run(self): + rp = tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) sc = script.ScriptLoader() - sc.running() - m.addons.add(sc) - assert len(m.addons) == 1 - o.update( - scripts = [ - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder/recorder.py" - ) + with taddons.context() as tctx: + sc.script_run([tflow.tflow(resp=True)], rp) + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] + assert debug == [ + 'recorder load', 'recorder running', 'recorder configure', + 'recorder tick', + 'recorder requestheaders', 'recorder request', + 'recorder responseheaders', 'recorder response' ] - ) - assert len(m.addons) == 1 - assert len(sc.addons) == 1 - o.update(scripts = []) - assert len(m.addons) == 1 - assert len(sc.addons) == 0 + + def test_script_run_nonexistent(self): + sc = script.ScriptLoader() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + sc.script_run([tflow.tflow(resp=True)], "/nonexistent") + + def test_simple(self): + sc = script.ScriptLoader() + with taddons.context() as tctx: + tctx.master.addons.add(sc) + sc.running() + assert len(tctx.master.addons) == 1 + tctx.master.options.update( + scripts = [ + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ] + ) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 1 + tctx.master.options.update(scripts = []) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 0 def test_dupes(self): sc = script.ScriptLoader() @@ -166,13 +183,6 @@ class TestScriptLoader: scripts = ["one", "one"] ) - def test_nonexistent(self): - sc = script.ScriptLoader() - with taddons.context() as tctx: - tctx.master.addons.add(sc) - tctx.configure(sc, scripts = ["nonexistent"]) - tctx.master.has_log("nonexistent: file not found") - def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") sc = script.ScriptLoader() diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index 29de48a0..3ceab3fa 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -16,12 +16,24 @@ def tdump(path, flows): w.add(i) +def test_load_file(tmpdir): + s = serverplayback.ServerPlayback() + with taddons.context(): + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + s.load_file(fpath) + assert s.flowmap + with pytest.raises(exceptions.CommandError): + s.load_file("/nonexistent") + + def test_config(tmpdir): s = serverplayback.ServerPlayback() with taddons.context() as tctx: fpath = str(tmpdir.join("flows")) tdump(fpath, [tflow.tflow(resp=True)]) tctx.configure(s, server_replay=[fpath]) + s.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(s, server_replay=[str(tmpdir)]) diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py deleted file mode 100644 index bcb27c79..00000000 --- a/test/mitmproxy/addons/test_streamfile.py +++ /dev/null @@ -1,62 +0,0 @@ -import pytest - -from mitmproxy.test import taddons -from mitmproxy.test import tflow - -from mitmproxy import io -from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy.addons import streamfile - - -def test_configure(tmpdir): - sa = streamfile.StreamFile() - with taddons.context(options=options.Options()) as tctx: - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=str(tmpdir)) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure( - sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~" - ) - tctx.configure(sa, streamfile_filter="foo") - assert sa.filt - tctx.configure(sa, streamfile_filter=None) - assert not sa.filt - - -def rd(p): - x = io.FlowReader(open(p, "rb")) - return list(x.stream()) - - -def test_tcp(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) - - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) - - -def test_simple(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - - tctx.configure(sa, streamfile=p) - - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response - - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 7fa3819e..979f0aa1 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -4,7 +4,7 @@ from mitmproxy.test import tflow from mitmproxy.addons import view from mitmproxy import flowfilter -from mitmproxy import options +from mitmproxy import exceptions from mitmproxy.test import taddons @@ -25,12 +25,12 @@ def test_order_refresh(): v.sig_view_refresh.connect(save) tf = tflow.tflow(resp=True) - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, console_order="time") - v.add(tf) + v.add([tf]) tf.request.timestamp_start = 1 assert not sargs - v.update(tf) + v.update([tf]) assert sargs @@ -130,9 +130,99 @@ def test_filter(): assert len(v) == 4 +def test_load(): + v = view.View() + with taddons.context() as tctx: + tctx.master.addons.add(v) + + +def test_resolve(): + v = view.View() + with taddons.context() as tctx: + assert tctx.command(v.resolve, "@all") == [] + assert tctx.command(v.resolve, "@focus") == [] + assert tctx.command(v.resolve, "@shown") == [] + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + assert tctx.command(v.resolve, "@unmarked") == [] + assert tctx.command(v.resolve, "~m get") == [] + v.request(tft(method="get")) + assert len(tctx.command(v.resolve, "~m get")) == 1 + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@all")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 1 + assert len(tctx.command(v.resolve, "@unmarked")) == 1 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + v.request(tft(method="put")) + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 2 + assert len(tctx.command(v.resolve, "@all")) == 2 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + + v.request(tft(method="get")) + v.request(tft(method="put")) + + f = flowfilter.parse("~m get") + v.set_filter(f) + v[0].marked = True + + def m(l): + return [i.request.method for i in l] + + assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@marked")) == ["GET"] + assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] + assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"] + + with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): + tctx.command(v.resolve, "~") + + +def test_go(): + v = view.View() + with taddons.context(): + v.add([ + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + tflow.tflow(), + ]) + assert v.focus.index == 0 + v.go(-1) + assert v.focus.index == 4 + v.go(0) + assert v.focus.index == 0 + v.go(1) + assert v.focus.index == 1 + v.go(999) + assert v.focus.index == 4 + v.go(-999) + assert v.focus.index == 0 + + +def test_duplicate(): + v = view.View() + with taddons.context(): + f = [ + tflow.tflow(), + tflow.tflow(), + ] + v.add(f) + assert len(v) == 2 + v.duplicate(f) + assert len(v) == 4 + assert v.focus.index == 2 + + def test_order(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: v.request(tft(method="get", start=1)) v.request(tft(method="put", start=2)) v.request(tft(method="get", start=3)) @@ -180,14 +270,14 @@ def test_update(): assert f in v f.request.method = "put" - v.update(f) + v.update([f]) assert f not in v f.request.method = "get" - v.update(f) + v.update([f]) assert f in v - v.update(f) + v.update([f]) assert f in v @@ -226,7 +316,7 @@ def test_signals(): assert not any([rec_add, rec_update, rec_remove, rec_refresh]) # Simple add - v.add(tft()) + v.add([tft()]) assert rec_add assert not any([rec_update, rec_remove, rec_refresh]) @@ -241,14 +331,14 @@ def test_signals(): # An update that results in a flow being added to the view clearrec() v[0].request.method = "PUT" - v.update(v[0]) + v.update([v[0]]) assert rec_remove assert not any([rec_update, rec_refresh, rec_add]) # An update that does not affect the view just sends update v.set_filter(flowfilter.parse("~m put")) clearrec() - v.update(v[0]) + v.update([v[0]]) assert rec_update assert not any([rec_remove, rec_refresh, rec_add]) @@ -257,33 +347,33 @@ def test_signals(): v.set_filter(flowfilter.parse("~m get")) assert not len(v) clearrec() - v.update(f) + v.update([f]) assert not any([rec_add, rec_update, rec_remove, rec_refresh]) def test_focus_follow(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, console_focus_follow=True, view_filter="~m get") - v.add(tft(start=5)) + v.add([tft(start=5)]) assert v.focus.index == 0 - v.add(tft(start=4)) + v.add([tft(start=4)]) assert v.focus.index == 0 assert v.focus.flow.request.timestamp_start == 4 - v.add(tft(start=7)) + v.add([tft(start=7)]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod = tft(method="put", start=6) - v.add(mod) + v.add([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 7 mod.request.method = "GET" - v.update(mod) + v.update([mod]) assert v.focus.index == 2 assert v.focus.flow.request.timestamp_start == 6 @@ -291,7 +381,7 @@ def test_focus_follow(): def test_focus(): # Special case - initialising with a view that already contains data v = view.View() - v.add(tft()) + v.add([tft()]) f = view.Focus(v) assert f.index is 0 assert f.flow is v[0] @@ -302,7 +392,7 @@ def test_focus(): assert f.index is None assert f.flow is None - v.add(tft(start=1)) + v.add([tft(start=1)]) assert f.index == 0 assert f.flow is v[0] @@ -312,11 +402,11 @@ def test_focus(): with pytest.raises(ValueError): f.__setattr__("index", 99) - v.add(tft(start=0)) + v.add([tft(start=0)]) assert f.index == 1 assert f.flow is v[1] - v.add(tft(start=2)) + v.add([tft(start=2)]) assert f.index == 1 assert f.flow is v[1] @@ -324,22 +414,25 @@ def test_focus(): assert f.index == 0 f.index = 1 - v.remove(v[1]) + v.remove([v[1]]) + v[1].intercept() assert f.index == 1 assert f.flow is v[1] - v.remove(v[1]) + v.remove([v[1]]) assert f.index == 0 assert f.flow is v[0] - v.remove(v[0]) + v.remove([v[0]]) assert f.index is None assert f.flow is None - v.add(tft(method="get", start=0)) - v.add(tft(method="get", start=1)) - v.add(tft(method="put", start=2)) - v.add(tft(method="get", start=3)) + v.add([ + tft(method="get", start=0), + tft(method="get", start=1), + tft(method="put", start=2), + tft(method="get", start=3), + ]) f.flow = v[2] assert f.flow.request.method == "PUT" @@ -359,16 +452,16 @@ def test_settings(): with pytest.raises(KeyError): v.settings[f] - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings[f]["foo"] == "bar" assert len(list(v.settings)) == 1 - v.remove(f) + v.remove([f]) with pytest.raises(KeyError): v.settings[f] assert not v.settings.keys() - v.add(f) + v.add([f]) v.settings[f]["foo"] = "bar" assert v.settings.keys() v.clear() @@ -377,7 +470,7 @@ def test_settings(): def test_configure(): v = view.View() - with taddons.context(options=options.Options()) as tctx: + with taddons.context() as tctx: tctx.configure(v, view_filter="~q") with pytest.raises(Exception, match="Invalid interception filter"): tctx.configure(v, view_filter="~~") diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index 7b461580..678bc1b7 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -4,6 +4,7 @@ from mitmproxy import addons from mitmproxy import addonmanager from mitmproxy import exceptions from mitmproxy import options +from mitmproxy import command from mitmproxy import master from mitmproxy import proxy from mitmproxy.test import taddons @@ -18,6 +19,10 @@ class TAddon: if addons: self.addons = addons + @command.command("test.command") + def testcommand(self) -> str: + return "here" + def __repr__(self): return "Addon(%s)" % self.name @@ -38,6 +43,12 @@ class AOption: l.add_option("custom_option", bool, False, "help") +def test_command(): + with taddons.context() as tctx: + tctx.master.addons.add(TAddon("test")) + assert tctx.master.commands.call("test.command") == "here" + + def test_halt(): o = options.Options() m = master.Master(o, proxy.DummyServer(o)) @@ -61,9 +72,9 @@ def test_lifecycle(): a = addonmanager.AddonManager(m) a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.remove(TAddon("nonexistent")) f = tflow.tflow() @@ -82,6 +93,11 @@ def test_loader(): l.add_option("custom_option", bool, False, "help") l.add_option("custom_option", bool, False, "help") + def cmd(a: str) -> str: + return "foo" + + l.add_command("test.command", cmd) + def test_simple(): with taddons.context() as tctx: diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py new file mode 100644 index 00000000..96d79dba --- /dev/null +++ b/test/mitmproxy/test_command.py @@ -0,0 +1,128 @@ +import typing +from mitmproxy import command +from mitmproxy import flow +from mitmproxy import master +from mitmproxy import options +from mitmproxy import proxy +from mitmproxy import exceptions +from mitmproxy.test import tflow +from mitmproxy.test import taddons +import pytest + + +class TAddon: + def cmd1(self, foo: str) -> str: + """cmd1 help""" + return "ret " + foo + + def cmd2(self, foo: str) -> str: + return 99 + + def empty(self) -> None: + pass + + +class TestCommand: + def test_call(self): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + cm = command.CommandManager(m) + + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" + + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) + + +def test_simple(): + with taddons.context() as tctx: + c = command.CommandManager(tctx.master) + a = TAddon() + c.add("one.two", a.cmd1) + assert c.commands["one.two"].help == "cmd1 help" + assert(c.call("one.two foo") == "ret foo") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.call("nonexistent") + with pytest.raises(exceptions.CommandError, match="Invalid"): + c.call("") + with pytest.raises(exceptions.CommandError, match="Usage"): + c.call("one.two too many args") + + c.add("empty", a.empty) + c.call("empty") + + +def test_typename(): + assert command.typename(str, True) == "str" + assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + assert command.typename(flow.Flow, False) == "flow" + + +class DummyConsole: + def load(self, l): + l.add_command("view.resolve", self.resolve) + + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + n = int(spec) + return [tflow.tflow(resp=True)] * n + + +def test_parsearg(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + assert command.parsearg(tctx.master.commands, "foo", str) == "foo" + + assert command.parsearg(tctx.master.commands, "1", int) == 1 + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", int) + + assert command.parsearg(tctx.master.commands, "true", bool) is True + assert command.parsearg(tctx.master.commands, "false", bool) is False + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "flobble", bool) + + assert len(command.parsearg( + tctx.master.commands, "2", typing.Sequence[flow.Flow] + )) == 2 + assert command.parsearg(tctx.master.commands, "1", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "2", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "0", flow.Flow) + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", Exception) + + +class TDec: + @command.command("cmd1") + def cmd1(self, foo: str) -> str: + """cmd1 help""" + return "ret " + foo + + @command.command("cmd2") + def cmd2(self, foo: str) -> str: + return 99 + + @command.command("empty") + def empty(self) -> None: + pass + + +def test_decorator(): + with taddons.context() as tctx: + c = command.CommandManager(tctx.master) + a = TDec() + c.collect_commands(a) + assert "cmd1" in c.commands + assert c.call("cmd1 bar") == "ret bar" + assert "empty" in c.commands + assert c.call("empty") is None + + with taddons.context() as tctx: + tctx.master.addons.add(a) + assert tctx.master.commands.call("cmd1 bar") == "ret bar" diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index a685570f..04ec7ded 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -381,6 +381,11 @@ def test_set(): with pytest.raises(exceptions.OptionsError): opts.set("bool=wobble") + opts.set("bool=toggle") + assert opts.bool is False + opts.set("bool=toggle") + assert opts.bool is True + opts.set("int=1") assert opts.int == 1 with pytest.raises(exceptions.OptionsError): diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py index e3d5dc44..2b6181d3 100644 --- a/test/mitmproxy/tools/web/test_app.py +++ b/test/mitmproxy/tools/web/test_app.py @@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False) f = tflow.tflow(resp=True) f.id = "42" - m.view.add(f) - m.view.add(tflow.tflow(err=True)) + m.view.add([f]) + m.view.add([tflow.tflow(err=True)]) m.add_log("test log", "info") self.master = m self.view = m.view @@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): # restore for f in flows: - self.view.add(f) + self.view.add([f]) self.events.data = events def test_resume(self): @@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): assert self.fetch("/flows/42", method="DELETE").code == 200 assert not self.view.get_by_id("42") - self.view.add(f) + self.view.add([f]) assert self.fetch("/flows/1234", method="DELETE").code == 404 @@ -162,7 +162,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase): f = self.view.get_by_id(resp.body.decode()) assert f assert f.id != "42" - self.view.remove(f) + self.view.remove([f]) def test_flow_revert(self): f = self.view.get_by_id("42") diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index fd0c6e0c..22bd7c34 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -16,72 +16,86 @@ class T(TBase): super(T, self).__init__(42) -def test_check_type(): - typecheck.check_type("foo", 42, int) +def test_check_option_type(): + typecheck.check_option_type("foo", 42, int) with pytest.raises(TypeError): - typecheck.check_type("foo", 42, str) + typecheck.check_option_type("foo", 42, str) with pytest.raises(TypeError): - typecheck.check_type("foo", None, str) + typecheck.check_option_type("foo", None, str) with pytest.raises(TypeError): - typecheck.check_type("foo", b"foo", str) + typecheck.check_option_type("foo", b"foo", str) def test_check_union(): - typecheck.check_type("foo", 42, typing.Union[int, str]) - typecheck.check_type("foo", "42", typing.Union[int, str]) + typecheck.check_option_type("foo", 42, typing.Union[int, str]) + typecheck.check_option_type("foo", "42", typing.Union[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", [], typing.Union[int, str]) + typecheck.check_option_type("foo", [], typing.Union[int, str]) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - typecheck.check_type("foo", 42, m) + typecheck.check_option_type("foo", 42, m) def test_check_tuple(): - typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", None, typing.Tuple[int, str]) + typecheck.check_option_type("foo", None, typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (42, 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str]) # Python 3.5 only defines __tuple_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Tuple" m.__tuple_params__ = (int, str) - typecheck.check_type("foo", (42, "42"), m) + typecheck.check_option_type("foo", (42, "42"), m) def test_check_sequence(): - typecheck.check_type("foo", [10], typing.Sequence[int]) + typecheck.check_option_type("foo", [10], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", ["foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", ["foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [10, "foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [b"foo"], typing.Sequence[str]) + typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.Sequence[str]) + typecheck.check_option_type("foo", "foo", typing.Sequence[str]) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_type("foo", [10], m) + typecheck.check_option_type("foo", [10], m) def test_check_io(): - typecheck.check_type("foo", io.StringIO(), typing.IO[str]) + typecheck.check_option_type("foo", io.StringIO(), typing.IO[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.IO[str]) + typecheck.check_option_type("foo", "foo", typing.IO[str]) def test_check_any(): - typecheck.check_type("foo", 42, typing.Any) - typecheck.check_type("foo", object(), typing.Any) - typecheck.check_type("foo", None, typing.Any) + typecheck.check_option_type("foo", 42, typing.Any) + typecheck.check_option_type("foo", object(), typing.Any) + typecheck.check_option_type("foo", None, typing.Any) + + +def test_check_command_return_type(): + assert(typecheck.check_command_return_type("foo", str)) + assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str])) + assert(typecheck.check_command_return_type(None, None)) + assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) + + # Python 3.5 only defines __parameters__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Sequence" + m.__parameters__ = (int,) + typecheck.check_command_return_type([10], m) |