aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py19
-rw-r--r--test/mitmproxy/addons/test_core.py63
-rw-r--r--test/mitmproxy/addons/test_save.py83
-rw-r--r--test/mitmproxy/addons/test_script.py70
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py12
-rw-r--r--test/mitmproxy/addons/test_streamfile.py62
-rw-r--r--test/mitmproxy/addons/test_view.py159
-rw-r--r--test/mitmproxy/test_addonmanager.py20
-rw-r--r--test/mitmproxy/test_command.py128
-rw-r--r--test/mitmproxy/test_optmanager.py5
-rw-r--r--test/mitmproxy/tools/web/test_app.py10
-rw-r--r--test/mitmproxy/utils/test_typecheck.py66
12 files changed, 538 insertions, 159 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index f71662f0..7ffda317 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -26,7 +26,7 @@ class TestClientPlayback:
with taddons.context() as tctx:
assert cp.count() == 0
f = tflow.tflow(resp=True)
- cp.load([f])
+ cp.start_replay([f])
assert cp.count() == 1
RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread"
with mock.patch(RP) as rp:
@@ -44,13 +44,30 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
+ cp.start_replay([f])
+ cp.stop_replay()
+ assert not cp.flows
+
+ def test_load_file(self, tmpdir):
+ cp = clientplayback.ClientPlayback()
+ with taddons.context():
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ cp.load_file(fpath)
+ assert cp.flows
+ with pytest.raises(exceptions.CommandError):
+ cp.load_file("/nonexistent")
+
def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
path = str(tmpdir.join("flows"))
tdump(path, [tflow.tflow()])
tctx.configure(cp, client_replay=[path])
+ cp.configured = False
tctx.configure(cp, client_replay=[])
+ cp.configured = False
tctx.configure(cp)
+ cp.configured = False
with pytest.raises(exceptions.OptionsError):
tctx.configure(cp, client_replay=["nonexistent"])
diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py
new file mode 100644
index 00000000..64d0fa19
--- /dev/null
+++ b/test/mitmproxy/addons/test_core.py
@@ -0,0 +1,63 @@
+from mitmproxy.addons import core
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+from mitmproxy import exceptions
+import pytest
+
+
+def test_set():
+ sa = core.Core()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sa)
+
+ assert not tctx.master.options.anticomp
+ tctx.command(sa.set, "anticomp")
+ assert tctx.master.options.anticomp
+
+ with pytest.raises(exceptions.CommandError):
+ tctx.command(sa.set, "nonexistent")
+
+
+def test_resume():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ assert not sa.resume([f])
+ f.intercept()
+ sa.resume([f])
+ assert not f.reply.state == "taken"
+
+
+def test_mark():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ assert not f.marked
+ sa.mark([f], True)
+ assert f.marked
+
+ sa.mark_toggle([f])
+ assert not f.marked
+ sa.mark_toggle([f])
+ assert f.marked
+
+
+def test_kill():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ f.intercept()
+ assert f.killable
+ sa.kill([f])
+ assert not f.killable
+
+
+def test_revert():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ f.backup()
+ f.request.content = b"bar"
+ assert f.modified()
+ sa.revert([f])
+ assert not f.modified()
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py
new file mode 100644
index 00000000..85c2a398
--- /dev/null
+++ b/test/mitmproxy/addons/test_save.py
@@ -0,0 +1,83 @@
+import pytest
+
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+
+from mitmproxy import io
+from mitmproxy import exceptions
+from mitmproxy import options
+from mitmproxy.addons import save
+from mitmproxy.addons import view
+
+
+def test_configure(tmpdir):
+ sa = save.Save()
+ with taddons.context(options=options.Options()) as tctx:
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, save_stream_file=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(
+ sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
+ )
+ tctx.configure(sa, save_stream_filter="foo")
+ assert sa.filt
+ tctx.configure(sa, save_stream_filter=None)
+ assert not sa.filt
+
+
+def rd(p):
+ x = io.FlowReader(open(p, "rb"))
+ return list(x.stream())
+
+
+def test_tcp(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, save_stream_file=p)
+
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)
+
+
+def test_save_command(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], "+" + p)
+ assert len(rd(p)) == 2
+
+ with pytest.raises(exceptions.CommandError):
+ sa.save([tflow.tflow(resp=True)], str(tmpdir))
+
+ v = view.View()
+ tctx.master.addons.add(v)
+ tctx.master.addons.add(sa)
+ tctx.master.commands.call_args("save.file", ["@shown", p])
+
+
+def test_simple(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+
+ tctx.configure(sa, save_stream_file=p)
+
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)[0].response
+
+ tctx.configure(sa, save_stream_file="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 859d99f9..a3df1fcf 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -9,9 +9,6 @@ from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy import addonmanager
from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy import proxy
-from mitmproxy import master
from mitmproxy.addons import script
@@ -48,9 +45,9 @@ def test_script_print_stdout():
class TestScript:
def test_notfound(self):
- with taddons.context() as tctx:
- sc = script.Script("nonexistent")
- tctx.master.addons.add(sc)
+ with taddons.context():
+ with pytest.raises(exceptions.OptionsError):
+ script.Script("nonexistent")
def test_simple(self):
with taddons.context() as tctx:
@@ -136,25 +133,45 @@ class TestCutTraceback:
class TestScriptLoader:
- def test_simple(self):
- o = options.Options(scripts=[])
- m = master.Master(o, proxy.DummyServer())
+ def test_script_run(self):
+ rp = tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
sc = script.ScriptLoader()
- sc.running()
- m.addons.add(sc)
- assert len(m.addons) == 1
- o.update(
- scripts = [
- tutils.test_data.path(
- "mitmproxy/data/addonscripts/recorder/recorder.py"
- )
+ with taddons.context() as tctx:
+ sc.script_run([tflow.tflow(resp=True)], rp)
+ debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
+ assert debug == [
+ 'recorder load', 'recorder running', 'recorder configure',
+ 'recorder tick',
+ 'recorder requestheaders', 'recorder request',
+ 'recorder responseheaders', 'recorder response'
]
- )
- assert len(m.addons) == 1
- assert len(sc.addons) == 1
- o.update(scripts = [])
- assert len(m.addons) == 1
- assert len(sc.addons) == 0
+
+ def test_script_run_nonexistent(self):
+ sc = script.ScriptLoader()
+ with taddons.context():
+ with pytest.raises(exceptions.CommandError):
+ sc.script_run([tflow.tflow(resp=True)], "/nonexistent")
+
+ def test_simple(self):
+ sc = script.ScriptLoader()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sc)
+ sc.running()
+ assert len(tctx.master.addons) == 1
+ tctx.master.options.update(
+ scripts = [
+ tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
+ ]
+ )
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 1
+ tctx.master.options.update(scripts = [])
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 0
def test_dupes(self):
sc = script.ScriptLoader()
@@ -166,13 +183,6 @@ class TestScriptLoader:
scripts = ["one", "one"]
)
- def test_nonexistent(self):
- sc = script.ScriptLoader()
- with taddons.context() as tctx:
- tctx.master.addons.add(sc)
- tctx.configure(sc, scripts = ["nonexistent"])
- tctx.master.has_log("nonexistent: file not found")
-
def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader()
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index 29de48a0..3ceab3fa 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -16,12 +16,24 @@ def tdump(path, flows):
w.add(i)
+def test_load_file(tmpdir):
+ s = serverplayback.ServerPlayback()
+ with taddons.context():
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ s.load_file(fpath)
+ assert s.flowmap
+ with pytest.raises(exceptions.CommandError):
+ s.load_file("/nonexistent")
+
+
def test_config(tmpdir):
s = serverplayback.ServerPlayback()
with taddons.context() as tctx:
fpath = str(tmpdir.join("flows"))
tdump(fpath, [tflow.tflow(resp=True)])
tctx.configure(s, server_replay=[fpath])
+ s.configured = False
with pytest.raises(exceptions.OptionsError):
tctx.configure(s, server_replay=[str(tmpdir)])
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
deleted file mode 100644
index bcb27c79..00000000
--- a/test/mitmproxy/addons/test_streamfile.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import pytest
-
-from mitmproxy.test import taddons
-from mitmproxy.test import tflow
-
-from mitmproxy import io
-from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy.addons import streamfile
-
-
-def test_configure(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context(options=options.Options()) as tctx:
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=str(tmpdir))
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(
- sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~"
- )
- tctx.configure(sa, streamfile_filter="foo")
- assert sa.filt
- tctx.configure(sa, streamfile_filter=None)
- assert not sa.filt
-
-
-def rd(p):
- x = io.FlowReader(open(p, "rb"))
- return list(x.stream())
-
-
-def test_tcp(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
-
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
-
-
-def test_simple(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
-
- tctx.configure(sa, streamfile=p)
-
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
-
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 7fa3819e..979f0aa1 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -4,7 +4,7 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
-from mitmproxy import options
+from mitmproxy import exceptions
from mitmproxy.test import taddons
@@ -25,12 +25,12 @@ def test_order_refresh():
v.sig_view_refresh.connect(save)
tf = tflow.tflow(resp=True)
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, console_order="time")
- v.add(tf)
+ v.add([tf])
tf.request.timestamp_start = 1
assert not sargs
- v.update(tf)
+ v.update([tf])
assert sargs
@@ -130,9 +130,99 @@ def test_filter():
assert len(v) == 4
+def test_load():
+ v = view.View()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(v)
+
+
+def test_resolve():
+ v = view.View()
+ with taddons.context() as tctx:
+ assert tctx.command(v.resolve, "@all") == []
+ assert tctx.command(v.resolve, "@focus") == []
+ assert tctx.command(v.resolve, "@shown") == []
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ assert tctx.command(v.resolve, "@unmarked") == []
+ assert tctx.command(v.resolve, "~m get") == []
+ v.request(tft(method="get"))
+ assert len(tctx.command(v.resolve, "~m get")) == 1
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@all")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 1
+ assert len(tctx.command(v.resolve, "@unmarked")) == 1
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ v.request(tft(method="put"))
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 2
+ assert len(tctx.command(v.resolve, "@all")) == 2
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+
+ v.request(tft(method="get"))
+ v.request(tft(method="put"))
+
+ f = flowfilter.parse("~m get")
+ v.set_filter(f)
+ v[0].marked = True
+
+ def m(l):
+ return [i.request.method for i in l]
+
+ assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
+ assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
+ assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"]
+
+ with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
+ tctx.command(v.resolve, "~")
+
+
+def test_go():
+ v = view.View()
+ with taddons.context():
+ v.add([
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ ])
+ assert v.focus.index == 0
+ v.go(-1)
+ assert v.focus.index == 4
+ v.go(0)
+ assert v.focus.index == 0
+ v.go(1)
+ assert v.focus.index == 1
+ v.go(999)
+ assert v.focus.index == 4
+ v.go(-999)
+ assert v.focus.index == 0
+
+
+def test_duplicate():
+ v = view.View()
+ with taddons.context():
+ f = [
+ tflow.tflow(),
+ tflow.tflow(),
+ ]
+ v.add(f)
+ assert len(v) == 2
+ v.duplicate(f)
+ assert len(v) == 4
+ assert v.focus.index == 2
+
+
def test_order():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
v.request(tft(method="get", start=1))
v.request(tft(method="put", start=2))
v.request(tft(method="get", start=3))
@@ -180,14 +270,14 @@ def test_update():
assert f in v
f.request.method = "put"
- v.update(f)
+ v.update([f])
assert f not in v
f.request.method = "get"
- v.update(f)
+ v.update([f])
assert f in v
- v.update(f)
+ v.update([f])
assert f in v
@@ -226,7 +316,7 @@ def test_signals():
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
# Simple add
- v.add(tft())
+ v.add([tft()])
assert rec_add
assert not any([rec_update, rec_remove, rec_refresh])
@@ -241,14 +331,14 @@ def test_signals():
# An update that results in a flow being added to the view
clearrec()
v[0].request.method = "PUT"
- v.update(v[0])
+ v.update([v[0]])
assert rec_remove
assert not any([rec_update, rec_refresh, rec_add])
# An update that does not affect the view just sends update
v.set_filter(flowfilter.parse("~m put"))
clearrec()
- v.update(v[0])
+ v.update([v[0]])
assert rec_update
assert not any([rec_remove, rec_refresh, rec_add])
@@ -257,33 +347,33 @@ def test_signals():
v.set_filter(flowfilter.parse("~m get"))
assert not len(v)
clearrec()
- v.update(f)
+ v.update([f])
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
def test_focus_follow():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, console_focus_follow=True, view_filter="~m get")
- v.add(tft(start=5))
+ v.add([tft(start=5)])
assert v.focus.index == 0
- v.add(tft(start=4))
+ v.add([tft(start=4)])
assert v.focus.index == 0
assert v.focus.flow.request.timestamp_start == 4
- v.add(tft(start=7))
+ v.add([tft(start=7)])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod = tft(method="put", start=6)
- v.add(mod)
+ v.add([mod])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod.request.method = "GET"
- v.update(mod)
+ v.update([mod])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 6
@@ -291,7 +381,7 @@ def test_focus_follow():
def test_focus():
# Special case - initialising with a view that already contains data
v = view.View()
- v.add(tft())
+ v.add([tft()])
f = view.Focus(v)
assert f.index is 0
assert f.flow is v[0]
@@ -302,7 +392,7 @@ def test_focus():
assert f.index is None
assert f.flow is None
- v.add(tft(start=1))
+ v.add([tft(start=1)])
assert f.index == 0
assert f.flow is v[0]
@@ -312,11 +402,11 @@ def test_focus():
with pytest.raises(ValueError):
f.__setattr__("index", 99)
- v.add(tft(start=0))
+ v.add([tft(start=0)])
assert f.index == 1
assert f.flow is v[1]
- v.add(tft(start=2))
+ v.add([tft(start=2)])
assert f.index == 1
assert f.flow is v[1]
@@ -324,22 +414,25 @@ def test_focus():
assert f.index == 0
f.index = 1
- v.remove(v[1])
+ v.remove([v[1]])
+ v[1].intercept()
assert f.index == 1
assert f.flow is v[1]
- v.remove(v[1])
+ v.remove([v[1]])
assert f.index == 0
assert f.flow is v[0]
- v.remove(v[0])
+ v.remove([v[0]])
assert f.index is None
assert f.flow is None
- v.add(tft(method="get", start=0))
- v.add(tft(method="get", start=1))
- v.add(tft(method="put", start=2))
- v.add(tft(method="get", start=3))
+ v.add([
+ tft(method="get", start=0),
+ tft(method="get", start=1),
+ tft(method="put", start=2),
+ tft(method="get", start=3),
+ ])
f.flow = v[2]
assert f.flow.request.method == "PUT"
@@ -359,16 +452,16 @@ def test_settings():
with pytest.raises(KeyError):
v.settings[f]
- v.add(f)
+ v.add([f])
v.settings[f]["foo"] = "bar"
assert v.settings[f]["foo"] == "bar"
assert len(list(v.settings)) == 1
- v.remove(f)
+ v.remove([f])
with pytest.raises(KeyError):
v.settings[f]
assert not v.settings.keys()
- v.add(f)
+ v.add([f])
v.settings[f]["foo"] = "bar"
assert v.settings.keys()
v.clear()
@@ -377,7 +470,7 @@ def test_settings():
def test_configure():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, view_filter="~q")
with pytest.raises(Exception, match="Invalid interception filter"):
tctx.configure(v, view_filter="~~")
diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py
index 7b461580..678bc1b7 100644
--- a/test/mitmproxy/test_addonmanager.py
+++ b/test/mitmproxy/test_addonmanager.py
@@ -4,6 +4,7 @@ from mitmproxy import addons
from mitmproxy import addonmanager
from mitmproxy import exceptions
from mitmproxy import options
+from mitmproxy import command
from mitmproxy import master
from mitmproxy import proxy
from mitmproxy.test import taddons
@@ -18,6 +19,10 @@ class TAddon:
if addons:
self.addons = addons
+ @command.command("test.command")
+ def testcommand(self) -> str:
+ return "here"
+
def __repr__(self):
return "Addon(%s)" % self.name
@@ -38,6 +43,12 @@ class AOption:
l.add_option("custom_option", bool, False, "help")
+def test_command():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(TAddon("test"))
+ assert tctx.master.commands.call("test.command") == "here"
+
+
def test_halt():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
@@ -61,9 +72,9 @@ def test_lifecycle():
a = addonmanager.AddonManager(m)
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow()
@@ -82,6 +93,11 @@ def test_loader():
l.add_option("custom_option", bool, False, "help")
l.add_option("custom_option", bool, False, "help")
+ def cmd(a: str) -> str:
+ return "foo"
+
+ l.add_command("test.command", cmd)
+
def test_simple():
with taddons.context() as tctx:
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
new file mode 100644
index 00000000..96d79dba
--- /dev/null
+++ b/test/mitmproxy/test_command.py
@@ -0,0 +1,128 @@
+import typing
+from mitmproxy import command
+from mitmproxy import flow
+from mitmproxy import master
+from mitmproxy import options
+from mitmproxy import proxy
+from mitmproxy import exceptions
+from mitmproxy.test import tflow
+from mitmproxy.test import taddons
+import pytest
+
+
+class TAddon:
+ def cmd1(self, foo: str) -> str:
+ """cmd1 help"""
+ return "ret " + foo
+
+ def cmd2(self, foo: str) -> str:
+ return 99
+
+ def empty(self) -> None:
+ pass
+
+
+class TestCommand:
+ def test_call(self):
+ o = options.Options()
+ m = master.Master(o, proxy.DummyServer(o))
+ cm = command.CommandManager(m)
+
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
+
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
+
+
+def test_simple():
+ with taddons.context() as tctx:
+ c = command.CommandManager(tctx.master)
+ a = TAddon()
+ c.add("one.two", a.cmd1)
+ assert c.commands["one.two"].help == "cmd1 help"
+ assert(c.call("one.two foo") == "ret foo")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.call("nonexistent")
+ with pytest.raises(exceptions.CommandError, match="Invalid"):
+ c.call("")
+ with pytest.raises(exceptions.CommandError, match="Usage"):
+ c.call("one.two too many args")
+
+ c.add("empty", a.empty)
+ c.call("empty")
+
+
+def test_typename():
+ assert command.typename(str, True) == "str"
+ assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
+ assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
+ assert command.typename(flow.Flow, False) == "flow"
+
+
+class DummyConsole:
+ def load(self, l):
+ l.add_command("view.resolve", self.resolve)
+
+ def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
+ n = int(spec)
+ return [tflow.tflow(resp=True)] * n
+
+
+def test_parsearg():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
+
+ assert command.parsearg(tctx.master.commands, "1", int) == 1
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "foo", int)
+
+ assert command.parsearg(tctx.master.commands, "true", bool) is True
+ assert command.parsearg(tctx.master.commands, "false", bool) is False
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "flobble", bool)
+
+ assert len(command.parsearg(
+ tctx.master.commands, "2", typing.Sequence[flow.Flow]
+ )) == 2
+ assert command.parsearg(tctx.master.commands, "1", flow.Flow)
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "2", flow.Flow)
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "0", flow.Flow)
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "foo", Exception)
+
+
+class TDec:
+ @command.command("cmd1")
+ def cmd1(self, foo: str) -> str:
+ """cmd1 help"""
+ return "ret " + foo
+
+ @command.command("cmd2")
+ def cmd2(self, foo: str) -> str:
+ return 99
+
+ @command.command("empty")
+ def empty(self) -> None:
+ pass
+
+
+def test_decorator():
+ with taddons.context() as tctx:
+ c = command.CommandManager(tctx.master)
+ a = TDec()
+ c.collect_commands(a)
+ assert "cmd1" in c.commands
+ assert c.call("cmd1 bar") == "ret bar"
+ assert "empty" in c.commands
+ assert c.call("empty") is None
+
+ with taddons.context() as tctx:
+ tctx.master.addons.add(a)
+ assert tctx.master.commands.call("cmd1 bar") == "ret bar"
diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py
index a685570f..04ec7ded 100644
--- a/test/mitmproxy/test_optmanager.py
+++ b/test/mitmproxy/test_optmanager.py
@@ -381,6 +381,11 @@ def test_set():
with pytest.raises(exceptions.OptionsError):
opts.set("bool=wobble")
+ opts.set("bool=toggle")
+ assert opts.bool is False
+ opts.set("bool=toggle")
+ assert opts.bool is True
+
opts.set("int=1")
assert opts.int == 1
with pytest.raises(exceptions.OptionsError):
diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py
index e3d5dc44..2b6181d3 100644
--- a/test/mitmproxy/tools/web/test_app.py
+++ b/test/mitmproxy/tools/web/test_app.py
@@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False)
f = tflow.tflow(resp=True)
f.id = "42"
- m.view.add(f)
- m.view.add(tflow.tflow(err=True))
+ m.view.add([f])
+ m.view.add([tflow.tflow(err=True)])
m.add_log("test log", "info")
self.master = m
self.view = m.view
@@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
# restore
for f in flows:
- self.view.add(f)
+ self.view.add([f])
self.events.data = events
def test_resume(self):
@@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
assert self.fetch("/flows/42", method="DELETE").code == 200
assert not self.view.get_by_id("42")
- self.view.add(f)
+ self.view.add([f])
assert self.fetch("/flows/1234", method="DELETE").code == 404
@@ -162,7 +162,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
f = self.view.get_by_id(resp.body.decode())
assert f
assert f.id != "42"
- self.view.remove(f)
+ self.view.remove([f])
def test_flow_revert(self):
f = self.view.get_by_id("42")
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index fd0c6e0c..22bd7c34 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -16,72 +16,86 @@ class T(TBase):
super(T, self).__init__(42)
-def test_check_type():
- typecheck.check_type("foo", 42, int)
+def test_check_option_type():
+ typecheck.check_option_type("foo", 42, int)
with pytest.raises(TypeError):
- typecheck.check_type("foo", 42, str)
+ typecheck.check_option_type("foo", 42, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, str)
+ typecheck.check_option_type("foo", None, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", b"foo", str)
+ typecheck.check_option_type("foo", b"foo", str)
def test_check_union():
- typecheck.check_type("foo", 42, typing.Union[int, str])
- typecheck.check_type("foo", "42", typing.Union[int, str])
+ typecheck.check_option_type("foo", 42, typing.Union[int, str])
+ typecheck.check_option_type("foo", "42", typing.Union[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [], typing.Union[int, str])
+ typecheck.check_option_type("foo", [], typing.Union[int, str])
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
- typecheck.check_type("foo", 42, m)
+ typecheck.check_option_type("foo", 42, m)
def test_check_tuple():
- typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, typing.Tuple[int, str])
+ typecheck.check_option_type("foo", None, typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str])
# Python 3.5 only defines __tuple_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Tuple"
m.__tuple_params__ = (int, str)
- typecheck.check_type("foo", (42, "42"), m)
+ typecheck.check_option_type("foo", (42, "42"), m)
def test_check_sequence():
- typecheck.check_type("foo", [10], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ["foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", ["foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
+ typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.Sequence[str])
+ typecheck.check_option_type("foo", "foo", typing.Sequence[str])
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_type("foo", [10], m)
+ typecheck.check_option_type("foo", [10], m)
def test_check_io():
- typecheck.check_type("foo", io.StringIO(), typing.IO[str])
+ typecheck.check_option_type("foo", io.StringIO(), typing.IO[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.IO[str])
+ typecheck.check_option_type("foo", "foo", typing.IO[str])
def test_check_any():
- typecheck.check_type("foo", 42, typing.Any)
- typecheck.check_type("foo", object(), typing.Any)
- typecheck.check_type("foo", None, typing.Any)
+ typecheck.check_option_type("foo", 42, typing.Any)
+ typecheck.check_option_type("foo", object(), typing.Any)
+ typecheck.check_option_type("foo", None, typing.Any)
+
+
+def test_check_command_return_type():
+ assert(typecheck.check_command_return_type("foo", str))
+ assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
+ assert(typecheck.check_command_return_type(None, None))
+ assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
+
+ # Python 3.5 only defines __parameters__
+ m = mock.Mock()
+ m.__str__ = lambda self: "typing.Sequence"
+ m.__parameters__ = (int,)
+ typecheck.check_command_return_type([10], m)