aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_save.py83
-rw-r--r--test/mitmproxy/addons/test_streamfile.py62
-rw-r--r--test/mitmproxy/addons/test_view.py50
-rw-r--r--test/mitmproxy/test_addonmanager.py9
-rw-r--r--test/mitmproxy/test_command.py74
-rw-r--r--test/mitmproxy/utils/test_typecheck.py66
6 files changed, 254 insertions, 90 deletions
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py
new file mode 100644
index 00000000..85c2a398
--- /dev/null
+++ b/test/mitmproxy/addons/test_save.py
@@ -0,0 +1,83 @@
+import pytest
+
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+
+from mitmproxy import io
+from mitmproxy import exceptions
+from mitmproxy import options
+from mitmproxy.addons import save
+from mitmproxy.addons import view
+
+
+def test_configure(tmpdir):
+ sa = save.Save()
+ with taddons.context(options=options.Options()) as tctx:
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, save_stream_file=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(
+ sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
+ )
+ tctx.configure(sa, save_stream_filter="foo")
+ assert sa.filt
+ tctx.configure(sa, save_stream_filter=None)
+ assert not sa.filt
+
+
+def rd(p):
+ x = io.FlowReader(open(p, "rb"))
+ return list(x.stream())
+
+
+def test_tcp(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, save_stream_file=p)
+
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)
+
+
+def test_save_command(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], "+" + p)
+ assert len(rd(p)) == 2
+
+ with pytest.raises(exceptions.CommandError):
+ sa.save([tflow.tflow(resp=True)], str(tmpdir))
+
+ v = view.View()
+ tctx.master.addons.add(v)
+ tctx.master.addons.add(sa)
+ tctx.master.commands.call_args("save.file", ["@shown", p])
+
+
+def test_simple(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+
+ tctx.configure(sa, save_stream_file=p)
+
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)[0].response
+
+ tctx.configure(sa, save_stream_file="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
deleted file mode 100644
index bcb27c79..00000000
--- a/test/mitmproxy/addons/test_streamfile.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import pytest
-
-from mitmproxy.test import taddons
-from mitmproxy.test import tflow
-
-from mitmproxy import io
-from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy.addons import streamfile
-
-
-def test_configure(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context(options=options.Options()) as tctx:
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=str(tmpdir))
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(
- sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~"
- )
- tctx.configure(sa, streamfile_filter="foo")
- assert sa.filt
- tctx.configure(sa, streamfile_filter=None)
- assert not sa.filt
-
-
-def rd(p):
- x = io.FlowReader(open(p, "rb"))
- return list(x.stream())
-
-
-def test_tcp(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
-
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
-
-
-def test_simple(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
-
- tctx.configure(sa, streamfile=p)
-
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
-
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 7fa3819e..05d4af30 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -5,6 +5,7 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import options
+from mitmproxy import exceptions
from mitmproxy.test import taddons
@@ -130,6 +131,55 @@ def test_filter():
assert len(v) == 4
+def test_load():
+ v = view.View()
+ with taddons.context(options=options.Options()) as tctx:
+ tctx.master.addons.add(v)
+
+
+def test_resolve():
+ v = view.View()
+ with taddons.context(options=options.Options()) as tctx:
+ assert tctx.command(v.resolve, "@focus") == []
+ assert tctx.command(v.resolve, "@shown") == []
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ assert tctx.command(v.resolve, "@unmarked") == []
+ assert tctx.command(v.resolve, "~m get") == []
+ v.request(tft(method="get"))
+ assert len(tctx.command(v.resolve, "~m get")) == 1
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 1
+ assert len(tctx.command(v.resolve, "@unmarked")) == 1
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ v.request(tft(method="put"))
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 2
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+
+ v.request(tft(method="get"))
+ v.request(tft(method="put"))
+
+ f = flowfilter.parse("~m get")
+ v.set_filter(f)
+ v[0].marked = True
+
+ def m(l):
+ return [i.request.method for i in l]
+
+ assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
+ assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
+
+ with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
+ tctx.command(v.resolve, "~")
+
+
def test_order():
v = view.View()
with taddons.context(options=options.Options()) as tctx:
diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py
index 7b461580..034182a6 100644
--- a/test/mitmproxy/test_addonmanager.py
+++ b/test/mitmproxy/test_addonmanager.py
@@ -61,9 +61,9 @@ def test_lifecycle():
a = addonmanager.AddonManager(m)
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow()
@@ -82,6 +82,11 @@ def test_loader():
l.add_option("custom_option", bool, False, "help")
l.add_option("custom_option", bool, False, "help")
+ def cmd(a: str) -> str:
+ return "foo"
+
+ l.add_command("test.command", cmd)
+
def test_simple():
with taddons.context() as tctx:
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
new file mode 100644
index 00000000..92d8c77b
--- /dev/null
+++ b/test/mitmproxy/test_command.py
@@ -0,0 +1,74 @@
+import typing
+from mitmproxy import command
+from mitmproxy import flow
+from mitmproxy import master
+from mitmproxy import options
+from mitmproxy import proxy
+from mitmproxy import exceptions
+from mitmproxy.test import tflow
+from mitmproxy.test import taddons
+import pytest
+
+
+class TAddon:
+ def cmd1(self, foo: str) -> str:
+ return "ret " + foo
+
+ def cmd2(self, foo: str) -> str:
+ return 99
+
+
+class TestCommand:
+ def test_call(self):
+ o = options.Options()
+ m = master.Master(o, proxy.DummyServer(o))
+ cm = command.CommandManager(m)
+
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
+
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
+
+
+def test_simple():
+ o = options.Options()
+ m = master.Master(o, proxy.DummyServer(o))
+ c = command.CommandManager(m)
+ a = TAddon()
+ c.add("one.two", a.cmd1)
+ assert(c.call("one.two foo") == "ret foo")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.call("nonexistent")
+ with pytest.raises(exceptions.CommandError, match="Invalid"):
+ c.call("")
+ with pytest.raises(exceptions.CommandError, match="Usage"):
+ c.call("one.two too many args")
+
+
+def test_typename():
+ assert command.typename(str, True) == "str"
+ assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
+ assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
+
+
+class DummyConsole:
+ def load(self, l):
+ l.add_command("console.resolve", self.resolve)
+
+ def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
+ return [tflow.tflow(resp=True)]
+
+
+def test_parsearg():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
+ assert len(command.parsearg(
+ tctx.master.commands, "~b", typing.Sequence[flow.Flow]
+ )) == 1
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "foo", Exception)
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index fd0c6e0c..22bd7c34 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -16,72 +16,86 @@ class T(TBase):
super(T, self).__init__(42)
-def test_check_type():
- typecheck.check_type("foo", 42, int)
+def test_check_option_type():
+ typecheck.check_option_type("foo", 42, int)
with pytest.raises(TypeError):
- typecheck.check_type("foo", 42, str)
+ typecheck.check_option_type("foo", 42, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, str)
+ typecheck.check_option_type("foo", None, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", b"foo", str)
+ typecheck.check_option_type("foo", b"foo", str)
def test_check_union():
- typecheck.check_type("foo", 42, typing.Union[int, str])
- typecheck.check_type("foo", "42", typing.Union[int, str])
+ typecheck.check_option_type("foo", 42, typing.Union[int, str])
+ typecheck.check_option_type("foo", "42", typing.Union[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [], typing.Union[int, str])
+ typecheck.check_option_type("foo", [], typing.Union[int, str])
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
- typecheck.check_type("foo", 42, m)
+ typecheck.check_option_type("foo", 42, m)
def test_check_tuple():
- typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, typing.Tuple[int, str])
+ typecheck.check_option_type("foo", None, typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str])
# Python 3.5 only defines __tuple_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Tuple"
m.__tuple_params__ = (int, str)
- typecheck.check_type("foo", (42, "42"), m)
+ typecheck.check_option_type("foo", (42, "42"), m)
def test_check_sequence():
- typecheck.check_type("foo", [10], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ["foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", ["foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
+ typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.Sequence[str])
+ typecheck.check_option_type("foo", "foo", typing.Sequence[str])
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_type("foo", [10], m)
+ typecheck.check_option_type("foo", [10], m)
def test_check_io():
- typecheck.check_type("foo", io.StringIO(), typing.IO[str])
+ typecheck.check_option_type("foo", io.StringIO(), typing.IO[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.IO[str])
+ typecheck.check_option_type("foo", "foo", typing.IO[str])
def test_check_any():
- typecheck.check_type("foo", 42, typing.Any)
- typecheck.check_type("foo", object(), typing.Any)
- typecheck.check_type("foo", None, typing.Any)
+ typecheck.check_option_type("foo", 42, typing.Any)
+ typecheck.check_option_type("foo", object(), typing.Any)
+ typecheck.check_option_type("foo", None, typing.Any)
+
+
+def test_check_command_return_type():
+ assert(typecheck.check_command_return_type("foo", str))
+ assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
+ assert(typecheck.check_command_return_type(None, None))
+ assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
+
+ # Python 3.5 only defines __parameters__
+ m = mock.Mock()
+ m.__str__ = lambda self: "typing.Sequence"
+ m.__parameters__ = (int,)
+ typecheck.check_command_return_type([10], m)