aboutsummaryrefslogtreecommitdiffstats
path: root/test/mitmproxy/builtins/test_script.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/mitmproxy/builtins/test_script.py')
-rw-r--r--test/mitmproxy/builtins/test_script.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/test/mitmproxy/builtins/test_script.py b/test/mitmproxy/builtins/test_script.py
new file mode 100644
index 00000000..7c9787f8
--- /dev/null
+++ b/test/mitmproxy/builtins/test_script.py
@@ -0,0 +1,128 @@
+import time
+
+from mitmproxy.builtins import script
+from mitmproxy import exceptions
+from mitmproxy.flow import master
+from mitmproxy.flow import state
+from mitmproxy.flow import options
+
+from .. import tutils, mastertest
+
+
+class TestParseCommand:
+ def test_empty_command(self):
+ with tutils.raises(exceptions.AddonError):
+ script.parse_command("")
+
+ with tutils.raises(exceptions.AddonError):
+ script.parse_command(" ")
+
+ def test_no_script_file(self):
+ with tutils.raises("not found"):
+ script.parse_command("notfound")
+
+ with tutils.tmpdir() as dir:
+ with tutils.raises("not a file"):
+ script.parse_command(dir)
+
+ def test_parse_args(self):
+ with tutils.chdir(tutils.test_data.dirname):
+ assert script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
+ assert script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
+ assert script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
+
+ @tutils.skip_not_windows
+ def test_parse_windows(self):
+ with tutils.chdir(tutils.test_data.dirname):
+ assert script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
+ assert script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", 'foo \\ bar', [])
+
+
+def test_load_script():
+ ns = script.load_script(
+ tutils.test_data.path(
+ "data/addonscripts/recorder.py"
+ ), []
+ )
+ assert ns["configure"]
+
+
+class TestScript(mastertest.MasterTest):
+ def test_simple(self):
+ s = state.State()
+ m = master.FlowMaster(options.Options(), None, s)
+ sc = script.Script(
+ tutils.test_data.path(
+ "data/addonscripts/recorder.py"
+ )
+ )
+ m.addons.add(sc)
+ assert sc.ns["call_log"] == [("configure", (options.Options(),), {})]
+
+ sc.ns["call_log"] = []
+ f = tutils.tflow(resp=True)
+ self.invoke(m, "request", f)
+
+ recf = sc.ns["call_log"][0]
+ assert recf[0] == "request"
+
+ def test_reload(self):
+ s = state.State()
+ m = mastertest.RecordingMaster(options.Options(), None, s)
+ with tutils.tmpdir():
+ with open("foo.py", "w"):
+ pass
+ sc = script.Script("foo.py")
+ m.addons.add(sc)
+
+ for _ in range(100):
+ with open("foo.py", "a") as f:
+ f.write(".")
+ m.addons.invoke_with_context(sc, "tick")
+ time.sleep(0.1)
+ if m.event_log:
+ return
+ raise AssertionError("Change event not detected.")
+
+ def test_exception(self):
+ s = state.State()
+ m = mastertest.RecordingMaster(options.Options(), None, s)
+ sc = script.Script(
+ tutils.test_data.path("data/addonscripts/error.py")
+ )
+ m.addons.add(sc)
+ f = tutils.tflow(resp=True)
+ self.invoke(m, "request", f)
+ assert m.event_log[0][0] == "warn"
+
+ def test_duplicate_flow(self):
+ s = state.State()
+ fm = master.FlowMaster(None, None, s)
+ fm.addons.add(
+ script.Script(
+ tutils.test_data.path("data/addonscripts/duplicate_flow.py")
+ )
+ )
+ f = tutils.tflow()
+ fm.request(f)
+ assert fm.state.flow_count() == 2
+ assert not fm.state.view[0].request.is_replay
+ assert fm.state.view[1].request.is_replay
+
+
+class TestScriptLoader(mastertest.MasterTest):
+ def test_simple(self):
+ s = state.State()
+ o = options.Options(scripts=[])
+ m = master.FlowMaster(o, None, s)
+ sc = script.ScriptLoader()
+ m.addons.add(sc)
+ assert len(m.addons) == 1
+ o.update(
+ scripts = [
+ tutils.test_data.path("data/addonscripts/recorder.py")
+ ]
+ )
+ assert len(m.addons) == 2
+ o.update(scripts = [])
+ assert len(m.addons) == 1