diff options
Diffstat (limited to 'test')
26 files changed, 429 insertions, 408 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 6b8b7c90..c22b3589 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -1,9 +1,7 @@ -import os import pytest from unittest import mock from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy import io from mitmproxy import exceptions @@ -49,14 +47,13 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None - def test_configure(self): + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: - with tutils.tmpdir() as td: - path = os.path.join(td, "flows") - tdump(path, [tflow.tflow()]) - tctx.configure(cp, client_replay=[path]) - tctx.configure(cp, client_replay=[]) - tctx.configure(cp) - with pytest.raises(exceptions.OptionsError): - tctx.configure(cp, client_replay=["nonexistent"]) + path = str(tmpdir.join("flows")) + tdump(path, [tflow.tflow()]) + tctx.configure(cp, client_replay=[path]) + tctx.configure(cp, client_replay=[]) + tctx.configure(cp) + with pytest.raises(exceptions.OptionsError): + tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 8c280c51..7d590b35 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,11 +1,8 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils -from .. import tservers from mitmproxy.addons import replace from mitmproxy.test import taddons +from mitmproxy.test import tflow class TestReplace: @@ -34,7 +31,7 @@ class TestReplace: with taddons.context() as tctx: tctx.configure( r, - replacements = [ + replacements=[ "/~q/foo/bar", "/~s/foo/bar", ] @@ -49,55 +46,57 @@ class TestReplace: r.response(f) assert f.response.content == b"bar" - -class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest): - ssl = False - def test_order(self): - sa = replace.Replace() - self.proxy.tmaster.addons.add(sa) - - self.proxy.tmaster.options.replacements = [ - "/~q/foo/bar", - "/~q/bar/baz", - "/~q/foo/oh noes!", - "/~s/baz/ORLY" - ] - p = self.pathoc() - with p.connect(): - req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase) - assert req.content == b"ORLY" - assert req.status_code == 418 + r = replace.Replace() + with taddons.context() as tctx: + tctx.configure( + r, + replacements=[ + "/foo/bar", + "/bar/baz", + "/foo/oh noes!", + "/bar/oh noes!", + ] + ) + f = tflow.tflow() + f.request.content = b"foo" + r.request(f) + assert f.request.content == b"baz" class TestReplaceFile: - def test_simple(self): - r = replace.ReplaceFile() - with tutils.tmpdir() as td: - rp = os.path.join(td, "replacement") - with open(rp, "w") as f: - f.write("bar") - with taddons.context() as tctx: + def test_simple(self, tmpdir): + r = replace.Replace() + with taddons.context() as tctx: + tmpfile = tmpdir.join("replacement") + tmpfile.write("bar") + tctx.configure( + r, + replacements=["/~q/foo/@" + str(tmpfile)] + ) + f = tflow.tflow() + f.request.content = b"foo" + r.request(f) + assert f.request.content == b"bar" + + def test_nonexistent(self, tmpdir): + r = replace.Replace() + with taddons.context() as tctx: + with pytest.raises(Exception, match="Invalid file path"): tctx.configure( r, - replacement_files = [ - "/~q/foo/" + rp, - "/~s/foo/" + rp, - "/~b nonexistent/nonexistent/nonexistent", - ] + replacements=["/~q/foo/@nonexistent"] ) - f = tflow.tflow() - f.request.content = b"foo" - r.request(f) - assert f.request.content == b"bar" - - f = tflow.tflow(resp=True) - f.response.content = b"foo" - r.response(f) - assert f.response.content == b"bar" - f = tflow.tflow() - f.request.content = b"nonexistent" - assert not tctx.master.event_log - r.request(f) - assert tctx.master.event_log + tmpfile = tmpdir.join("replacement") + tmpfile.write("bar") + tctx.configure( + r, + replacements=["/~q/foo/@" + str(tmpfile)] + ) + tmpfile.remove() + f = tflow.tflow() + f.request.content = b"foo" + assert not tctx.master.event_log + r.request(f) + assert tctx.master.event_log diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 5f196ebf..4c1b2e43 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -68,13 +68,12 @@ class TestParseCommand: with pytest.raises(ValueError): script.parse_command(" ") - def test_no_script_file(self): + def test_no_script_file(self, tmpdir): with pytest.raises(Exception, match="not found"): script.parse_command("notfound") - with tutils.tmpdir() as dir: - with pytest.raises(Exception, match="Not a file"): - script.parse_command(dir) + with pytest.raises(Exception, match="Not a file"): + script.parse_command(str(tmpdir)) def test_parse_args(self): with utils.chdir(tutils.test_data.dirname): @@ -117,9 +116,7 @@ class TestScript: ) ) sc.load_script() - assert sc.ns.call_log == [ - ("solo", "start", (), {}), - ] + assert sc.ns.call_log[0][0:2] == ("solo", "start") sc.ns.call_log = [] f = tflow.tflow(resp=True) @@ -128,28 +125,26 @@ class TestScript: recf = sc.ns.call_log[0] assert recf[1] == "request" - def test_reload(self): + def test_reload(self, tmpdir): with taddons.context() as tctx: - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - sc = script.Script("foo.py") - tctx.configure(sc) - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - sc.tick() - time.sleep(0.1) - if tctx.master.event_log: - return - raise AssertionError("Change event not detected.") + f = tmpdir.join("foo.py") + f.ensure(file=True) + sc = script.Script(str(f)) + tctx.configure(sc) + for _ in range(100): + f.write(".") + sc.tick() + time.sleep(0.1) + if tctx.master.event_log: + return + raise AssertionError("Change event not detected.") def test_exception(self): with taddons.context() as tctx: sc = script.Script( tutils.test_data.path("mitmproxy/data/addonscripts/error.py") ) - sc.start() + sc.start(tctx.options) f = tflow.tflow(resp=True) sc.request(f) assert tctx.master.event_log[0][0] == "error" @@ -165,7 +160,7 @@ class TestScript: "mitmproxy/data/addonscripts/addon.py" ) ) - sc.start() + sc.start(tctx.options) tctx.configure(sc) assert sc.ns.event_log == [ 'scriptstart', 'addonstart', 'addonconfigure' @@ -228,24 +223,31 @@ class TestScriptLoader: assert len(m.addons) == 1 def test_dupes(self): - o = options.Options(scripts=["one", "one"]) - m = master.Master(o, proxy.DummyServer()) sc = script.ScriptLoader() - with pytest.raises(exceptions.OptionsError): - m.addons.add(o, sc) + with taddons.context() as tctx: + tctx.master.addons.add(sc) + with pytest.raises(exceptions.OptionsError): + tctx.configure( + sc, + scripts = ["one", "one"] + ) def test_nonexistent(self): - o = options.Options(scripts=["nonexistent"]) - m = master.Master(o, proxy.DummyServer()) sc = script.ScriptLoader() - with pytest.raises(exceptions.OptionsError): - m.addons.add(o, sc) + with taddons.context() as tctx: + tctx.master.addons.add(sc) + with pytest.raises(exceptions.OptionsError): + tctx.configure( + sc, + scripts = ["nonexistent"] + ) def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") sc = script.ScriptLoader() with taddons.context() as tctx: tctx.master.addons.add(sc) + sc.running() tctx.configure( sc, scripts = [ @@ -256,9 +258,17 @@ class TestScriptLoader: ) debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] assert debug == [ - ('debug', 'a start'), ('debug', 'a configure'), - ('debug', 'b start'), ('debug', 'b configure'), - ('debug', 'c start'), ('debug', 'c configure') + ('debug', 'a start'), + ('debug', 'a configure'), + ('debug', 'a running'), + + ('debug', 'b start'), + ('debug', 'b configure'), + ('debug', 'b running'), + + ('debug', 'c start'), + ('debug', 'c configure'), + ('debug', 'c running'), ] tctx.master.event_log = [] tctx.configure( @@ -287,4 +297,5 @@ class TestScriptLoader: ('debug', 'b done'), ('debug', 'x start'), ('debug', 'x configure'), + ('debug', 'x running'), ] diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index e2afa516..54e4d281 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,10 +1,8 @@ -import os import urllib import pytest -from mitmproxy.test import tutils -from mitmproxy.test import tflow from mitmproxy.test import taddons +from mitmproxy.test import tflow import mitmproxy.test.tutils from mitmproxy.addons import serverplayback @@ -19,15 +17,14 @@ def tdump(path, flows): w.add(i) -def test_config(): +def test_config(tmpdir): s = serverplayback.ServerPlayback() - with tutils.tmpdir() as p: - with taddons.context() as tctx: - fpath = os.path.join(p, "flows") - tdump(fpath, [tflow.tflow(resp=True)]) - tctx.configure(s, server_replay=[fpath]) - with pytest.raises(exceptions.OptionsError): - tctx.configure(s, server_replay=[p]) + with taddons.context() as tctx: + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + tctx.configure(s, server_replay=[fpath]) + with pytest.raises(exceptions.OptionsError): + tctx.configure(s, server_replay=[str(tmpdir)]) def test_tick(): diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py index 4105c1fc..3f78521c 100644 --- a/test/mitmproxy/addons/test_streamfile.py +++ b/test/mitmproxy/addons/test_streamfile.py @@ -1,9 +1,7 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy.test import taddons +from mitmproxy.test import tflow from mitmproxy import io from mitmproxy import exceptions @@ -11,19 +9,17 @@ from mitmproxy import options from mitmproxy.addons import streamfile -def test_configure(): +def test_configure(tmpdir): sa = streamfile.StreamFile() with taddons.context(options=options.Options()) as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=tdir) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure(sa, streamfile=p, filtstr="~~") - tctx.configure(sa, filtstr="foo") - assert sa.filt - tctx.configure(sa, filtstr=None) - assert not sa.filt + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, streamfile=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") + tctx.configure(sa, filtstr="foo") + assert sa.filt + tctx.configure(sa, filtstr=None) + assert not sa.filt def rd(p): @@ -31,36 +27,34 @@ def rd(p): return list(x.stream()) -def test_tcp(): +def test_tcp(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - tctx.configure(sa, streamfile=p) + p = str(tmpdir.join("foo")) + tctx.configure(sa, streamfile=p) - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, streamfile=None) + assert rd(p) -def test_simple(): +def test_simple(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") + p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) + tctx.configure(sa, streamfile=p) - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, streamfile=None) + assert rd(p)[0].response - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response + tctx.configure(sa, streamfile="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, streamfile=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py new file mode 100644 index 00000000..01c14814 --- /dev/null +++ b/test/mitmproxy/addons/test_termstatus.py @@ -0,0 +1,12 @@ +from mitmproxy.addons import termstatus +from mitmproxy.test import taddons + + +def test_configure(): + ts = termstatus.TermStatus() + with taddons.context() as ctx: + ts.running() + assert not ctx.master.event_log + ctx.configure(ts, server=True) + ts.running() + assert ctx.master.event_log diff --git a/test/mitmproxy/data/addonscripts/addon.py b/test/mitmproxy/data/addonscripts/addon.py index 84173cb6..f34f41cb 100644 --- a/test/mitmproxy/data/addonscripts/addon.py +++ b/test/mitmproxy/data/addonscripts/addon.py @@ -6,7 +6,7 @@ class Addon: def event_log(self): return event_log - def start(self): + def start(self, opts): event_log.append("addonstart") def configure(self, options, updated): @@ -17,6 +17,6 @@ def configure(options, updated): event_log.append("addonconfigure") -def start(): +def start(opts): event_log.append("scriptstart") return Addon() diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py index bd047c99..10ba24cd 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py @@ -9,5 +9,5 @@ class ConcurrentClass: time.sleep(0.1) -def start(): +def start(opts): return ConcurrentClass() diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py index 756869c8..7bc28182 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py @@ -2,5 +2,5 @@ from mitmproxy.script import concurrent @concurrent -def start(): +def start(opts): pass diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder.py index 6b9b6ea8..aff524a8 100644 --- a/test/mitmproxy/data/addonscripts/recorder.py +++ b/test/mitmproxy/data/addonscripts/recorder.py @@ -22,5 +22,5 @@ class CallLogger: raise AttributeError -def start(): +def start(opts): return CallLogger(*sys.argv[1:]) diff --git a/test/mitmproxy/net/http/test_message.py b/test/mitmproxy/net/http/test_message.py index 034bd600..b75bc7c2 100644 --- a/test/mitmproxy/net/http/test_message.py +++ b/test/mitmproxy/net/http/test_message.py @@ -38,14 +38,12 @@ def _test_decoded_attr(message, attr): class TestMessageData: - def test_eq_ne(self): + def test_eq(self): data = tutils.tresp(timestamp_start=42, timestamp_end=42).data same = tutils.tresp(timestamp_start=42, timestamp_end=42).data assert data == same - assert not data != same other = tutils.tresp(content=b"foo").data - assert not data == other assert data != other assert data != 0 @@ -61,10 +59,8 @@ class TestMessage: resp = tutils.tresp(timestamp_start=42, timestamp_end=42) same = tutils.tresp(timestamp_start=42, timestamp_end=42) assert resp == same - assert not resp != same other = tutils.tresp(timestamp_start=0, timestamp_end=0) - assert not resp == other assert resp != other assert resp != 0 diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index cf010f6e..8b26784a 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -11,8 +11,8 @@ from OpenSSL import SSL from mitmproxy import certs from mitmproxy.net import tcp -from mitmproxy.test import tutils from mitmproxy import exceptions +from mitmproxy.test import tutils from . import tservers from ...conftest import requires_alpn @@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase): cipher_list="AES256-SHA" ) - def test_log(self): + def test_log(self, tmpdir): testval = b"echo!\n" _logfun = tcp.log_ssl_key - with tutils.tmpdir() as d: - logfile = os.path.join(d, "foo", "bar", "logfile") - tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) + logfile = str(tmpdir.join("foo", "bar", "logfile")) + tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - c.finish() - - tcp.log_ssl_key.close() - with open(logfile, "rb") as f: - assert f.read().count(b"CLIENT_RANDOM") == 2 + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.connect(): + c.convert_to_ssl() + c.wfile.write(testval) + c.wfile.flush() + assert c.rfile.readline() == testval + c.finish() + + tcp.log_ssl_key.close() + with open(logfile, "rb") as f: + assert f.read().count(b"CLIENT_RANDOM") == 2 tcp.log_ssl_key = _logfun diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py index aa45761a..16efe415 100644 --- a/test/mitmproxy/proxy/test_server.py +++ b/test/mitmproxy/proxy/test_server.py @@ -302,6 +302,9 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin): class TestHTTPAuth(tservers.HTTPProxyTest): def test_auth(self): self.master.addons.add(proxyauth.ProxyAuth()) + self.master.addons.configure_all( + self.master.options, self.master.options.keys() + ) self.master.options.proxyauth = "test:test" assert self.pathod("202").status_code == 407 p = self.pathoc() diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py index e81c023d..a9b6f0c4 100644 --- a/test/mitmproxy/script/test_concurrent.py +++ b/test/mitmproxy/script/test_concurrent.py @@ -24,7 +24,7 @@ class TestConcurrent(tservers.MasterTest): "mitmproxy/data/addonscripts/concurrent_decorator.py" ) ) - sc.start() + sc.start(tctx.options) f1, f2 = tflow.tflow(), tflow.tflow() tctx.cycle(sc, f1) @@ -42,7 +42,7 @@ class TestConcurrent(tservers.MasterTest): "mitmproxy/data/addonscripts/concurrent_decorator_err.py" ) ) - sc.start() + sc.start(tctx.options) assert "decorator not supported" in tctx.master.event_log[0][1] def test_concurrent_class(self): @@ -52,7 +52,7 @@ class TestConcurrent(tservers.MasterTest): "mitmproxy/data/addonscripts/concurrent_decorator_class.py" ) ) - sc.start() + sc.start(tctx.options) f1, f2 = tflow.tflow(), tflow.tflow() tctx.cycle(sc, f1) diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index 17402e26..3e5f71c6 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -10,12 +10,12 @@ from mitmproxy import proxy class TAddon: def __init__(self, name): self.name = name - self.noop_member = True + self.tick = True def __repr__(self): return "Addon(%s)" % self.name - def noop(self): + def done(self): pass @@ -30,6 +30,6 @@ def test_simple(): assert not a.chain a.add(TAddon("one")) - a("noop") + a("done") with pytest.raises(exceptions.AddonError): - a("noop_member") + a("tick") diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index 9bd3ad25..88c49561 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -34,118 +34,106 @@ from mitmproxy.test import tutils class TestCertStore: - def test_create_explicit(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo", []) - - ca2 = certs.CertStore.from_store(d, "test") - assert ca2.get_cert(b"foo", []) - - assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - - def test_create_no_common_name(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(None, [])[0].cn is None - - def test_create_tmp(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"*.foo.com", []) - - r = ca.get_cert(b"*.foo.com", []) - assert r[1] == ca.default_privatekey - - def test_sans(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) - ca.get_cert(b"foo.bar.com", []) - # assert c1 == c2 - c3 = ca.get_cert(b"bar.com", []) - assert not c1 == c3 - - def test_sans_change(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.get_cert(b"foo.com", [b"*.bar.com"]) - cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) - assert b"*.baz.com" in cert.altnames - - def test_expire(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.STORE_CAP = 3 - ca.get_cert(b"one.com", []) - ca.get_cert(b"two.com", []) - ca.get_cert(b"three.com", []) - - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - - ca.get_cert(b"one.com", []) - - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - - ca.get_cert(b"four.com", []) - - assert (b"one.com", ()) not in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - assert (b"four.com", ()) in ca.certs - - def test_overrides(self): - with tutils.tmpdir() as d: - ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") - ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") - assert not ca1.default_ca.get_serial_number( - ) == ca2.default_ca.get_serial_number() - - dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) - dcp = os.path.join(d, "dc") - f = open(dcp, "wb") - f.write(dc[0].to_pem()) - f.close() - ca1.add_cert_file(b"foo.com", dcp) - - ret = ca1.get_cert(b"foo.com", []) - assert ret[0].serial == dc[0].serial - - def test_create_dhparams(self): - with tutils.tmpdir() as d: - filename = os.path.join(d, "dhparam.pem") - certs.CertStore.load_dhparam(filename) - assert os.path.exists(filename) + def test_create_explicit(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo", []) + + ca2 = certs.CertStore.from_store(str(tmpdir), "test") + assert ca2.get_cert(b"foo", []) + + assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + + def test_create_no_common_name(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(None, [])[0].cn is None + + def test_create_tmp(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"*.foo.com", []) + + r = ca.get_cert(b"*.foo.com", []) + assert r[1] == ca.default_privatekey + + def test_sans(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) + ca.get_cert(b"foo.bar.com", []) + # assert c1 == c2 + c3 = ca.get_cert(b"bar.com", []) + assert not c1 == c3 + + def test_sans_change(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.get_cert(b"foo.com", [b"*.bar.com"]) + cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) + assert b"*.baz.com" in cert.altnames + + def test_expire(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.STORE_CAP = 3 + ca.get_cert(b"one.com", []) + ca.get_cert(b"two.com", []) + ca.get_cert(b"three.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"one.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"four.com", []) + + assert (b"one.com", ()) not in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + assert (b"four.com", ()) in ca.certs + + def test_overrides(self, tmpdir): + ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test") + ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test") + assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + + dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) + dcp = tmpdir.join("dc") + dcp.write(dc[0].to_pem()) + ca1.add_cert_file(b"foo.com", str(dcp)) + + ret = ca1.get_cert(b"foo.com", []) + assert ret[0].serial == dc[0].serial + + def test_create_dhparams(self, tmpdir): + filename = str(tmpdir.join("dhparam.pem")) + certs.CertStore.load_dhparam(filename) + assert os.path.exists(filename) class TestDummyCert: - def test_with_ca(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - b"foo.com", - [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] - ) - assert r.cn == b"foo.com" - assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] - - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - None, - [] - ) - assert r.cn is None - assert r.altnames == [] + def test_with_ca(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + b"foo.com", + [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] + ) + assert r.cn == b"foo.com" + assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] + + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + None, + [] + ) + assert r.cn is None + assert r.altnames == [] class TestSSLCert: @@ -172,7 +160,6 @@ class TestSSLCert: assert c2.to_pem() assert c2.has_expired is not None - assert not c1 == c2 assert c1 != c2 def test_err_broken_sans(self): diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py index 0083f57c..67a6552f 100644 --- a/test/mitmproxy/test_connections.py +++ b/test/mitmproxy/test_connections.py @@ -66,8 +66,18 @@ class TestClientConnection: assert c.timestamp_start == 42 c3 = c.copy() + assert c3.get_state() != c.get_state() + c.id = c3.id = "foo" assert c3.get_state() == c.get_state() + def test_eq(self): + c = tflow.tclient_conn() + c2 = c.copy() + assert c == c + assert c != c2 + assert c != 42 + assert hash(c) != hash(c2) + class TestServerConnection: @@ -147,6 +157,21 @@ class TestServerConnection: with pytest.raises(ValueError, matches='sni must be str, not '): c.establish_ssl(None, b'foobar') + def test_state(self): + c = tflow.tserver_conn() + c2 = c.copy() + assert c2.get_state() != c.get_state() + c.id = c2.id = "foo" + assert c2.get_state() == c.get_state() + + def test_eq(self): + c = tflow.tserver_conn() + c2 = c.copy() + assert c == c + assert c != c2 + assert c != 42 + assert hash(c) != hash(c2) + class TestClientConnectionTLS: diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 668d0d4a..f20e0c8c 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,5 +1,4 @@ import json -import os import shlex import pytest @@ -142,30 +141,26 @@ class TestHARDump: with pytest.raises(ScriptError): tscript("complex/har_dump.py") - def test_simple(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") + def test_simple(self, tmpdir): + path = str(tmpdir.join("somefile")) - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow()) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow()) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 - def test_base64(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) - m.addons.remove(sc) + def test_base64(self, tmpdir): + path = str(tmpdir.join("somefile")) - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" def test_format_cookies(self): @@ -187,7 +182,7 @@ class TestHARDump: f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] assert f['expires'] - def test_binary(self): + def test_binary(self, tmpdir): f = self.flow() f.request.method = "POST" @@ -196,14 +191,12 @@ class TestHARDump: f.response.headers["random-junk"] = bytes(range(256)) f.response.content = bytes(range(256)) - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", f) - m.addons.remove(sc) + path = str(tmpdir.join("somefile")) - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", f) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index db33cddd..df392829 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -1,5 +1,4 @@ import copy -import os import pytest import typing import argparse @@ -7,7 +6,6 @@ import argparse from mitmproxy import options from mitmproxy import optmanager from mitmproxy import exceptions -from mitmproxy.test import tutils class TO(optmanager.OptManager): @@ -85,10 +83,11 @@ def test_options(): with pytest.raises(TypeError): TO(nonexistent = "value") - with pytest.raises(Exception, match="No such option"): + with pytest.raises(Exception, match="Unknown options"): o.nonexistent = "value" - with pytest.raises(Exception, match="No such option"): + with pytest.raises(Exception, match="Unknown options"): o.update(nonexistent = "value") + assert o.update_known(nonexistent = "value") == {"nonexistent": "value"} rec = [] @@ -201,62 +200,63 @@ def test_simple(): def test_serialize(): o = TD2() o.three = "set" - assert "dfour" in o.serialize(None, defaults=True) + assert "dfour" in optmanager.serialize(o, None, defaults=True) - data = o.serialize(None) + data = optmanager.serialize(o, None) assert "dfour" not in data o2 = TD2() - o2.load(data) + optmanager.load(o2, data) assert o2 == o t = """ unknown: foo """ - data = o.serialize(t) + data = optmanager.serialize(o, t) o2 = TD2() - o2.load(data) + optmanager.load(o2, data) assert o2 == o t = "invalid: foo\ninvalid" with pytest.raises(Exception, match="Config error"): - o2.load(t) + optmanager.load(o2, t) t = "invalid" with pytest.raises(Exception, match="Config error"): - o2.load(t) + optmanager.load(o2, t) t = "" - o2.load(t) - - with pytest.raises(exceptions.OptionsError, matches='No such option: foobar'): - o2.load("foobar: '123'") + optmanager.load(o2, t) + assert optmanager.load(o2, "foobar: '123'") == {"foobar": "123"} def test_serialize_defaults(): o = options.Options() - assert o.serialize(None, defaults=True) + assert optmanager.serialize(o, None, defaults=True) -def test_saving(): +def test_saving(tmpdir): o = TD2() o.three = "set" - with tutils.tmpdir() as tdir: - dst = os.path.join(tdir, "conf") - o.save(dst, defaults=True) + dst = str(tmpdir.join("conf")) + optmanager.save(o, dst, defaults=True) + + o2 = TD2() + optmanager.load_paths(o2, dst) + o2.three = "foo" + optmanager.save(o2, dst, defaults=True) - o2 = TD2() - o2.load_paths(dst) - o2.three = "foo" - o2.save(dst, defaults=True) + optmanager.load_paths(o, dst) + assert o.three == "foo" - o.load_paths(dst) - assert o.three == "foo" + with open(dst, 'a') as f: + f.write("foobar: '123'") + assert optmanager.load_paths(o, dst) == {"foobar": "123"} - with open(dst, 'a') as f: - f.write("foobar: '123'") - with pytest.raises(exceptions.OptionsError, matches=''): - o.load_paths(dst) + with open(dst, 'a') as f: + f.write("'''") + with pytest.raises(exceptions.OptionsError): + optmanager.load_paths(o, dst) def test_merge(): @@ -283,9 +283,9 @@ def test_option(): assert o2 != o -def test_dump(): +def test_dump_defaults(): o = options.Options() - assert optmanager.dump(o) + assert optmanager.dump_defaults(o) class TTypes(optmanager.OptManager): @@ -349,3 +349,6 @@ def test_set(): assert opts.seqstr == ["foo", "bar"] opts.set("seqstr") assert opts.seqstr == [] + + with pytest.raises(exceptions.OptionsError): + opts.set("nonexistent=wobble") diff --git a/test/mitmproxy/tools/console/test_master.py b/test/mitmproxy/tools/console/test_master.py index 0bf3734b..6c716ad1 100644 --- a/test/mitmproxy/tools/console/test_master.py +++ b/test/mitmproxy/tools/console/test_master.py @@ -28,7 +28,9 @@ class TestMaster(tservers.MasterTest): if "verbosity" not in opts: opts["verbosity"] = 1 o = options.Options(**opts) - return console.master.ConsoleMaster(o, proxy.DummyServer()) + m = console.master.ConsoleMaster(o, proxy.DummyServer()) + m.addons.configure_all(o, o.keys()) + return m def test_basic(self): m = self.mkmaster() diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 2542ec4b..a15bf583 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -1,4 +1,3 @@ -import os import pytest from unittest import mock @@ -9,7 +8,6 @@ from mitmproxy import controller from mitmproxy import options from mitmproxy.tools import dump -from mitmproxy.test import tutils from .. import tservers @@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest): m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False) return m - def test_read(self): - with tutils.tmpdir() as t: - p = os.path.join(t, "read") - self.flowfile(p) - self.dummy_cycle( - self.mkmaster(None, rfile=p), - 1, b"", - ) - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="/nonexistent") - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="test_dump.py") + def test_read(self, tmpdir): + p = str(tmpdir.join("read")) + self.flowfile(p) + self.dummy_cycle( + self.mkmaster(None, rfile=p), + 1, b"", + ) + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="/nonexistent") + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="test_dump.py") def test_has_error(self): m = self.mkmaster(None) diff --git a/test/mitmproxy/tservers.py b/test/mitmproxy/tservers.py index a8aaa358..c47411ee 100644 --- a/test/mitmproxy/tservers.py +++ b/test/mitmproxy/tservers.py @@ -79,6 +79,8 @@ class TestMaster(master.Master): self.state = TestState() self.addons.add(self.state) self.addons.add(*addons) + self.addons.configure_all(self.options, self.options.keys()) + self.addons.invoke_all_with_context("running") def clear_log(self): self.tlog = [] diff --git a/test/mitmproxy/types/test_multidict.py b/test/mitmproxy/types/test_multidict.py index 9b13c5cd..c76cd753 100644 --- a/test/mitmproxy/types/test_multidict.py +++ b/test/mitmproxy/types/test_multidict.py @@ -93,11 +93,6 @@ class TestMultiDict: md1.fields = md1.fields[1:] + md1.fields[:1] assert not (md1 == md2) - def test_ne(self): - assert not TMultiDict() != TMultiDict() - assert TMultiDict() != self._multi() - assert TMultiDict() != 42 - def test_hash(self): """ If a class defines mutable objects and implements an __eq__() method, @@ -205,3 +200,12 @@ class TestMultiDictView: tv["c"] = "b" assert p.vals == (("a", "b"), ("c", "b")) assert tv["a"] == "b" + + def test_copy(self): + p = TParent() + tv = multidict.MultiDictView(p.getter, p.setter) + c = tv.copy() + assert isinstance(c, multidict.MultiDict) + assert tv.items() == c.items() + c["foo"] = "bar" + assert tv.items() != c.items() diff --git a/test/mitmproxy/types/test_serializable.py b/test/mitmproxy/types/test_serializable.py index dd4a3778..390d17e1 100644 --- a/test/mitmproxy/types/test_serializable.py +++ b/test/mitmproxy/types/test_serializable.py @@ -1,3 +1,5 @@ +import copy + from mitmproxy.types import serializable @@ -6,17 +8,17 @@ class SerializableDummy(serializable.Serializable): self.i = i def get_state(self): - return self.i + return copy.copy(self.i) def set_state(self, i): self.i = i - def from_state(self, state): - return type(self)(state) + @classmethod + def from_state(cls, state): + return cls(state) class TestSerializable: - def test_copy(self): a = SerializableDummy(42) assert a.i == 42 @@ -26,3 +28,12 @@ class TestSerializable: a.set_state(1) assert a.i == 1 assert b.i == 42 + + def test_copy_id(self): + a = SerializableDummy({ + "id": "foo", + "foo": 42 + }) + b = a.copy() + assert a.get_state()["id"] != b.get_state()["id"] + assert a.get_state()["foo"] == b.get_state()["foo"] diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py index 85e9e53b..ec460b07 100644 --- a/test/pathod/language/test_base.py +++ b/test/pathod/language/test_base.py @@ -1,11 +1,8 @@ -import os import pytest from pathod import language from pathod.language import base, exceptions -from mitmproxy.test import tutils - def parse_request(s): return language.parse_pathoc(s).next() @@ -137,24 +134,22 @@ class TestTokValueFile: v = base.TokValue.parseString("<path")[0] assert v.path == "path" - def test_access_control(self): + def test_access_control(self, tmpdir): v = base.TokValue.parseString("<path")[0] - with tutils.tmpdir() as t: - p = os.path.join(t, "path") - with open(p, "wb") as f: - f.write(b"x" * 10000) - - assert v.get_generator(language.Settings(staticdir=t)) - - v = base.TokValue.parseString("<path2")[0] - with pytest.raises(exceptions.FileAccessDenied): - v.get_generator(language.Settings(staticdir=t)) - with pytest.raises(Exception, match="access disabled"): - v.get_generator(language.Settings()) - - v = base.TokValue.parseString("</outside")[0] - with pytest.raises(Exception, match="outside"): - v.get_generator(language.Settings(staticdir=t)) + f = tmpdir.join("path") + f.write(b"x" * 10000) + + assert v.get_generator(language.Settings(staticdir=str(tmpdir))) + + v = base.TokValue.parseString("<path2")[0] + with pytest.raises(exceptions.FileAccessDenied): + v.get_generator(language.Settings(staticdir=str(tmpdir))) + with pytest.raises(Exception, match="access disabled"): + v.get_generator(language.Settings()) + + v = base.TokValue.parseString("</outside")[0] + with pytest.raises(Exception, match="outside"): + v.get_generator(language.Settings(staticdir=str(tmpdir))) def test_spec(self): v = base.TokValue.parseString("<'one two'")[0] diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py index b3ce0335..dc15aaa1 100644 --- a/test/pathod/language/test_generators.py +++ b/test/pathod/language/test_generators.py @@ -1,7 +1,4 @@ -import os - from pathod.language import generators -from mitmproxy.test import tutils def test_randomgenerator(): @@ -15,23 +12,20 @@ def test_randomgenerator(): assert len(g[1000:1001]) == 0 -def test_filegenerator(): - with tutils.tmpdir() as t: - path = os.path.join(t, "foo") - f = open(path, "wb") - f.write(b"x" * 10000) - f.close() - g = generators.FileGenerator(path) - assert len(g) == 10000 - assert g[0] == b"x" - assert g[-1] == b"x" - assert g[0:5] == b"xxxxx" - assert len(g[1:10]) == 9 - assert len(g[10000:10001]) == 0 - assert repr(g) - # remove all references to FileGenerator instance to close the file - # handle. - del g +def test_filegenerator(tmpdir): + f = tmpdir.join("foo") + f.write(b"x" * 10000) + g = generators.FileGenerator(str(f)) + assert len(g) == 10000 + assert g[0] == b"x" + assert g[-1] == b"x" + assert g[0:5] == b"xxxxx" + assert len(g[1:10]) == 9 + assert len(g[10000:10001]) == 0 + assert repr(g) + # remove all references to FileGenerator instance to close the file + # handle. + del g def test_transform_generator(): |