aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_cmdline.py13
-rw-r--r--test/test_dump.py6
-rw-r--r--test/test_flow.py27
-rw-r--r--test/test_script.py103
-rw-r--r--test/tutils.py5
5 files changed, 71 insertions, 83 deletions
diff --git a/test/test_cmdline.py b/test/test_cmdline.py
index 92e2adbd..dbc61bfc 100644
--- a/test/test_cmdline.py
+++ b/test/test_cmdline.py
@@ -40,19 +40,6 @@ def test_parse_setheaders():
x = cmdline.parse_setheader("/foo/bar/voing")
assert x == ("foo", "bar", "voing")
-def test_shlex():
- """
- shlex.split assumes posix=True by default, we do manual detection for windows.
- Test whether script paths are parsed correctly
- """
- absfilepath = os.path.normcase(os.path.abspath(__file__))
-
- parser = argparse.ArgumentParser()
- cmdline.common_options(parser)
- opts = parser.parse_args(args=["-s",absfilepath])
-
- assert os.path.isfile(opts.scripts[0][0])
-
def test_common():
parser = argparse.ArgumentParser()
cmdline.common_options(parser)
diff --git a/test/test_dump.py b/test/test_dump.py
index 9874b650..a958a2ec 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -153,7 +153,7 @@ class TestDumpMaster:
def test_script(self):
ret = self._dummy_cycle(
1, None, "",
- scripts=[[tutils.test_data.path("scripts/all.py")]], verbosity=0, eventlog=True
+ scripts=[tutils.test_data.path("scripts/all.py")], verbosity=0, eventlog=True
)
assert "XCLIENTCONNECT" in ret
assert "XSERVERCONNECT" in ret
@@ -162,11 +162,11 @@ class TestDumpMaster:
assert "XCLIENTDISCONNECT" in ret
tutils.raises(
dump.DumpError,
- self._dummy_cycle, 1, None, "", scripts=[["nonexistent"]]
+ self._dummy_cycle, 1, None, "", scripts=["nonexistent"]
)
tutils.raises(
dump.DumpError,
- self._dummy_cycle, 1, None, "", scripts=[["starterr.py"]]
+ self._dummy_cycle, 1, None, "", scripts=["starterr.py"]
)
def test_stickycookie(self):
diff --git a/test/test_flow.py b/test/test_flow.py
index bf6a7a42..f9198f0c 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -563,12 +563,11 @@ class TestFlowMaster:
def test_load_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script([tutils.test_data.path("scripts/a.py")])
- assert not fm.load_script([tutils.test_data.path("scripts/a.py")])
- assert not fm.unload_script(fm.scripts[0])
- assert not fm.unload_script(fm.scripts[0])
- assert fm.load_script(["nonexistent"])
- assert "ValueError" in fm.load_script([tutils.test_data.path("scripts/starterr.py")])
+ assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
+ assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
+ assert not fm.unload_scripts()
+ assert fm.load_script("nonexistent")
+ assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py"))
assert len(fm.scripts) == 0
def test_replay(self):
@@ -584,7 +583,7 @@ class TestFlowMaster:
def test_script_reqerr(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script([tutils.test_data.path("scripts/reqerr.py")])
+ assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
req = tutils.treq()
fm.handle_clientconnect(req.client_conn)
assert fm.handle_request(req)
@@ -592,7 +591,7 @@ class TestFlowMaster:
def test_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script([tutils.test_data.path("scripts/all.py")])
+ assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
req = tutils.treq()
fm.handle_clientconnect(req.client_conn)
assert fm.scripts[0].ns["log"][-1] == "clientconnect"
@@ -606,16 +605,20 @@ class TestFlowMaster:
fm.handle_response(resp)
assert fm.scripts[0].ns["log"][-1] == "response"
#load second script
- assert not fm.load_script([tutils.test_data.path("scripts/all.py")])
+ assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
assert len(fm.scripts) == 2
dc = flow.ClientDisconnect(req.client_conn)
dc.reply = controller.DummyReply()
fm.handle_clientdisconnect(dc)
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
+
+
#unload first script
- fm.unload_script(fm.scripts[0])
- assert len(fm.scripts) == 1
+ fm.unload_scripts()
+ assert len(fm.scripts) == 0
+
+ assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
err = flow.Error(f.request, "msg")
err.reply = controller.DummyReply()
fm.handle_error(err)
@@ -659,7 +662,7 @@ class TestFlowMaster:
err.reply = controller.DummyReply()
fm.handle_error(err)
- fm.load_script([tutils.test_data.path("scripts/a.py")])
+ fm.load_script(tutils.test_data.path("scripts/a.py"))
fm.shutdown()
def test_client_playback(self):
diff --git a/test/test_script.py b/test/test_script.py
index ad2296ef..025e9f37 100644
--- a/test/test_script.py
+++ b/test/test_script.py
@@ -3,27 +3,15 @@ import tutils
import shlex
import os
import time
+import mock
-class TCounter:
- count = 0
-
- def __call__(self, *args, **kwargs):
- self.count += 1
-
-
-class TScriptContext(TCounter):
- def log(self, msg):
- self.__call__()
-
class TestScript:
def test_simple(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- ctx = flow.ScriptContext(fm)
-
- p = script.Script(shlex.split(tutils.test_data.path("scripts/a.py")+" --var 40", posix=(os.name != "nt")), ctx)
- p.load()
+ sp = tutils.test_data.path("scripts/a.py")
+ p = script.Script("%s --var 40"%sp, fm)
assert "here" in p.ns
assert p.run("here") == (True, 41)
@@ -40,7 +28,7 @@ class TestScript:
def test_duplicate_flow(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- fm.load_script([tutils.test_data.path("scripts/duplicate_flow.py")])
+ fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
r = tutils.treq()
fm.handle_request(r)
assert fm.state.flow_count() == 2
@@ -50,70 +38,79 @@ class TestScript:
def test_err(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- ctx = flow.ScriptContext(fm)
-
- s = script.Script(["nonexistent"], ctx)
tutils.raises(
- "no such file",
- s.load
+ "not found",
+ script.Script, "nonexistent", fm
)
- s = script.Script([tutils.test_data.path("scripts")], ctx)
tutils.raises(
"not a file",
- s.load
+ script.Script, tutils.test_data.path("scripts"), fm
)
- s = script.Script([tutils.test_data.path("scripts/syntaxerr.py")], ctx)
tutils.raises(
script.ScriptError,
- s.load
+ script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm
)
- s = script.Script([tutils.test_data.path("scripts/loaderr.py")], ctx)
tutils.raises(
script.ScriptError,
- s.load
+ script.Script, tutils.test_data.path("scripts/loaderr.py"), fm
)
def test_concurrent(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- fm.load_script([tutils.test_data.path("scripts/concurrent_decorator.py")])
-
- reply = TCounter()
- r1, r2 = tutils.treq(), tutils.treq()
- r1.reply, r2.reply = reply, reply
- t_start = time.time()
- fm.handle_request(r1)
- r1.reply()
- fm.handle_request(r2)
- r2.reply()
- assert reply.count < 2
- assert (time.time() - t_start) < 0.09
- time.sleep(0.2)
- assert reply.count == 2
+ fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py"))
+
+ with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
+ r1, r2 = tutils.treq(), tutils.treq()
+ t_start = time.time()
+ fm.handle_request(r1)
+ r1.reply()
+ fm.handle_request(r2)
+ r2.reply()
+
+ # Two instantiations
+ assert m.call_count == 2
+ assert (time.time() - t_start) < 0.09
+ time.sleep(0.2)
+ # Plus two invocations
+ assert m.call_count == 4
def test_concurrent2(self):
- ctx = TScriptContext()
- s = script.Script([tutils.test_data.path("scripts/concurrent_decorator.py")], ctx)
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ s = script.Script(tutils.test_data.path("scripts/concurrent_decorator.py"), fm)
s.load()
f = tutils.tflow_full()
f.error = tutils.terr(f.request)
f.reply = f.request.reply
- s.run("clientconnect", f)
- s.run("serverconnect", f)
- s.run("response", f)
- s.run("error", f)
- s.run("clientdisconnect", f)
- time.sleep(0.1)
- assert ctx.count == 5
+ with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
+ s.run("clientconnect", f)
+ s.run("serverconnect", f)
+ s.run("response", f)
+ s.run("error", f)
+ s.run("clientdisconnect", f)
+ time.sleep(0.1)
+ assert m.call_count == 5
def test_concurrent_err(self):
- s = script.Script([tutils.test_data.path("scripts/concurrent_decorator_err.py")], TScriptContext())
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
tutils.raises(
"decorator not supported for this method",
- s.load
- ) \ No newline at end of file
+ script.Script, tutils.test_data.path("scripts/concurrent_decorator_err.py"), fm
+ )
+
+
+def test_command_parsing():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py"))
+ s = script.Script(absfilepath, fm)
+ assert os.path.isfile(s.argv[0])
+
+
diff --git a/test/tutils.py b/test/tutils.py
index d6332107..fb41d77a 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -1,8 +1,9 @@
import os, shutil, tempfile
from contextlib import contextmanager
from libmproxy import flow, utils, controller
-from libmproxy.console.flowview import FlowView
-from libmproxy.console import ConsoleState
+if os.name != "nt":
+ from libmproxy.console.flowview import FlowView
+ from libmproxy.console import ConsoleState
from netlib import certutils
from nose.plugins.skip import SkipTest
from mock import Mock