diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_save.py | 83 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_streamfile.py | 62 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_view.py | 50 | ||||
-rw-r--r-- | test/mitmproxy/test_addonmanager.py | 9 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 74 | ||||
-rw-r--r-- | test/mitmproxy/utils/test_typecheck.py | 66 |
6 files changed, 254 insertions, 90 deletions
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py new file mode 100644 index 00000000..85c2a398 --- /dev/null +++ b/test/mitmproxy/addons/test_save.py @@ -0,0 +1,83 @@ +import pytest + +from mitmproxy.test import taddons +from mitmproxy.test import tflow + +from mitmproxy import io +from mitmproxy import exceptions +from mitmproxy import options +from mitmproxy.addons import save +from mitmproxy.addons import view + + +def test_configure(tmpdir): + sa = save.Save() + with taddons.context(options=options.Options()) as tctx: + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, save_stream_file=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure( + sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~" + ) + tctx.configure(sa, save_stream_filter="foo") + assert sa.filt + tctx.configure(sa, save_stream_filter=None) + assert not sa.filt + + +def rd(p): + x = io.FlowReader(open(p, "rb")) + return list(x.stream()) + + +def test_tcp(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + tctx.configure(sa, save_stream_file=p) + + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, save_stream_file=None) + assert rd(p) + + +def test_save_command(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], "+" + p) + assert len(rd(p)) == 2 + + with pytest.raises(exceptions.CommandError): + sa.save([tflow.tflow(resp=True)], str(tmpdir)) + + v = view.View() + tctx.master.addons.add(v) + tctx.master.addons.add(sa) + tctx.master.commands.call_args("save.file", ["@shown", p]) + + +def test_simple(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + + tctx.configure(sa, save_stream_file=p) + + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, save_stream_file=None) + assert rd(p)[0].response + + tctx.configure(sa, save_stream_file="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, save_stream_file=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py deleted file mode 100644 index bcb27c79..00000000 --- a/test/mitmproxy/addons/test_streamfile.py +++ /dev/null @@ -1,62 +0,0 @@ -import pytest - -from mitmproxy.test import taddons -from mitmproxy.test import tflow - -from mitmproxy import io -from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy.addons import streamfile - - -def test_configure(tmpdir): - sa = streamfile.StreamFile() - with taddons.context(options=options.Options()) as tctx: - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=str(tmpdir)) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure( - sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~" - ) - tctx.configure(sa, streamfile_filter="foo") - assert sa.filt - tctx.configure(sa, streamfile_filter=None) - assert not sa.filt - - -def rd(p): - x = io.FlowReader(open(p, "rb")) - return list(x.stream()) - - -def test_tcp(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) - - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) - - -def test_simple(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - - tctx.configure(sa, streamfile=p) - - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response - - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 7fa3819e..05d4af30 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -5,6 +5,7 @@ from mitmproxy.test import tflow from mitmproxy.addons import view from mitmproxy import flowfilter from mitmproxy import options +from mitmproxy import exceptions from mitmproxy.test import taddons @@ -130,6 +131,55 @@ def test_filter(): assert len(v) == 4 +def test_load(): + v = view.View() + with taddons.context(options=options.Options()) as tctx: + tctx.master.addons.add(v) + + +def test_resolve(): + v = view.View() + with taddons.context(options=options.Options()) as tctx: + assert tctx.command(v.resolve, "@focus") == [] + assert tctx.command(v.resolve, "@shown") == [] + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + assert tctx.command(v.resolve, "@unmarked") == [] + assert tctx.command(v.resolve, "~m get") == [] + v.request(tft(method="get")) + assert len(tctx.command(v.resolve, "~m get")) == 1 + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 1 + assert len(tctx.command(v.resolve, "@unmarked")) == 1 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + v.request(tft(method="put")) + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 2 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + + v.request(tft(method="get")) + v.request(tft(method="put")) + + f = flowfilter.parse("~m get") + v.set_filter(f) + v[0].marked = True + + def m(l): + return [i.request.method for i in l] + + assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@marked")) == ["GET"] + assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] + + with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): + tctx.command(v.resolve, "~") + + def test_order(): v = view.View() with taddons.context(options=options.Options()) as tctx: diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index 7b461580..034182a6 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -61,9 +61,9 @@ def test_lifecycle(): a = addonmanager.AddonManager(m) a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.remove(TAddon("nonexistent")) f = tflow.tflow() @@ -82,6 +82,11 @@ def test_loader(): l.add_option("custom_option", bool, False, "help") l.add_option("custom_option", bool, False, "help") + def cmd(a: str) -> str: + return "foo" + + l.add_command("test.command", cmd) + def test_simple(): with taddons.context() as tctx: diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py new file mode 100644 index 00000000..92d8c77b --- /dev/null +++ b/test/mitmproxy/test_command.py @@ -0,0 +1,74 @@ +import typing +from mitmproxy import command +from mitmproxy import flow +from mitmproxy import master +from mitmproxy import options +from mitmproxy import proxy +from mitmproxy import exceptions +from mitmproxy.test import tflow +from mitmproxy.test import taddons +import pytest + + +class TAddon: + def cmd1(self, foo: str) -> str: + return "ret " + foo + + def cmd2(self, foo: str) -> str: + return 99 + + +class TestCommand: + def test_call(self): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + cm = command.CommandManager(m) + + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" + + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) + + +def test_simple(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + c = command.CommandManager(m) + a = TAddon() + c.add("one.two", a.cmd1) + assert(c.call("one.two foo") == "ret foo") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.call("nonexistent") + with pytest.raises(exceptions.CommandError, match="Invalid"): + c.call("") + with pytest.raises(exceptions.CommandError, match="Usage"): + c.call("one.two too many args") + + +def test_typename(): + assert command.typename(str, True) == "str" + assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + + +class DummyConsole: + def load(self, l): + l.add_command("console.resolve", self.resolve) + + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + return [tflow.tflow(resp=True)] + + +def test_parsearg(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + assert command.parsearg(tctx.master.commands, "foo", str) == "foo" + assert len(command.parsearg( + tctx.master.commands, "~b", typing.Sequence[flow.Flow] + )) == 1 + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", Exception) diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index fd0c6e0c..22bd7c34 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -16,72 +16,86 @@ class T(TBase): super(T, self).__init__(42) -def test_check_type(): - typecheck.check_type("foo", 42, int) +def test_check_option_type(): + typecheck.check_option_type("foo", 42, int) with pytest.raises(TypeError): - typecheck.check_type("foo", 42, str) + typecheck.check_option_type("foo", 42, str) with pytest.raises(TypeError): - typecheck.check_type("foo", None, str) + typecheck.check_option_type("foo", None, str) with pytest.raises(TypeError): - typecheck.check_type("foo", b"foo", str) + typecheck.check_option_type("foo", b"foo", str) def test_check_union(): - typecheck.check_type("foo", 42, typing.Union[int, str]) - typecheck.check_type("foo", "42", typing.Union[int, str]) + typecheck.check_option_type("foo", 42, typing.Union[int, str]) + typecheck.check_option_type("foo", "42", typing.Union[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", [], typing.Union[int, str]) + typecheck.check_option_type("foo", [], typing.Union[int, str]) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - typecheck.check_type("foo", 42, m) + typecheck.check_option_type("foo", 42, m) def test_check_tuple(): - typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", None, typing.Tuple[int, str]) + typecheck.check_option_type("foo", None, typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (42, 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str]) # Python 3.5 only defines __tuple_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Tuple" m.__tuple_params__ = (int, str) - typecheck.check_type("foo", (42, "42"), m) + typecheck.check_option_type("foo", (42, "42"), m) def test_check_sequence(): - typecheck.check_type("foo", [10], typing.Sequence[int]) + typecheck.check_option_type("foo", [10], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", ["foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", ["foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [10, "foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [b"foo"], typing.Sequence[str]) + typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.Sequence[str]) + typecheck.check_option_type("foo", "foo", typing.Sequence[str]) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_type("foo", [10], m) + typecheck.check_option_type("foo", [10], m) def test_check_io(): - typecheck.check_type("foo", io.StringIO(), typing.IO[str]) + typecheck.check_option_type("foo", io.StringIO(), typing.IO[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.IO[str]) + typecheck.check_option_type("foo", "foo", typing.IO[str]) def test_check_any(): - typecheck.check_type("foo", 42, typing.Any) - typecheck.check_type("foo", object(), typing.Any) - typecheck.check_type("foo", None, typing.Any) + typecheck.check_option_type("foo", 42, typing.Any) + typecheck.check_option_type("foo", object(), typing.Any) + typecheck.check_option_type("foo", None, typing.Any) + + +def test_check_command_return_type(): + assert(typecheck.check_command_return_type("foo", str)) + assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str])) + assert(typecheck.check_command_return_type(None, None)) + assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) + + # Python 3.5 only defines __parameters__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Sequence" + m.__parameters__ = (int,) + typecheck.check_command_return_type([10], m) |