diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_cmdline.py | 13 | ||||
-rw-r--r-- | test/test_dump.py | 6 | ||||
-rw-r--r-- | test/test_flow.py | 27 | ||||
-rw-r--r-- | test/test_script.py | 103 | ||||
-rw-r--r-- | test/tutils.py | 5 |
5 files changed, 71 insertions, 83 deletions
diff --git a/test/test_cmdline.py b/test/test_cmdline.py index 92e2adbd..dbc61bfc 100644 --- a/test/test_cmdline.py +++ b/test/test_cmdline.py @@ -40,19 +40,6 @@ def test_parse_setheaders(): x = cmdline.parse_setheader("/foo/bar/voing") assert x == ("foo", "bar", "voing") -def test_shlex(): - """ - shlex.split assumes posix=True by default, we do manual detection for windows. - Test whether script paths are parsed correctly - """ - absfilepath = os.path.normcase(os.path.abspath(__file__)) - - parser = argparse.ArgumentParser() - cmdline.common_options(parser) - opts = parser.parse_args(args=["-s",absfilepath]) - - assert os.path.isfile(opts.scripts[0][0]) - def test_common(): parser = argparse.ArgumentParser() cmdline.common_options(parser) diff --git a/test/test_dump.py b/test/test_dump.py index 9874b650..a958a2ec 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -153,7 +153,7 @@ class TestDumpMaster: def test_script(self): ret = self._dummy_cycle( 1, None, "", - scripts=[[tutils.test_data.path("scripts/all.py")]], verbosity=0, eventlog=True + scripts=[tutils.test_data.path("scripts/all.py")], verbosity=0, eventlog=True ) assert "XCLIENTCONNECT" in ret assert "XSERVERCONNECT" in ret @@ -162,11 +162,11 @@ class TestDumpMaster: assert "XCLIENTDISCONNECT" in ret tutils.raises( dump.DumpError, - self._dummy_cycle, 1, None, "", scripts=[["nonexistent"]] + self._dummy_cycle, 1, None, "", scripts=["nonexistent"] ) tutils.raises( dump.DumpError, - self._dummy_cycle, 1, None, "", scripts=[["starterr.py"]] + self._dummy_cycle, 1, None, "", scripts=["starterr.py"] ) def test_stickycookie(self): diff --git a/test/test_flow.py b/test/test_flow.py index bf6a7a42..f9198f0c 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -563,12 +563,11 @@ class TestFlowMaster: def test_load_script(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script([tutils.test_data.path("scripts/a.py")]) - assert not fm.load_script([tutils.test_data.path("scripts/a.py")]) - assert not fm.unload_script(fm.scripts[0]) - assert not fm.unload_script(fm.scripts[0]) - assert fm.load_script(["nonexistent"]) - assert "ValueError" in fm.load_script([tutils.test_data.path("scripts/starterr.py")]) + assert not fm.load_script(tutils.test_data.path("scripts/a.py")) + assert not fm.load_script(tutils.test_data.path("scripts/a.py")) + assert not fm.unload_scripts() + assert fm.load_script("nonexistent") + assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py")) assert len(fm.scripts) == 0 def test_replay(self): @@ -584,7 +583,7 @@ class TestFlowMaster: def test_script_reqerr(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script([tutils.test_data.path("scripts/reqerr.py")]) + assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py")) req = tutils.treq() fm.handle_clientconnect(req.client_conn) assert fm.handle_request(req) @@ -592,7 +591,7 @@ class TestFlowMaster: def test_script(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script([tutils.test_data.path("scripts/all.py")]) + assert not fm.load_script(tutils.test_data.path("scripts/all.py")) req = tutils.treq() fm.handle_clientconnect(req.client_conn) assert fm.scripts[0].ns["log"][-1] == "clientconnect" @@ -606,16 +605,20 @@ class TestFlowMaster: fm.handle_response(resp) assert fm.scripts[0].ns["log"][-1] == "response" #load second script - assert not fm.load_script([tutils.test_data.path("scripts/all.py")]) + assert not fm.load_script(tutils.test_data.path("scripts/all.py")) assert len(fm.scripts) == 2 dc = flow.ClientDisconnect(req.client_conn) dc.reply = controller.DummyReply() fm.handle_clientdisconnect(dc) assert fm.scripts[0].ns["log"][-1] == "clientdisconnect" assert fm.scripts[1].ns["log"][-1] == "clientdisconnect" + + #unload first script - fm.unload_script(fm.scripts[0]) - assert len(fm.scripts) == 1 + fm.unload_scripts() + assert len(fm.scripts) == 0 + + assert not fm.load_script(tutils.test_data.path("scripts/all.py")) err = flow.Error(f.request, "msg") err.reply = controller.DummyReply() fm.handle_error(err) @@ -659,7 +662,7 @@ class TestFlowMaster: err.reply = controller.DummyReply() fm.handle_error(err) - fm.load_script([tutils.test_data.path("scripts/a.py")]) + fm.load_script(tutils.test_data.path("scripts/a.py")) fm.shutdown() def test_client_playback(self): diff --git a/test/test_script.py b/test/test_script.py index ad2296ef..025e9f37 100644 --- a/test/test_script.py +++ b/test/test_script.py @@ -3,27 +3,15 @@ import tutils import shlex import os import time +import mock -class TCounter: - count = 0 - - def __call__(self, *args, **kwargs): - self.count += 1 - - -class TScriptContext(TCounter): - def log(self, msg): - self.__call__() - class TestScript: def test_simple(self): s = flow.State() fm = flow.FlowMaster(None, s) - ctx = flow.ScriptContext(fm) - - p = script.Script(shlex.split(tutils.test_data.path("scripts/a.py")+" --var 40", posix=(os.name != "nt")), ctx) - p.load() + sp = tutils.test_data.path("scripts/a.py") + p = script.Script("%s --var 40"%sp, fm) assert "here" in p.ns assert p.run("here") == (True, 41) @@ -40,7 +28,7 @@ class TestScript: def test_duplicate_flow(self): s = flow.State() fm = flow.FlowMaster(None, s) - fm.load_script([tutils.test_data.path("scripts/duplicate_flow.py")]) + fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py")) r = tutils.treq() fm.handle_request(r) assert fm.state.flow_count() == 2 @@ -50,70 +38,79 @@ class TestScript: def test_err(self): s = flow.State() fm = flow.FlowMaster(None, s) - ctx = flow.ScriptContext(fm) - - s = script.Script(["nonexistent"], ctx) tutils.raises( - "no such file", - s.load + "not found", + script.Script, "nonexistent", fm ) - s = script.Script([tutils.test_data.path("scripts")], ctx) tutils.raises( "not a file", - s.load + script.Script, tutils.test_data.path("scripts"), fm ) - s = script.Script([tutils.test_data.path("scripts/syntaxerr.py")], ctx) tutils.raises( script.ScriptError, - s.load + script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm ) - s = script.Script([tutils.test_data.path("scripts/loaderr.py")], ctx) tutils.raises( script.ScriptError, - s.load + script.Script, tutils.test_data.path("scripts/loaderr.py"), fm ) def test_concurrent(self): s = flow.State() fm = flow.FlowMaster(None, s) - fm.load_script([tutils.test_data.path("scripts/concurrent_decorator.py")]) - - reply = TCounter() - r1, r2 = tutils.treq(), tutils.treq() - r1.reply, r2.reply = reply, reply - t_start = time.time() - fm.handle_request(r1) - r1.reply() - fm.handle_request(r2) - r2.reply() - assert reply.count < 2 - assert (time.time() - t_start) < 0.09 - time.sleep(0.2) - assert reply.count == 2 + fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py")) + + with mock.patch("libmproxy.controller.DummyReply.__call__") as m: + r1, r2 = tutils.treq(), tutils.treq() + t_start = time.time() + fm.handle_request(r1) + r1.reply() + fm.handle_request(r2) + r2.reply() + + # Two instantiations + assert m.call_count == 2 + assert (time.time() - t_start) < 0.09 + time.sleep(0.2) + # Plus two invocations + assert m.call_count == 4 def test_concurrent2(self): - ctx = TScriptContext() - s = script.Script([tutils.test_data.path("scripts/concurrent_decorator.py")], ctx) + s = flow.State() + fm = flow.FlowMaster(None, s) + s = script.Script(tutils.test_data.path("scripts/concurrent_decorator.py"), fm) s.load() f = tutils.tflow_full() f.error = tutils.terr(f.request) f.reply = f.request.reply - s.run("clientconnect", f) - s.run("serverconnect", f) - s.run("response", f) - s.run("error", f) - s.run("clientdisconnect", f) - time.sleep(0.1) - assert ctx.count == 5 + with mock.patch("libmproxy.controller.DummyReply.__call__") as m: + s.run("clientconnect", f) + s.run("serverconnect", f) + s.run("response", f) + s.run("error", f) + s.run("clientdisconnect", f) + time.sleep(0.1) + assert m.call_count == 5 def test_concurrent_err(self): - s = script.Script([tutils.test_data.path("scripts/concurrent_decorator_err.py")], TScriptContext()) + s = flow.State() + fm = flow.FlowMaster(None, s) tutils.raises( "decorator not supported for this method", - s.load - )
\ No newline at end of file + script.Script, tutils.test_data.path("scripts/concurrent_decorator_err.py"), fm + ) + + +def test_command_parsing(): + s = flow.State() + fm = flow.FlowMaster(None, s) + absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py")) + s = script.Script(absfilepath, fm) + assert os.path.isfile(s.argv[0]) + + diff --git a/test/tutils.py b/test/tutils.py index d6332107..fb41d77a 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -1,8 +1,9 @@ import os, shutil, tempfile from contextlib import contextmanager from libmproxy import flow, utils, controller -from libmproxy.console.flowview import FlowView -from libmproxy.console import ConsoleState +if os.name != "nt": + from libmproxy.console.flowview import FlowView + from libmproxy.console import ConsoleState from netlib import certutils from nose.plugins.skip import SkipTest from mock import Mock |