aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py19
-rw-r--r--test/mitmproxy/addons/test_replace.py97
-rw-r--r--test/mitmproxy/addons/test_script.py79
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py19
-rw-r--r--test/mitmproxy/addons/test_streamfile.py68
-rw-r--r--test/mitmproxy/addons/test_termstatus.py12
-rw-r--r--test/mitmproxy/data/addonscripts/addon.py4
-rw-r--r--test/mitmproxy/data/addonscripts/concurrent_decorator_class.py2
-rw-r--r--test/mitmproxy/data/addonscripts/concurrent_decorator_err.py2
-rw-r--r--test/mitmproxy/data/addonscripts/recorder.py2
-rw-r--r--test/mitmproxy/net/http/test_message.py6
-rw-r--r--test/mitmproxy/net/test_tcp.py31
-rw-r--r--test/mitmproxy/proxy/test_server.py3
-rw-r--r--test/mitmproxy/script/test_concurrent.py6
-rw-r--r--test/mitmproxy/test_addonmanager.py8
-rw-r--r--test/mitmproxy/test_certs.py205
-rw-r--r--test/mitmproxy/test_connections.py25
-rw-r--r--test/mitmproxy/test_examples.py49
-rw-r--r--test/mitmproxy/test_optmanager.py67
-rw-r--r--test/mitmproxy/tools/console/test_master.py4
-rw-r--r--test/mitmproxy/tools/test_dump.py25
-rw-r--r--test/mitmproxy/tservers.py2
-rw-r--r--test/mitmproxy/types/test_multidict.py14
-rw-r--r--test/mitmproxy/types/test_serializable.py19
-rw-r--r--test/pathod/language/test_base.py35
-rw-r--r--test/pathod/language/test_generators.py34
26 files changed, 429 insertions, 408 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index 6b8b7c90..c22b3589 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -1,9 +1,7 @@
-import os
import pytest
from unittest import mock
from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy import io
from mitmproxy import exceptions
@@ -49,14 +47,13 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
- def test_configure(self):
+ def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
- with tutils.tmpdir() as td:
- path = os.path.join(td, "flows")
- tdump(path, [tflow.tflow()])
- tctx.configure(cp, client_replay=[path])
- tctx.configure(cp, client_replay=[])
- tctx.configure(cp)
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(cp, client_replay=["nonexistent"])
+ path = str(tmpdir.join("flows"))
+ tdump(path, [tflow.tflow()])
+ tctx.configure(cp, client_replay=[path])
+ tctx.configure(cp, client_replay=[])
+ tctx.configure(cp)
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(cp, client_replay=["nonexistent"])
diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py
index 8c280c51..7d590b35 100644
--- a/test/mitmproxy/addons/test_replace.py
+++ b/test/mitmproxy/addons/test_replace.py
@@ -1,11 +1,8 @@
-import os.path
import pytest
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
-from .. import tservers
from mitmproxy.addons import replace
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
class TestReplace:
@@ -34,7 +31,7 @@ class TestReplace:
with taddons.context() as tctx:
tctx.configure(
r,
- replacements = [
+ replacements=[
"/~q/foo/bar",
"/~s/foo/bar",
]
@@ -49,55 +46,57 @@ class TestReplace:
r.response(f)
assert f.response.content == b"bar"
-
-class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest):
- ssl = False
-
def test_order(self):
- sa = replace.Replace()
- self.proxy.tmaster.addons.add(sa)
-
- self.proxy.tmaster.options.replacements = [
- "/~q/foo/bar",
- "/~q/bar/baz",
- "/~q/foo/oh noes!",
- "/~s/baz/ORLY"
- ]
- p = self.pathoc()
- with p.connect():
- req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
- assert req.content == b"ORLY"
- assert req.status_code == 418
+ r = replace.Replace()
+ with taddons.context() as tctx:
+ tctx.configure(
+ r,
+ replacements=[
+ "/foo/bar",
+ "/bar/baz",
+ "/foo/oh noes!",
+ "/bar/oh noes!",
+ ]
+ )
+ f = tflow.tflow()
+ f.request.content = b"foo"
+ r.request(f)
+ assert f.request.content == b"baz"
class TestReplaceFile:
- def test_simple(self):
- r = replace.ReplaceFile()
- with tutils.tmpdir() as td:
- rp = os.path.join(td, "replacement")
- with open(rp, "w") as f:
- f.write("bar")
- with taddons.context() as tctx:
+ def test_simple(self, tmpdir):
+ r = replace.Replace()
+ with taddons.context() as tctx:
+ tmpfile = tmpdir.join("replacement")
+ tmpfile.write("bar")
+ tctx.configure(
+ r,
+ replacements=["/~q/foo/@" + str(tmpfile)]
+ )
+ f = tflow.tflow()
+ f.request.content = b"foo"
+ r.request(f)
+ assert f.request.content == b"bar"
+
+ def test_nonexistent(self, tmpdir):
+ r = replace.Replace()
+ with taddons.context() as tctx:
+ with pytest.raises(Exception, match="Invalid file path"):
tctx.configure(
r,
- replacement_files = [
- "/~q/foo/" + rp,
- "/~s/foo/" + rp,
- "/~b nonexistent/nonexistent/nonexistent",
- ]
+ replacements=["/~q/foo/@nonexistent"]
)
- f = tflow.tflow()
- f.request.content = b"foo"
- r.request(f)
- assert f.request.content == b"bar"
-
- f = tflow.tflow(resp=True)
- f.response.content = b"foo"
- r.response(f)
- assert f.response.content == b"bar"
- f = tflow.tflow()
- f.request.content = b"nonexistent"
- assert not tctx.master.event_log
- r.request(f)
- assert tctx.master.event_log
+ tmpfile = tmpdir.join("replacement")
+ tmpfile.write("bar")
+ tctx.configure(
+ r,
+ replacements=["/~q/foo/@" + str(tmpfile)]
+ )
+ tmpfile.remove()
+ f = tflow.tflow()
+ f.request.content = b"foo"
+ assert not tctx.master.event_log
+ r.request(f)
+ assert tctx.master.event_log
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 5f196ebf..4c1b2e43 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -68,13 +68,12 @@ class TestParseCommand:
with pytest.raises(ValueError):
script.parse_command(" ")
- def test_no_script_file(self):
+ def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
- with tutils.tmpdir() as dir:
- with pytest.raises(Exception, match="Not a file"):
- script.parse_command(dir)
+ with pytest.raises(Exception, match="Not a file"):
+ script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
@@ -117,9 +116,7 @@ class TestScript:
)
)
sc.load_script()
- assert sc.ns.call_log == [
- ("solo", "start", (), {}),
- ]
+ assert sc.ns.call_log[0][0:2] == ("solo", "start")
sc.ns.call_log = []
f = tflow.tflow(resp=True)
@@ -128,28 +125,26 @@ class TestScript:
recf = sc.ns.call_log[0]
assert recf[1] == "request"
- def test_reload(self):
+ def test_reload(self, tmpdir):
with taddons.context() as tctx:
- with tutils.tmpdir():
- with open("foo.py", "w"):
- pass
- sc = script.Script("foo.py")
- tctx.configure(sc)
- for _ in range(100):
- with open("foo.py", "a") as f:
- f.write(".")
- sc.tick()
- time.sleep(0.1)
- if tctx.master.event_log:
- return
- raise AssertionError("Change event not detected.")
+ f = tmpdir.join("foo.py")
+ f.ensure(file=True)
+ sc = script.Script(str(f))
+ tctx.configure(sc)
+ for _ in range(100):
+ f.write(".")
+ sc.tick()
+ time.sleep(0.1)
+ if tctx.master.event_log:
+ return
+ raise AssertionError("Change event not detected.")
def test_exception(self):
with taddons.context() as tctx:
sc = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
)
- sc.start()
+ sc.start(tctx.options)
f = tflow.tflow(resp=True)
sc.request(f)
assert tctx.master.event_log[0][0] == "error"
@@ -165,7 +160,7 @@ class TestScript:
"mitmproxy/data/addonscripts/addon.py"
)
)
- sc.start()
+ sc.start(tctx.options)
tctx.configure(sc)
assert sc.ns.event_log == [
'scriptstart', 'addonstart', 'addonconfigure'
@@ -228,24 +223,31 @@ class TestScriptLoader:
assert len(m.addons) == 1
def test_dupes(self):
- o = options.Options(scripts=["one", "one"])
- m = master.Master(o, proxy.DummyServer())
sc = script.ScriptLoader()
- with pytest.raises(exceptions.OptionsError):
- m.addons.add(o, sc)
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sc)
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(
+ sc,
+ scripts = ["one", "one"]
+ )
def test_nonexistent(self):
- o = options.Options(scripts=["nonexistent"])
- m = master.Master(o, proxy.DummyServer())
sc = script.ScriptLoader()
- with pytest.raises(exceptions.OptionsError):
- m.addons.add(o, sc)
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sc)
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(
+ sc,
+ scripts = ["nonexistent"]
+ )
def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
sc = script.ScriptLoader()
with taddons.context() as tctx:
tctx.master.addons.add(sc)
+ sc.running()
tctx.configure(
sc,
scripts = [
@@ -256,9 +258,17 @@ class TestScriptLoader:
)
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
assert debug == [
- ('debug', 'a start'), ('debug', 'a configure'),
- ('debug', 'b start'), ('debug', 'b configure'),
- ('debug', 'c start'), ('debug', 'c configure')
+ ('debug', 'a start'),
+ ('debug', 'a configure'),
+ ('debug', 'a running'),
+
+ ('debug', 'b start'),
+ ('debug', 'b configure'),
+ ('debug', 'b running'),
+
+ ('debug', 'c start'),
+ ('debug', 'c configure'),
+ ('debug', 'c running'),
]
tctx.master.event_log = []
tctx.configure(
@@ -287,4 +297,5 @@ class TestScriptLoader:
('debug', 'b done'),
('debug', 'x start'),
('debug', 'x configure'),
+ ('debug', 'x running'),
]
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index e2afa516..54e4d281 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -1,10 +1,8 @@
-import os
import urllib
import pytest
-from mitmproxy.test import tutils
-from mitmproxy.test import tflow
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
import mitmproxy.test.tutils
from mitmproxy.addons import serverplayback
@@ -19,15 +17,14 @@ def tdump(path, flows):
w.add(i)
-def test_config():
+def test_config(tmpdir):
s = serverplayback.ServerPlayback()
- with tutils.tmpdir() as p:
- with taddons.context() as tctx:
- fpath = os.path.join(p, "flows")
- tdump(fpath, [tflow.tflow(resp=True)])
- tctx.configure(s, server_replay=[fpath])
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(s, server_replay=[p])
+ with taddons.context() as tctx:
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ tctx.configure(s, server_replay=[fpath])
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(s, server_replay=[str(tmpdir)])
def test_tick():
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
index 4105c1fc..3f78521c 100644
--- a/test/mitmproxy/addons/test_streamfile.py
+++ b/test/mitmproxy/addons/test_streamfile.py
@@ -1,9 +1,7 @@
-import os.path
import pytest
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
@@ -11,19 +9,17 @@ from mitmproxy import options
from mitmproxy.addons import streamfile
-def test_configure():
+def test_configure(tmpdir):
sa = streamfile.StreamFile()
with taddons.context(options=options.Options()) as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=tdir)
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(sa, streamfile=p, filtstr="~~")
- tctx.configure(sa, filtstr="foo")
- assert sa.filt
- tctx.configure(sa, filtstr=None)
- assert not sa.filt
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, streamfile=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~")
+ tctx.configure(sa, filtstr="foo")
+ assert sa.filt
+ tctx.configure(sa, filtstr=None)
+ assert not sa.filt
def rd(p):
@@ -31,36 +27,34 @@ def rd(p):
return list(x.stream())
-def test_tcp():
+def test_tcp(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- tctx.configure(sa, streamfile=p)
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, streamfile=p)
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)
-def test_simple():
+def test_simple(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
+ p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
+ tctx.configure(sa, streamfile=p)
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)[0].response
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
+ tctx.configure(sa, streamfile="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, streamfile=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py
new file mode 100644
index 00000000..01c14814
--- /dev/null
+++ b/test/mitmproxy/addons/test_termstatus.py
@@ -0,0 +1,12 @@
+from mitmproxy.addons import termstatus
+from mitmproxy.test import taddons
+
+
+def test_configure():
+ ts = termstatus.TermStatus()
+ with taddons.context() as ctx:
+ ts.running()
+ assert not ctx.master.event_log
+ ctx.configure(ts, server=True)
+ ts.running()
+ assert ctx.master.event_log
diff --git a/test/mitmproxy/data/addonscripts/addon.py b/test/mitmproxy/data/addonscripts/addon.py
index 84173cb6..f34f41cb 100644
--- a/test/mitmproxy/data/addonscripts/addon.py
+++ b/test/mitmproxy/data/addonscripts/addon.py
@@ -6,7 +6,7 @@ class Addon:
def event_log(self):
return event_log
- def start(self):
+ def start(self, opts):
event_log.append("addonstart")
def configure(self, options, updated):
@@ -17,6 +17,6 @@ def configure(options, updated):
event_log.append("addonconfigure")
-def start():
+def start(opts):
event_log.append("scriptstart")
return Addon()
diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py
index bd047c99..10ba24cd 100644
--- a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py
+++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py
@@ -9,5 +9,5 @@ class ConcurrentClass:
time.sleep(0.1)
-def start():
+def start(opts):
return ConcurrentClass()
diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py
index 756869c8..7bc28182 100644
--- a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py
+++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py
@@ -2,5 +2,5 @@ from mitmproxy.script import concurrent
@concurrent
-def start():
+def start(opts):
pass
diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder.py
index 6b9b6ea8..aff524a8 100644
--- a/test/mitmproxy/data/addonscripts/recorder.py
+++ b/test/mitmproxy/data/addonscripts/recorder.py
@@ -22,5 +22,5 @@ class CallLogger:
raise AttributeError
-def start():
+def start(opts):
return CallLogger(*sys.argv[1:])
diff --git a/test/mitmproxy/net/http/test_message.py b/test/mitmproxy/net/http/test_message.py
index 034bd600..b75bc7c2 100644
--- a/test/mitmproxy/net/http/test_message.py
+++ b/test/mitmproxy/net/http/test_message.py
@@ -38,14 +38,12 @@ def _test_decoded_attr(message, attr):
class TestMessageData:
- def test_eq_ne(self):
+ def test_eq(self):
data = tutils.tresp(timestamp_start=42, timestamp_end=42).data
same = tutils.tresp(timestamp_start=42, timestamp_end=42).data
assert data == same
- assert not data != same
other = tutils.tresp(content=b"foo").data
- assert not data == other
assert data != other
assert data != 0
@@ -61,10 +59,8 @@ class TestMessage:
resp = tutils.tresp(timestamp_start=42, timestamp_end=42)
same = tutils.tresp(timestamp_start=42, timestamp_end=42)
assert resp == same
- assert not resp != same
other = tutils.tresp(timestamp_start=0, timestamp_end=0)
- assert not resp == other
assert resp != other
assert resp != 0
diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py
index cf010f6e..8b26784a 100644
--- a/test/mitmproxy/net/test_tcp.py
+++ b/test/mitmproxy/net/test_tcp.py
@@ -11,8 +11,8 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tcp
-from mitmproxy.test import tutils
from mitmproxy import exceptions
+from mitmproxy.test import tutils
from . import tservers
from ...conftest import requires_alpn
@@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
cipher_list="AES256-SHA"
)
- def test_log(self):
+ def test_log(self, tmpdir):
testval = b"echo!\n"
_logfun = tcp.log_ssl_key
- with tutils.tmpdir() as d:
- logfile = os.path.join(d, "foo", "bar", "logfile")
- tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
+ logfile = str(tmpdir.join("foo", "bar", "logfile"))
+ tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
- c = tcp.TCPClient(("127.0.0.1", self.port))
- with c.connect():
- c.convert_to_ssl()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
- c.finish()
-
- tcp.log_ssl_key.close()
- with open(logfile, "rb") as f:
- assert f.read().count(b"CLIENT_RANDOM") == 2
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ with c.connect():
+ c.convert_to_ssl()
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+ c.finish()
+
+ tcp.log_ssl_key.close()
+ with open(logfile, "rb") as f:
+ assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun
diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py
index aa45761a..16efe415 100644
--- a/test/mitmproxy/proxy/test_server.py
+++ b/test/mitmproxy/proxy/test_server.py
@@ -302,6 +302,9 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
class TestHTTPAuth(tservers.HTTPProxyTest):
def test_auth(self):
self.master.addons.add(proxyauth.ProxyAuth())
+ self.master.addons.configure_all(
+ self.master.options, self.master.options.keys()
+ )
self.master.options.proxyauth = "test:test"
assert self.pathod("202").status_code == 407
p = self.pathoc()
diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py
index e81c023d..a9b6f0c4 100644
--- a/test/mitmproxy/script/test_concurrent.py
+++ b/test/mitmproxy/script/test_concurrent.py
@@ -24,7 +24,7 @@ class TestConcurrent(tservers.MasterTest):
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
- sc.start()
+ sc.start(tctx.options)
f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1)
@@ -42,7 +42,7 @@ class TestConcurrent(tservers.MasterTest):
"mitmproxy/data/addonscripts/concurrent_decorator_err.py"
)
)
- sc.start()
+ sc.start(tctx.options)
assert "decorator not supported" in tctx.master.event_log[0][1]
def test_concurrent_class(self):
@@ -52,7 +52,7 @@ class TestConcurrent(tservers.MasterTest):
"mitmproxy/data/addonscripts/concurrent_decorator_class.py"
)
)
- sc.start()
+ sc.start(tctx.options)
f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1)
diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py
index 17402e26..3e5f71c6 100644
--- a/test/mitmproxy/test_addonmanager.py
+++ b/test/mitmproxy/test_addonmanager.py
@@ -10,12 +10,12 @@ from mitmproxy import proxy
class TAddon:
def __init__(self, name):
self.name = name
- self.noop_member = True
+ self.tick = True
def __repr__(self):
return "Addon(%s)" % self.name
- def noop(self):
+ def done(self):
pass
@@ -30,6 +30,6 @@ def test_simple():
assert not a.chain
a.add(TAddon("one"))
- a("noop")
+ a("done")
with pytest.raises(exceptions.AddonError):
- a("noop_member")
+ a("tick")
diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py
index 9bd3ad25..88c49561 100644
--- a/test/mitmproxy/test_certs.py
+++ b/test/mitmproxy/test_certs.py
@@ -34,118 +34,106 @@ from mitmproxy.test import tutils
class TestCertStore:
- def test_create_explicit(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo", [])
-
- ca2 = certs.CertStore.from_store(d, "test")
- assert ca2.get_cert(b"foo", [])
-
- assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
-
- def test_create_no_common_name(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(None, [])[0].cn is None
-
- def test_create_tmp(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"*.foo.com", [])
-
- r = ca.get_cert(b"*.foo.com", [])
- assert r[1] == ca.default_privatekey
-
- def test_sans(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
- ca.get_cert(b"foo.bar.com", [])
- # assert c1 == c2
- c3 = ca.get_cert(b"bar.com", [])
- assert not c1 == c3
-
- def test_sans_change(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.get_cert(b"foo.com", [b"*.bar.com"])
- cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
- assert b"*.baz.com" in cert.altnames
-
- def test_expire(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.STORE_CAP = 3
- ca.get_cert(b"one.com", [])
- ca.get_cert(b"two.com", [])
- ca.get_cert(b"three.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"one.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"four.com", [])
-
- assert (b"one.com", ()) not in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
- assert (b"four.com", ()) in ca.certs
-
- def test_overrides(self):
- with tutils.tmpdir() as d:
- ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test")
- ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test")
- assert not ca1.default_ca.get_serial_number(
- ) == ca2.default_ca.get_serial_number()
-
- dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
- dcp = os.path.join(d, "dc")
- f = open(dcp, "wb")
- f.write(dc[0].to_pem())
- f.close()
- ca1.add_cert_file(b"foo.com", dcp)
-
- ret = ca1.get_cert(b"foo.com", [])
- assert ret[0].serial == dc[0].serial
-
- def test_create_dhparams(self):
- with tutils.tmpdir() as d:
- filename = os.path.join(d, "dhparam.pem")
- certs.CertStore.load_dhparam(filename)
- assert os.path.exists(filename)
+ def test_create_explicit(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo", [])
+
+ ca2 = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca2.get_cert(b"foo", [])
+
+ assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ def test_create_no_common_name(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(None, [])[0].cn is None
+
+ def test_create_tmp(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"*.foo.com", [])
+
+ r = ca.get_cert(b"*.foo.com", [])
+ assert r[1] == ca.default_privatekey
+
+ def test_sans(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
+ ca.get_cert(b"foo.bar.com", [])
+ # assert c1 == c2
+ c3 = ca.get_cert(b"bar.com", [])
+ assert not c1 == c3
+
+ def test_sans_change(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.get_cert(b"foo.com", [b"*.bar.com"])
+ cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
+ assert b"*.baz.com" in cert.altnames
+
+ def test_expire(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.STORE_CAP = 3
+ ca.get_cert(b"one.com", [])
+ ca.get_cert(b"two.com", [])
+ ca.get_cert(b"three.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"one.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"four.com", [])
+
+ assert (b"one.com", ()) not in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+ assert (b"four.com", ()) in ca.certs
+
+ def test_overrides(self, tmpdir):
+ ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test")
+ ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test")
+ assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
+ dcp = tmpdir.join("dc")
+ dcp.write(dc[0].to_pem())
+ ca1.add_cert_file(b"foo.com", str(dcp))
+
+ ret = ca1.get_cert(b"foo.com", [])
+ assert ret[0].serial == dc[0].serial
+
+ def test_create_dhparams(self, tmpdir):
+ filename = str(tmpdir.join("dhparam.pem"))
+ certs.CertStore.load_dhparam(filename)
+ assert os.path.exists(filename)
class TestDummyCert:
- def test_with_ca(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- b"foo.com",
- [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
- )
- assert r.cn == b"foo.com"
- assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
-
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- None,
- []
- )
- assert r.cn is None
- assert r.altnames == []
+ def test_with_ca(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ b"foo.com",
+ [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
+ )
+ assert r.cn == b"foo.com"
+ assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
+
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ None,
+ []
+ )
+ assert r.cn is None
+ assert r.altnames == []
class TestSSLCert:
@@ -172,7 +160,6 @@ class TestSSLCert:
assert c2.to_pem()
assert c2.has_expired is not None
- assert not c1 == c2
assert c1 != c2
def test_err_broken_sans(self):
diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py
index 0083f57c..67a6552f 100644
--- a/test/mitmproxy/test_connections.py
+++ b/test/mitmproxy/test_connections.py
@@ -66,8 +66,18 @@ class TestClientConnection:
assert c.timestamp_start == 42
c3 = c.copy()
+ assert c3.get_state() != c.get_state()
+ c.id = c3.id = "foo"
assert c3.get_state() == c.get_state()
+ def test_eq(self):
+ c = tflow.tclient_conn()
+ c2 = c.copy()
+ assert c == c
+ assert c != c2
+ assert c != 42
+ assert hash(c) != hash(c2)
+
class TestServerConnection:
@@ -147,6 +157,21 @@ class TestServerConnection:
with pytest.raises(ValueError, matches='sni must be str, not '):
c.establish_ssl(None, b'foobar')
+ def test_state(self):
+ c = tflow.tserver_conn()
+ c2 = c.copy()
+ assert c2.get_state() != c.get_state()
+ c.id = c2.id = "foo"
+ assert c2.get_state() == c.get_state()
+
+ def test_eq(self):
+ c = tflow.tserver_conn()
+ c2 = c.copy()
+ assert c == c
+ assert c != c2
+ assert c != 42
+ assert hash(c) != hash(c2)
+
class TestClientConnectionTLS:
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py
index 668d0d4a..f20e0c8c 100644
--- a/test/mitmproxy/test_examples.py
+++ b/test/mitmproxy/test_examples.py
@@ -1,5 +1,4 @@
import json
-import os
import shlex
import pytest
@@ -142,30 +141,26 @@ class TestHARDump:
with pytest.raises(ScriptError):
tscript("complex/har_dump.py")
- def test_simple(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
+ def test_simple(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow())
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow())
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
- def test_base64(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
- m.addons.remove(sc)
+ def test_base64(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
def test_format_cookies(self):
@@ -187,7 +182,7 @@ class TestHARDump:
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires']
- def test_binary(self):
+ def test_binary(self, tmpdir):
f = self.flow()
f.request.method = "POST"
@@ -196,14 +191,12 @@ class TestHARDump:
f.response.headers["random-junk"] = bytes(range(256))
f.response.content = bytes(range(256))
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", f)
- m.addons.remove(sc)
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", f)
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py
index db33cddd..df392829 100644
--- a/test/mitmproxy/test_optmanager.py
+++ b/test/mitmproxy/test_optmanager.py
@@ -1,5 +1,4 @@
import copy
-import os
import pytest
import typing
import argparse
@@ -7,7 +6,6 @@ import argparse
from mitmproxy import options
from mitmproxy import optmanager
from mitmproxy import exceptions
-from mitmproxy.test import tutils
class TO(optmanager.OptManager):
@@ -85,10 +83,11 @@ def test_options():
with pytest.raises(TypeError):
TO(nonexistent = "value")
- with pytest.raises(Exception, match="No such option"):
+ with pytest.raises(Exception, match="Unknown options"):
o.nonexistent = "value"
- with pytest.raises(Exception, match="No such option"):
+ with pytest.raises(Exception, match="Unknown options"):
o.update(nonexistent = "value")
+ assert o.update_known(nonexistent = "value") == {"nonexistent": "value"}
rec = []
@@ -201,62 +200,63 @@ def test_simple():
def test_serialize():
o = TD2()
o.three = "set"
- assert "dfour" in o.serialize(None, defaults=True)
+ assert "dfour" in optmanager.serialize(o, None, defaults=True)
- data = o.serialize(None)
+ data = optmanager.serialize(o, None)
assert "dfour" not in data
o2 = TD2()
- o2.load(data)
+ optmanager.load(o2, data)
assert o2 == o
t = """
unknown: foo
"""
- data = o.serialize(t)
+ data = optmanager.serialize(o, t)
o2 = TD2()
- o2.load(data)
+ optmanager.load(o2, data)
assert o2 == o
t = "invalid: foo\ninvalid"
with pytest.raises(Exception, match="Config error"):
- o2.load(t)
+ optmanager.load(o2, t)
t = "invalid"
with pytest.raises(Exception, match="Config error"):
- o2.load(t)
+ optmanager.load(o2, t)
t = ""
- o2.load(t)
-
- with pytest.raises(exceptions.OptionsError, matches='No such option: foobar'):
- o2.load("foobar: '123'")
+ optmanager.load(o2, t)
+ assert optmanager.load(o2, "foobar: '123'") == {"foobar": "123"}
def test_serialize_defaults():
o = options.Options()
- assert o.serialize(None, defaults=True)
+ assert optmanager.serialize(o, None, defaults=True)
-def test_saving():
+def test_saving(tmpdir):
o = TD2()
o.three = "set"
- with tutils.tmpdir() as tdir:
- dst = os.path.join(tdir, "conf")
- o.save(dst, defaults=True)
+ dst = str(tmpdir.join("conf"))
+ optmanager.save(o, dst, defaults=True)
+
+ o2 = TD2()
+ optmanager.load_paths(o2, dst)
+ o2.three = "foo"
+ optmanager.save(o2, dst, defaults=True)
- o2 = TD2()
- o2.load_paths(dst)
- o2.three = "foo"
- o2.save(dst, defaults=True)
+ optmanager.load_paths(o, dst)
+ assert o.three == "foo"
- o.load_paths(dst)
- assert o.three == "foo"
+ with open(dst, 'a') as f:
+ f.write("foobar: '123'")
+ assert optmanager.load_paths(o, dst) == {"foobar": "123"}
- with open(dst, 'a') as f:
- f.write("foobar: '123'")
- with pytest.raises(exceptions.OptionsError, matches=''):
- o.load_paths(dst)
+ with open(dst, 'a') as f:
+ f.write("'''")
+ with pytest.raises(exceptions.OptionsError):
+ optmanager.load_paths(o, dst)
def test_merge():
@@ -283,9 +283,9 @@ def test_option():
assert o2 != o
-def test_dump():
+def test_dump_defaults():
o = options.Options()
- assert optmanager.dump(o)
+ assert optmanager.dump_defaults(o)
class TTypes(optmanager.OptManager):
@@ -349,3 +349,6 @@ def test_set():
assert opts.seqstr == ["foo", "bar"]
opts.set("seqstr")
assert opts.seqstr == []
+
+ with pytest.raises(exceptions.OptionsError):
+ opts.set("nonexistent=wobble")
diff --git a/test/mitmproxy/tools/console/test_master.py b/test/mitmproxy/tools/console/test_master.py
index 0bf3734b..6c716ad1 100644
--- a/test/mitmproxy/tools/console/test_master.py
+++ b/test/mitmproxy/tools/console/test_master.py
@@ -28,7 +28,9 @@ class TestMaster(tservers.MasterTest):
if "verbosity" not in opts:
opts["verbosity"] = 1
o = options.Options(**opts)
- return console.master.ConsoleMaster(o, proxy.DummyServer())
+ m = console.master.ConsoleMaster(o, proxy.DummyServer())
+ m.addons.configure_all(o, o.keys())
+ return m
def test_basic(self):
m = self.mkmaster()
diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py
index 2542ec4b..a15bf583 100644
--- a/test/mitmproxy/tools/test_dump.py
+++ b/test/mitmproxy/tools/test_dump.py
@@ -1,4 +1,3 @@
-import os
import pytest
from unittest import mock
@@ -9,7 +8,6 @@ from mitmproxy import controller
from mitmproxy import options
from mitmproxy.tools import dump
-from mitmproxy.test import tutils
from .. import tservers
@@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)
return m
- def test_read(self):
- with tutils.tmpdir() as t:
- p = os.path.join(t, "read")
- self.flowfile(p)
- self.dummy_cycle(
- self.mkmaster(None, rfile=p),
- 1, b"",
- )
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="/nonexistent")
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="test_dump.py")
+ def test_read(self, tmpdir):
+ p = str(tmpdir.join("read"))
+ self.flowfile(p)
+ self.dummy_cycle(
+ self.mkmaster(None, rfile=p),
+ 1, b"",
+ )
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="/nonexistent")
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="test_dump.py")
def test_has_error(self):
m = self.mkmaster(None)
diff --git a/test/mitmproxy/tservers.py b/test/mitmproxy/tservers.py
index a8aaa358..c47411ee 100644
--- a/test/mitmproxy/tservers.py
+++ b/test/mitmproxy/tservers.py
@@ -79,6 +79,8 @@ class TestMaster(master.Master):
self.state = TestState()
self.addons.add(self.state)
self.addons.add(*addons)
+ self.addons.configure_all(self.options, self.options.keys())
+ self.addons.invoke_all_with_context("running")
def clear_log(self):
self.tlog = []
diff --git a/test/mitmproxy/types/test_multidict.py b/test/mitmproxy/types/test_multidict.py
index 9b13c5cd..c76cd753 100644
--- a/test/mitmproxy/types/test_multidict.py
+++ b/test/mitmproxy/types/test_multidict.py
@@ -93,11 +93,6 @@ class TestMultiDict:
md1.fields = md1.fields[1:] + md1.fields[:1]
assert not (md1 == md2)
- def test_ne(self):
- assert not TMultiDict() != TMultiDict()
- assert TMultiDict() != self._multi()
- assert TMultiDict() != 42
-
def test_hash(self):
"""
If a class defines mutable objects and implements an __eq__() method,
@@ -205,3 +200,12 @@ class TestMultiDictView:
tv["c"] = "b"
assert p.vals == (("a", "b"), ("c", "b"))
assert tv["a"] == "b"
+
+ def test_copy(self):
+ p = TParent()
+ tv = multidict.MultiDictView(p.getter, p.setter)
+ c = tv.copy()
+ assert isinstance(c, multidict.MultiDict)
+ assert tv.items() == c.items()
+ c["foo"] = "bar"
+ assert tv.items() != c.items()
diff --git a/test/mitmproxy/types/test_serializable.py b/test/mitmproxy/types/test_serializable.py
index dd4a3778..390d17e1 100644
--- a/test/mitmproxy/types/test_serializable.py
+++ b/test/mitmproxy/types/test_serializable.py
@@ -1,3 +1,5 @@
+import copy
+
from mitmproxy.types import serializable
@@ -6,17 +8,17 @@ class SerializableDummy(serializable.Serializable):
self.i = i
def get_state(self):
- return self.i
+ return copy.copy(self.i)
def set_state(self, i):
self.i = i
- def from_state(self, state):
- return type(self)(state)
+ @classmethod
+ def from_state(cls, state):
+ return cls(state)
class TestSerializable:
-
def test_copy(self):
a = SerializableDummy(42)
assert a.i == 42
@@ -26,3 +28,12 @@ class TestSerializable:
a.set_state(1)
assert a.i == 1
assert b.i == 42
+
+ def test_copy_id(self):
+ a = SerializableDummy({
+ "id": "foo",
+ "foo": 42
+ })
+ b = a.copy()
+ assert a.get_state()["id"] != b.get_state()["id"]
+ assert a.get_state()["foo"] == b.get_state()["foo"]
diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py
index 85e9e53b..ec460b07 100644
--- a/test/pathod/language/test_base.py
+++ b/test/pathod/language/test_base.py
@@ -1,11 +1,8 @@
-import os
import pytest
from pathod import language
from pathod.language import base, exceptions
-from mitmproxy.test import tutils
-
def parse_request(s):
return language.parse_pathoc(s).next()
@@ -137,24 +134,22 @@ class TestTokValueFile:
v = base.TokValue.parseString("<path")[0]
assert v.path == "path"
- def test_access_control(self):
+ def test_access_control(self, tmpdir):
v = base.TokValue.parseString("<path")[0]
- with tutils.tmpdir() as t:
- p = os.path.join(t, "path")
- with open(p, "wb") as f:
- f.write(b"x" * 10000)
-
- assert v.get_generator(language.Settings(staticdir=t))
-
- v = base.TokValue.parseString("<path2")[0]
- with pytest.raises(exceptions.FileAccessDenied):
- v.get_generator(language.Settings(staticdir=t))
- with pytest.raises(Exception, match="access disabled"):
- v.get_generator(language.Settings())
-
- v = base.TokValue.parseString("</outside")[0]
- with pytest.raises(Exception, match="outside"):
- v.get_generator(language.Settings(staticdir=t))
+ f = tmpdir.join("path")
+ f.write(b"x" * 10000)
+
+ assert v.get_generator(language.Settings(staticdir=str(tmpdir)))
+
+ v = base.TokValue.parseString("<path2")[0]
+ with pytest.raises(exceptions.FileAccessDenied):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
+ with pytest.raises(Exception, match="access disabled"):
+ v.get_generator(language.Settings())
+
+ v = base.TokValue.parseString("</outside")[0]
+ with pytest.raises(Exception, match="outside"):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
def test_spec(self):
v = base.TokValue.parseString("<'one two'")[0]
diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py
index b3ce0335..dc15aaa1 100644
--- a/test/pathod/language/test_generators.py
+++ b/test/pathod/language/test_generators.py
@@ -1,7 +1,4 @@
-import os
-
from pathod.language import generators
-from mitmproxy.test import tutils
def test_randomgenerator():
@@ -15,23 +12,20 @@ def test_randomgenerator():
assert len(g[1000:1001]) == 0
-def test_filegenerator():
- with tutils.tmpdir() as t:
- path = os.path.join(t, "foo")
- f = open(path, "wb")
- f.write(b"x" * 10000)
- f.close()
- g = generators.FileGenerator(path)
- assert len(g) == 10000
- assert g[0] == b"x"
- assert g[-1] == b"x"
- assert g[0:5] == b"xxxxx"
- assert len(g[1:10]) == 9
- assert len(g[10000:10001]) == 0
- assert repr(g)
- # remove all references to FileGenerator instance to close the file
- # handle.
- del g
+def test_filegenerator(tmpdir):
+ f = tmpdir.join("foo")
+ f.write(b"x" * 10000)
+ g = generators.FileGenerator(str(f))
+ assert len(g) == 10000
+ assert g[0] == b"x"
+ assert g[-1] == b"x"
+ assert g[0:5] == b"xxxxx"
+ assert len(g[1:10]) == 9
+ assert len(g[10000:10001]) == 0
+ assert repr(g)
+ # remove all references to FileGenerator instance to close the file
+ # handle.
+ del g
def test_transform_generator():