diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_cut.py | 157 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 17 | ||||
-rw-r--r-- | test/mitmproxy/test_connections.py | 2 | ||||
-rw-r--r-- | test/mitmproxy/utils/test_typecheck.py | 11 |
4 files changed, 186 insertions, 1 deletions
diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py new file mode 100644 index 00000000..b4c0f66b --- /dev/null +++ b/test/mitmproxy/addons/test_cut.py @@ -0,0 +1,157 @@ + +from mitmproxy.addons import cut +from mitmproxy.addons import view +from mitmproxy import exceptions +from mitmproxy import certs +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy.test import tutils +import pytest + + +def test_extract(): + tf = tflow.tflow(resp=True) + tests = [ + ["q.method", "GET"], + ["q.scheme", "http"], + ["q.host", "address"], + ["q.port", "22"], + ["q.path", "/path"], + ["q.url", "http://address:22/path"], + ["q.text", "content"], + ["q.content", b"content"], + ["q.raw_content", b"content"], + ["q.header[header]", "qvalue"], + + ["s.status_code", "200"], + ["s.reason", "OK"], + ["s.text", "message"], + ["s.content", b"message"], + ["s.raw_content", b"message"], + ["s.header[header-response]", "svalue"], + + ["cc.address.port", "22"], + ["cc.address.host", "address"], + ["cc.tls_version", "TLSv1.2"], + ["cc.sni", "address"], + ["cc.ssl_established", "false"], + + ["sc.address.port", "22"], + ["sc.address.host", "address"], + ["sc.ip_address.host", "192.168.0.1"], + ["sc.tls_version", "TLSv1.2"], + ["sc.sni", "address"], + ["sc.ssl_established", "false"], + ] + for t in tests: + ret = cut.extract(t[0], tf) + if ret != t[1]: + raise AssertionError("%s: Expected %s, got %s" % (t[0], t[1], ret)) + + with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: + d = f.read() + c1 = certs.SSLCert.from_pem(d) + tf.server_conn.cert = c1 + assert "CERTIFICATE" in cut.extract("sc.cert", tf) + + +def test_parse_cutspec(): + tests = [ + ("", None, True), + ("req.method", ("@all", ["req.method"]), False), + ( + "req.method,req.host", + ("@all", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo", + ("~b foo", ["req.method", "req.host"]), + False + ), + ( + "req.method,req.host|~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ( + "req.method, req.host | ~b foo | ~b bar", + ("~b foo | ~b bar", ["req.method", "req.host"]), + False + ), + ] + for cutspec, output, err in tests: + try: + assert cut.parse_cutspec(cutspec) == output + except exceptions.CommandError: + if not err: + raise + else: + if err: + raise AssertionError("Expected error.") + + +def test_headername(): + with pytest.raises(exceptions.CommandError): + cut.headername("header[foo.") + + +def qr(f): + with open(f, "rb") as fp: + return fp.read() + + +def test_cut_file(tmpdir): + f = str(tmpdir.join("path")) + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + + v.add([tflow.tflow(resp=True)]) + + tctx.command(c.save, "q.method|@all", f) + assert qr(f) == b"GET" + tctx.command(c.save, "q.content|@all", f) + assert qr(f) == b"content" + tctx.command(c.save, "q.content|@all", "+" + f) + assert qr(f) == b"content\ncontent" + + v.add([tflow.tflow(resp=True)]) + tctx.command(c.save, "q.method|@all", f) + assert qr(f).splitlines() == [b"GET", b"GET"] + tctx.command(c.save, "q.method,q.content|@all", f) + assert qr(f).splitlines() == [b"GET,content", b"GET,content"] + + +def test_cut(): + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + v.add([tflow.tflow(resp=True)]) + tctx.master.addons.add(v, c) + assert c.cut("q.method|@all") == [["GET"]] + assert c.cut("q.scheme|@all") == [["http"]] + assert c.cut("q.host|@all") == [["address"]] + assert c.cut("q.port|@all") == [["22"]] + assert c.cut("q.path|@all") == [["/path"]] + assert c.cut("q.url|@all") == [["http://address:22/path"]] + assert c.cut("q.content|@all") == [[b"content"]] + assert c.cut("q.header[header]|@all") == [["qvalue"]] + assert c.cut("q.header[unknown]|@all") == [[""]] + + assert c.cut("s.status_code|@all") == [["200"]] + assert c.cut("s.reason|@all") == [["OK"]] + assert c.cut("s.content|@all") == [[b"message"]] + assert c.cut("s.header[header-response]|@all") == [["svalue"]] + assert c.cut("moo") == [[""]] + with pytest.raises(exceptions.CommandError): + assert c.cut("__dict__") == [[""]] + + v = view.View() + c = cut.Cut() + with taddons.context() as tctx: + tctx.master.addons.add(v, c) + v.add([tflow.ttcpflow()]) + assert c.cut("q.method|@all") == [[""]] + assert c.cut("s.status|@all") == [[""]] diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 96d79dba..24d11d37 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -7,6 +7,7 @@ from mitmproxy import proxy from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons +import io import pytest @@ -55,22 +56,34 @@ def test_simple(): c.add("empty", a.empty) c.call("empty") + fp = io.StringIO() + c.dump(fp) + assert fp.getvalue() + def test_typename(): assert command.typename(str, True) == "str" assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + + assert command.typename(command.Cuts, False) == "cutspec" + assert command.typename(command.Cuts, True) == "[cuts]" + assert command.typename(flow.Flow, False) == "flow" class DummyConsole: def load(self, l): l.add_command("view.resolve", self.resolve) + l.add_command("cut", self.cut) def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: n = int(spec) return [tflow.tflow(resp=True)] * n + def cut(self, spec: str) -> command.Cuts: + return [["test"]] + def test_parsearg(): with taddons.context() as tctx: @@ -97,6 +110,10 @@ def test_parsearg(): with pytest.raises(exceptions.CommandError): command.parsearg(tctx.master.commands, "foo", Exception) + assert command.parsearg( + tctx.master.commands, "foo", command.Cuts + ) == [["test"]] + class TDec: @command.command("cmd1") diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py index 67a6552f..e320885d 100644 --- a/test/mitmproxy/test_connections.py +++ b/test/mitmproxy/test_connections.py @@ -99,7 +99,7 @@ class TestServerConnection: c.alpn_proto_negotiated = b'h2' assert 'address:22' in repr(c) assert 'ALPN' in repr(c) - assert 'TLS: foobar' in repr(c) + assert 'TLSv1.2: foobar' in repr(c) c.sni = None c.tls_established = True diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 22bd7c34..17f70d37 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,6 +4,7 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck +from mitmproxy import command class TBase: @@ -93,9 +94,19 @@ def test_check_command_return_type(): assert(typecheck.check_command_return_type(None, None)) assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) + assert(typecheck.check_command_return_type([["foo", b"bar"]], command.Cuts)) + assert(not typecheck.check_command_return_type(["foo", b"bar"], command.Cuts)) + assert(not typecheck.check_command_return_type([["foo", 22]], command.Cuts)) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) + typecheck.check_command_return_type([10], m) + + # Python 3.5 only defines __union_params__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Union" + m.__union_params__ = (int,) + assert not typecheck.check_command_return_type([22], m) |