diff options
Diffstat (limited to 'test')
31 files changed, 697 insertions, 342 deletions
diff --git a/test/mock_urwid.py b/test/mock_urwid.py index f132e0bd..191210bf 100644 --- a/test/mock_urwid.py +++ b/test/mock_urwid.py @@ -1,8 +1,10 @@ -import os, sys, mock +import os +import sys +import mock if os.name == "nt": m = mock.Mock() m.__version__ = "1.1.1" m.Widget = mock.Mock m.WidgetWrap = mock.Mock sys.modules['urwid'] = m - sys.modules['urwid.util'] = mock.Mock()
\ No newline at end of file + sys.modules['urwid.util'] = mock.Mock() diff --git a/test/scripts/a.py b/test/scripts/a.py index 1d5717b0..210fea78 100644 --- a/test/scripts/a.py +++ b/test/scripts/a.py @@ -4,14 +4,18 @@ parser = argparse.ArgumentParser() parser.add_argument('--var', type=int) var = 0 + + def start(ctx, argv): global var var = parser.parse_args(argv[1:]).var + def here(ctx): global var var += 1 return var + def errargs(): pass diff --git a/test/scripts/all.py b/test/scripts/all.py index 3acaf694..dad2aade 100644 --- a/test/scripts/all.py +++ b/test/scripts/all.py @@ -1,28 +1,36 @@ log = [] + + def clientconnect(ctx, cc): ctx.log("XCLIENTCONNECT") log.append("clientconnect") + def serverconnect(ctx, cc): ctx.log("XSERVERCONNECT") log.append("serverconnect") + def request(ctx, f): ctx.log("XREQUEST") log.append("request") + def response(ctx, f): ctx.log("XRESPONSE") log.append("response") + def responseheaders(ctx, f): ctx.log("XRESPONSEHEADERS") log.append("responseheaders") + def clientdisconnect(ctx, cc): ctx.log("XCLIENTDISCONNECT") log.append("clientdisconnect") + def error(ctx, cc): ctx.log("XERROR") log.append("error") diff --git a/test/scripts/concurrent_decorator.py b/test/scripts/concurrent_decorator.py index 8e132006..f6feda1d 100644 --- a/test/scripts/concurrent_decorator.py +++ b/test/scripts/concurrent_decorator.py @@ -29,4 +29,4 @@ def error(context, err): @concurrent def clientdisconnect(context, dc): - context.log("clientdisconnect")
\ No newline at end of file + context.log("clientdisconnect") diff --git a/test/scripts/concurrent_decorator_err.py b/test/scripts/concurrent_decorator_err.py index 78191315..00fd8dad 100644 --- a/test/scripts/concurrent_decorator_err.py +++ b/test/scripts/concurrent_decorator_err.py @@ -1,5 +1,6 @@ from libmproxy.script import concurrent + @concurrent def start(context, argv): - pass
\ No newline at end of file + pass diff --git a/test/scripts/duplicate_flow.py b/test/scripts/duplicate_flow.py index f1b92309..e13af786 100644 --- a/test/scripts/duplicate_flow.py +++ b/test/scripts/duplicate_flow.py @@ -2,4 +2,3 @@ def request(ctx, f): f = ctx.duplicate_flow(f) ctx.replay_request(f) - diff --git a/test/scripts/stream_modify.py b/test/scripts/stream_modify.py index 9a98a7ee..e5c323be 100644 --- a/test/scripts/stream_modify.py +++ b/test/scripts/stream_modify.py @@ -4,4 +4,4 @@ def modify(chunks): def responseheaders(context, flow): - flow.response.stream = modify
\ No newline at end of file + flow.response.stream = modify diff --git a/test/test_app.py b/test/test_app.py index 0b6ed14c..5fb49563 100644 --- a/test/test_app.py +++ b/test/test_app.py @@ -1,8 +1,13 @@ -import mock, socket, os, time +import mock +import socket +import os +import time from libmproxy import dump from netlib import certutils, tcp from libpathod.pathoc import Pathoc -import tutils, tservers +import tutils +import tservers + class TestApp(tservers.HTTPProxTest): def test_basic(self): diff --git a/test/test_cmdline.py b/test/test_cmdline.py index f7bf4612..eafcbde4 100644 --- a/test/test_cmdline.py +++ b/test/test_cmdline.py @@ -37,13 +37,24 @@ def test_parse_replace_hook(): def test_parse_server_spec(): tutils.raises("Invalid server specification", cmdline.parse_server_spec, "") - assert cmdline.parse_server_spec("http://foo.com:88") == [False, False, "foo.com", 88] - assert cmdline.parse_server_spec("http://foo.com") == [False, False, "foo.com", 80] - assert cmdline.parse_server_spec("https://foo.com") == [True, True, "foo.com", 443] - assert cmdline.parse_server_spec_special("https2http://foo.com") == [True, False, "foo.com", 80] - assert cmdline.parse_server_spec_special("http2https://foo.com") == [False, True, "foo.com", 443] - tutils.raises("Invalid server specification", cmdline.parse_server_spec, "foo.com") - tutils.raises("Invalid server specification", cmdline.parse_server_spec, "http://") + assert cmdline.parse_server_spec( + "http://foo.com:88") == [False, False, "foo.com", 88] + assert cmdline.parse_server_spec( + "http://foo.com") == [False, False, "foo.com", 80] + assert cmdline.parse_server_spec( + "https://foo.com") == [True, True, "foo.com", 443] + assert cmdline.parse_server_spec_special( + "https2http://foo.com") == [True, False, "foo.com", 80] + assert cmdline.parse_server_spec_special( + "http2https://foo.com") == [False, True, "foo.com", 443] + tutils.raises( + "Invalid server specification", + cmdline.parse_server_spec, + "foo.com") + tutils.raises( + "Invalid server specification", + cmdline.parse_server_spec, + "http://") def test_parse_setheaders(): @@ -103,7 +114,7 @@ def test_common(): ) p = tutils.test_data.path("data/replace") - opts.replace_file = [("/foo/bar/%s"%p)] + opts.replace_file = [("/foo/bar/%s" % p)] v = cmdline.get_common_options(opts)["replacements"] assert len(v) == 1 assert v[0][2].strip() == "replacecontents" @@ -122,5 +133,3 @@ def test_mitmdump(): def test_mitmweb(): ap = cmdline.mitmweb() assert ap - - diff --git a/test/test_console.py b/test/test_console.py index 419b94a7..ed8408a5 100644 --- a/test/test_console.py +++ b/test/test_console.py @@ -1,4 +1,7 @@ -import os, sys, mock, gc +import os +import sys +import mock +import gc from os.path import normpath import mock_urwid from libmproxy import console @@ -6,6 +9,7 @@ from libmproxy.console import common import tutils + class TestConsoleState: def test_flow(self): """ diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py index 83dbbb8f..a296376e 100644 --- a/test/test_console_contentview.py +++ b/test/test_console_contentview.py @@ -31,40 +31,39 @@ class TestContentView: def test_view_auto(self): v = cv.ViewAuto() f = v( - odict.ODictCaseless(), - "foo", - 1000 - ) + odict.ODictCaseless(), + "foo", + 1000 + ) assert f[0] == "Raw" f = v( - odict.ODictCaseless( - [["content-type", "text/html"]], - ), - "<html></html>", - 1000 - ) + odict.ODictCaseless( + [["content-type", "text/html"]], + ), + "<html></html>", + 1000 + ) assert f[0] == "HTML" f = v( - odict.ODictCaseless( - [["content-type", "text/flibble"]], - ), - "foo", - 1000 - ) + odict.ODictCaseless( + [["content-type", "text/flibble"]], + ), + "foo", + 1000 + ) assert f[0] == "Raw" f = v( - odict.ODictCaseless( - [["content-type", "text/flibble"]], - ), - "<xml></xml>", - 1000 - ) + odict.ODictCaseless( + [["content-type", "text/flibble"]], + ), + "<xml></xml>", + 1000 + ) assert f[0].startswith("XML") - def test_view_urlencoded(self): d = utils.urlencode([("one", "two"), ("three", "four")]) v = cv.ViewURLEncoded() @@ -91,7 +90,7 @@ class TestContentView: v = cv.ViewJSON() assert v([], "{}", 1000) assert not v([], "{", 1000) - assert v([], "[" + ",".join(["0"]*cv.VIEW_CUTOFF) + "]", 1000) + assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000) assert v([], "[1, 2, 3, 4, 5]", 5) def test_view_xml(self): @@ -145,18 +144,18 @@ class TestContentView: def test_view_image(self): v = cv.ViewImage() p = tutils.test_data.path("data/image.png") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) p = tutils.test_data.path("data/image.gif") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) p = tutils.test_data.path("data/image-err1.jpg") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) p = tutils.test_data.path("data/image.ico") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) - assert not v([], "flibble", sys.maxint) + assert not v([], "flibble", sys.maxsize) def test_view_multipart(self): view = cv.ViewMultipart() @@ -187,71 +186,70 @@ Larry def test_get_content_view(self): r = cv.get_content_view( - cv.get("Raw"), - [["content-type", "application/json"]], - "[1, 2, 3]", - 1000, - lambda x, l: None, - False - ) + cv.get("Raw"), + [["content-type", "application/json"]], + "[1, 2, 3]", + 1000, + lambda x, l: None, + False + ) assert "Raw" in r[0] r = cv.get_content_view( - cv.get("Auto"), - [["content-type", "application/json"]], - "[1, 2, 3]", - 1000, - lambda x, l: None, - False - ) + cv.get("Auto"), + [["content-type", "application/json"]], + "[1, 2, 3]", + 1000, + lambda x, l: None, + False + ) assert r[0] == "JSON" r = cv.get_content_view( - cv.get("Auto"), - [["content-type", "application/json"]], - "[1, 2", - 1000, - lambda x, l: None, - False - ) + cv.get("Auto"), + [["content-type", "application/json"]], + "[1, 2", + 1000, + lambda x, l: None, + False + ) assert "Raw" in r[0] r = cv.get_content_view( - cv.get("AMF"), - [], - "[1, 2", - 1000, - lambda x, l: None, - False - ) + cv.get("AMF"), + [], + "[1, 2", + 1000, + lambda x, l: None, + False + ) assert "Raw" in r[0] - r = cv.get_content_view( - cv.get("Auto"), - [ - ["content-type", "application/json"], - ["content-encoding", "gzip"] - ], - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - lambda x, l: None, - False - ) + cv.get("Auto"), + [ + ["content-type", "application/json"], + ["content-encoding", "gzip"] + ], + encoding.encode('gzip', "[1, 2, 3]"), + 1000, + lambda x, l: None, + False + ) assert "decoded gzip" in r[0] assert "JSON" in r[0] r = cv.get_content_view( - cv.get("XML"), - [ - ["content-type", "application/json"], - ["content-encoding", "gzip"] - ], - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - lambda x, l: None, - False - ) + cv.get("XML"), + [ + ["content-type", "application/json"], + ["content-encoding", "gzip"] + ], + encoding.encode('gzip', "[1, 2, 3]"), + 1000, + lambda x, l: None, + False + ) assert "decoded gzip" in r[0] assert "Raw" in r[0] @@ -261,24 +259,25 @@ if pyamf: v = cv.ViewAMF() p = tutils.test_data.path("data/amf01") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) p = tutils.test_data.path("data/amf02") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) def test_view_amf_response(): v = cv.ViewAMF() p = tutils.test_data.path("data/amf03") - assert v([], file(p,"rb").read(), sys.maxint) + assert v([], file(p, "rb").read(), sys.maxsize) if cv.ViewProtobuf.is_available(): def test_view_protobuf_request(): v = cv.ViewProtobuf() p = tutils.test_data.path("data/protobuf01") - content_type, output = v([], file(p,"rb").read(), sys.maxint) + content_type, output = v([], file(p, "rb").read(), sys.maxsize) assert content_type == "Protobuf" assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"' + def test_get_by_shortcut(): assert cv.get_by_shortcut("h") diff --git a/test/test_console_help.py b/test/test_console_help.py index 24517439..a7a8b745 100644 --- a/test/test_console_help.py +++ b/test/test_console_help.py @@ -5,10 +5,12 @@ if os.name == "nt": import libmproxy.console.help as help + class DummyLoop: def __init__(self): self.widget = None + class DummyMaster: def __init__(self): self.loop = DummyLoop() diff --git a/test/test_controller.py b/test/test_controller.py index e71a148e..d287f18d 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -8,5 +8,3 @@ class TestMaster: msg = mock.MagicMock() m.handle("type", msg) assert msg.reply.call_count == 1 - - diff --git a/test/test_dump.py b/test/test_dump.py index 48eeb244..e3743ac6 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -116,7 +116,6 @@ class TestDumpMaster: 0, None, "", verbosity=1, rfile="test_dump.py" ) - def test_options(self): o = dump.Options(verbosity = 2) assert o.verbosity == 2 @@ -147,21 +146,25 @@ class TestDumpMaster: def test_basic(self): for i in (1, 2, 3): assert "GET" in self._dummy_cycle(1, "~s", "", flow_detail=i) - assert "GET" in self._dummy_cycle(1, "~s", "\x00\x00\x00", flow_detail=i) + assert "GET" in self._dummy_cycle( + 1, + "~s", + "\x00\x00\x00", + flow_detail=i) assert "GET" in self._dummy_cycle(1, "~s", "ascii", flow_detail=i) def test_write(self): with tutils.tmpdir() as d: p = os.path.join(d, "a") - self._dummy_cycle(1, None, "", outfile=(p,"wb"), verbosity=0) - assert len(list(flow.FlowReader(open(p,"rb")).stream())) == 1 + self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0) + assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1 def test_write_append(self): with tutils.tmpdir() as d: p = os.path.join(d, "a.append") - self._dummy_cycle(1, None, "", outfile=(p,"wb"), verbosity=0) - self._dummy_cycle(1, None, "", outfile=(p,"ab"), verbosity=0) - assert len(list(flow.FlowReader(open(p,"rb")).stream())) == 2 + self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0) + self._dummy_cycle(1, None, "", outfile=(p, "ab"), verbosity=0) + assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2 def test_write_err(self): tutils.raises( diff --git a/test/test_encoding.py b/test/test_encoding.py index 732447e2..e13f5dce 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -1,5 +1,6 @@ from libmproxy import encoding + def test_identity(): assert "string" == encoding.decode("identity", "string") assert "string" == encoding.encode("identity", "string") @@ -8,12 +9,25 @@ def test_identity(): def test_gzip(): - assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string")) + assert "string" == encoding.decode( + "gzip", + encoding.encode( + "gzip", + "string")) assert None == encoding.decode("gzip", "bogus") def test_deflate(): - assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")) - assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4]) + assert "string" == encoding.decode( + "deflate", + encoding.encode( + "deflate", + "string")) + assert "string" == encoding.decode( + "deflate", + encoding.encode( + "deflate", + "string")[ + 2:- + 4]) assert None == encoding.decode("deflate", "bogus") - diff --git a/test/test_examples.py b/test/test_examples.py index fd901b5d..e9bccd2e 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -21,7 +21,7 @@ def test_load_scripts(): f += " foo bar" # two arguments required try: s = script.Script(f, tmaster) # Loads the script file. - except Exception, v: + except Exception as v: if not "ImportError" in str(v): raise else: diff --git a/test/test_filt.py b/test/test_filt.py index 97b8e73c..3ad17dfe 100644 --- a/test/test_filt.py +++ b/test/test_filt.py @@ -5,6 +5,7 @@ from libmproxy.protocol import http from libmproxy.protocol.primitives import Error import tutils + class TestParsing: def _dump(self, x): c = cStringIO.StringIO() @@ -99,7 +100,15 @@ class TestMatching: headers = odict.ODictCaseless() headers["header_response"] = ["svalue"] - f.response = http.HTTPResponse((1, 1), 200, "OK", headers, "content_response", None, None) + f.response = http.HTTPResponse( + (1, + 1), + 200, + "OK", + headers, + "content_response", + None, + None) return f diff --git a/test/test_flow.py b/test/test_flow.py index f5d06906..2609b7cb 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -1,4 +1,6 @@ -import Queue, time, os.path +import Queue +import time +import os.path from cStringIO import StringIO import email.utils import mock @@ -33,7 +35,6 @@ def test_app_registry(): assert ar.get(r) - class TestStickyCookieState: def _response(self, cookie, host): s = flow.StickyCookieState(filt.parse(".*")) @@ -115,7 +116,15 @@ class TestClientPlaybackState: class TestServerPlaybackState: def test_hash(self): - s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False) + s = flow.ServerPlaybackState( + None, + [], + False, + False, + None, + False, + None, + False) r = tutils.tflow() r2 = tutils.tflow() @@ -131,7 +140,15 @@ class TestServerPlaybackState: assert s._hash(r) != s._hash(r2) def test_headers(self): - s = flow.ServerPlaybackState(["foo"], [], False, False, None, False, None, False) + s = flow.ServerPlaybackState( + ["foo"], + [], + False, + False, + None, + False, + None, + False) r = tutils.tflow(resp=True) r.request.headers["foo"] = ["bar"] r2 = tutils.tflow(resp=True) @@ -152,7 +169,9 @@ class TestServerPlaybackState: r2 = tutils.tflow(resp=True) r2.request.headers["key"] = ["two"] - s = flow.ServerPlaybackState(None, [r, r2], False, False, None, False, None, False) + s = flow.ServerPlaybackState( + None, [ + r, r2], False, False, None, False, None, False) assert s.count() == 2 assert len(s.fmap.keys()) == 1 @@ -173,34 +192,41 @@ class TestServerPlaybackState: r2 = tutils.tflow(resp=True) r2.request.headers["key"] = ["two"] - s = flow.ServerPlaybackState(None, [r, r2], False, True, None, False, None, False) + s = flow.ServerPlaybackState( + None, [ + r, r2], False, True, None, False, None, False) assert s.count() == 2 s.next_flow(r) assert s.count() == 2 - def test_ignore_params(self): - s = flow.ServerPlaybackState(None, [], False, False, ["param1", "param2"], False, None, False) + s = flow.ServerPlaybackState( + None, [], False, False, [ + "param1", "param2"], False, None, False) r = tutils.tflow(resp=True) - r.request.path="/test?param1=1" + r.request.path = "/test?param1=1" r2 = tutils.tflow(resp=True) - r2.request.path="/test" + r2.request.path = "/test" assert s._hash(r) == s._hash(r2) - r2.request.path="/test?param1=2" + r2.request.path = "/test?param1=2" assert s._hash(r) == s._hash(r2) - r2.request.path="/test?param2=1" + r2.request.path = "/test?param2=1" assert s._hash(r) == s._hash(r2) - r2.request.path="/test?param3=2" + r2.request.path = "/test?param3=2" assert not s._hash(r) == s._hash(r2) def test_ignore_payload_params(self): - s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False) + s = flow.ServerPlaybackState( + None, [], False, False, None, False, [ + "param1", "param2"], False) r = tutils.tflow(resp=True) - r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"] + r.request.headers[ + "Content-Type"] = ["application/x-www-form-urlencoded"] r.request.content = "paramx=x¶m1=1" r2 = tutils.tflow(resp=True) - r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"] + r2.request.headers[ + "Content-Type"] = ["application/x-www-form-urlencoded"] r2.request.content = "paramx=x¶m1=1" # same parameters assert s._hash(r) == s._hash(r2) @@ -208,20 +234,22 @@ class TestServerPlaybackState: r2.request.content = "paramx=x¶m1=2" assert s._hash(r) == s._hash(r2) # missing parameter - r2.request.content="paramx=x" + r2.request.content = "paramx=x" assert s._hash(r) == s._hash(r2) # ignorable parameter added - r2.request.content="paramx=x¶m1=2" + r2.request.content = "paramx=x¶m1=2" assert s._hash(r) == s._hash(r2) # not ignorable parameter changed - r2.request.content="paramx=y¶m1=1" + r2.request.content = "paramx=y¶m1=1" assert not s._hash(r) == s._hash(r2) # not ignorable parameter missing - r2.request.content="param1=1" + r2.request.content = "param1=1" assert not s._hash(r) == s._hash(r2) def test_ignore_payload_params_other_content_type(self): - s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False) + s = flow.ServerPlaybackState( + None, [], False, False, None, False, [ + "param1", "param2"], False) r = tutils.tflow(resp=True) r.request.headers["Content-Type"] = ["application/json"] r.request.content = '{"param1":"1"}' @@ -235,19 +263,31 @@ class TestServerPlaybackState: assert not s._hash(r) == s._hash(r2) def test_ignore_payload_wins_over_params(self): - #NOTE: parameters are mutually exclusive in options - s = flow.ServerPlaybackState(None, [], False, False, None, True, ["param1", "param2"], False) + # NOTE: parameters are mutually exclusive in options + s = flow.ServerPlaybackState( + None, [], False, False, None, True, [ + "param1", "param2"], False) r = tutils.tflow(resp=True) - r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"] + r.request.headers[ + "Content-Type"] = ["application/x-www-form-urlencoded"] r.request.content = "paramx=y" r2 = tutils.tflow(resp=True) - r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"] + r2.request.headers[ + "Content-Type"] = ["application/x-www-form-urlencoded"] r2.request.content = "paramx=x" # same parameters assert s._hash(r) == s._hash(r2) def test_ignore_content(self): - s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False) + s = flow.ServerPlaybackState( + None, + [], + False, + False, + None, + False, + None, + False) r = tutils.tflow(resp=True) r2 = tutils.tflow(resp=True) @@ -257,8 +297,16 @@ class TestServerPlaybackState: r2.request.content = "bar" assert not s._hash(r) == s._hash(r2) - #now ignoring content - s = flow.ServerPlaybackState(None, [], False, False, None, True, None, False) + # now ignoring content + s = flow.ServerPlaybackState( + None, + [], + False, + False, + None, + True, + None, + False) r = tutils.tflow(resp=True) r2 = tutils.tflow(resp=True) r.request.content = "foo" @@ -272,14 +320,22 @@ class TestServerPlaybackState: assert s._hash(r) == s._hash(r2) def test_ignore_host(self): - s = flow.ServerPlaybackState(None, [], False, False, None, False, None, True) + s = flow.ServerPlaybackState( + None, + [], + False, + False, + None, + False, + None, + True) r = tutils.tflow(resp=True) r2 = tutils.tflow(resp=True) - r.request.host="address" - r2.request.host="address" + r.request.host = "address" + r2.request.host = "address" assert s._hash(r) == s._hash(r2) - r2.request.host="wrong_address" + r2.request.host = "wrong_address" assert s._hash(r) == s._hash(r2) @@ -343,12 +399,14 @@ class TestFlow: def test_getset_state(self): f = tutils.tflow(resp=True) state = f.get_state() - assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state() + assert f.get_state() == protocol.http.HTTPFlow.from_state( + state).get_state() f.response = None f.error = Error("error") state = f.get_state() - assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state() + assert f.get_state() == protocol.http.HTTPFlow.from_state( + state).get_state() f2 = f.copy() f2.id = f.id # copy creates a different uuid @@ -430,7 +488,6 @@ class TestFlow: assert f.response.content == "abarb" - class TestState: def test_backup(self): c = flow.State() @@ -519,7 +576,7 @@ class TestState: assert c.intercept_txt == "~q" assert "Invalid" in c.set_intercept("~") assert not c.set_intercept(None) - assert c.intercept_txt == None + assert c.intercept_txt is None def _add_request(self, state): f = tutils.tflow() @@ -608,7 +665,13 @@ class TestSerialize: def test_load_flows_reverse(self): r = self._treader() s = flow.State() - conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80]) + conf = ProxyConfig( + mode="reverse", + upstream_server=[ + True, + True, + "use-this-domain", + 80]) fm = flow.FlowMaster(DummyServer(conf), s) fm.load_flows(r) assert s.flows[0].request.host == "use-this-domain" @@ -630,7 +693,6 @@ class TestSerialize: r = flow.FlowReader(sio) assert len(list(r.stream())) - def test_error(self): sio = StringIO() sio.write("bogus") @@ -661,7 +723,8 @@ class TestFlowMaster: assert not fm.load_script(tutils.test_data.path("scripts/a.py")) assert not fm.unload_scripts() assert fm.load_script("nonexistent") - assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py")) + assert "ValueError" in fm.load_script( + tutils.test_data.path("scripts/starterr.py")) assert len(fm.scripts) == 0 def test_getset_ignore(self): @@ -707,14 +770,14 @@ class TestFlowMaster: assert fm.scripts[0].ns["log"][-1] == "request" fm.handle_response(f) assert fm.scripts[0].ns["log"][-1] == "response" - #load second script + # load second script assert not fm.load_script(tutils.test_data.path("scripts/all.py")) assert len(fm.scripts) == 2 fm.handle_clientdisconnect(f.server_conn) assert fm.scripts[0].ns["log"][-1] == "clientdisconnect" assert fm.scripts[1].ns["log"][-1] == "clientdisconnect" - #unload first script + # unload first script fm.unload_scripts() assert len(fm.scripts) == 0 assert not fm.load_script(tutils.test_data.path("scripts/all.py")) @@ -765,7 +828,16 @@ class TestFlowMaster: f = tutils.tflow(resp=True) pb = [tutils.tflow(resp=True), f] fm = flow.FlowMaster(DummyServer(ProxyConfig()), s) - assert not fm.start_server_playback(pb, False, [], False, False, None, False, None, False) + assert not fm.start_server_playback( + pb, + False, + [], + False, + False, + None, + False, + None, + False) assert not fm.start_client_playback(pb, False) fm.client_playback.testing = True @@ -788,16 +860,43 @@ class TestFlowMaster: fm.refresh_server_playback = True assert not fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], False, False, None, False, None, False) + fm.start_server_playback( + pb, + False, + [], + False, + False, + None, + False, + None, + False) assert fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], True, False, None, False, None, False) + fm.start_server_playback( + pb, + False, + [], + True, + False, + None, + False, + None, + False) r = tutils.tflow() r.request.content = "gibble" assert not fm.do_server_playback(r) assert fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], True, False, None, False, None, False) + fm.start_server_playback( + pb, + False, + [], + True, + False, + None, + False, + None, + False) q = Queue.Queue() fm.tick(q, 0) assert fm.should_exit.is_set() @@ -812,7 +911,16 @@ class TestFlowMaster: pb = [f] fm = flow.FlowMaster(None, s) fm.refresh_server_playback = True - fm.start_server_playback(pb, True, [], False, False, None, False, None, False) + fm.start_server_playback( + pb, + True, + [], + False, + False, + None, + False, + None, + False) f = tutils.tflow() f.request.host = "nonexistent" @@ -862,8 +970,9 @@ class TestFlowMaster: def test_stream(self): with tutils.tmpdir() as tdir: p = os.path.join(tdir, "foo") + def r(): - r = flow.FlowReader(open(p,"rb")) + r = flow.FlowReader(open(p, "rb")) return list(r.stream()) s = flow.State() @@ -884,6 +993,7 @@ class TestFlowMaster: assert not r()[1].response + class TestRequest: def test_simple(self): f = tutils.tflow() @@ -919,7 +1029,7 @@ class TestRequest: r.host = "address" r.port = 22 - assert r.url== "https://address:22/path" + assert r.url == "https://address:22/path" assert r.pretty_url(True) == "https://address:22/path" r.headers["Host"] = ["foo.com"] @@ -1062,6 +1172,7 @@ class TestRequest: resp.headers = h assert resp.headers.get_first("content-type") == "text/plain" + class TestResponse: def test_simple(self): f = tutils.tflow(resp=True) @@ -1077,7 +1188,9 @@ class TestResponse: assert resp.size() == len(resp.assemble()) resp.content = CONTENT_MISSING - tutils.raises("Cannot assemble flow with CONTENT_MISSING", resp.assemble) + tutils.raises( + "Cannot assemble flow with CONTENT_MISSING", + resp.assemble) def test_refresh(self): r = tutils.tresp() @@ -1086,14 +1199,15 @@ class TestResponse: pre = r.headers["date"] r.refresh(n) assert pre == r.headers["date"] - r.refresh(n+60) + r.refresh(n + 60) d = email.utils.parsedate_tz(r.headers["date"][0]) d = email.utils.mktime_tz(d) # Weird that this is not exact... - assert abs(60-(d-n)) <= 1 + assert abs(60 - (d - n)) <= 1 - r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"] + r.headers[ + "set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"] r.refresh() def test_refresh_cookie(self): @@ -1146,7 +1260,7 @@ class TestResponse: def test_header_size(self): r = tutils.tresp() result = len(r._assemble_headers()) - assert result==44 + assert result == 44 def test_get_content_type(self): h = odict.ODictCaseless() @@ -1178,7 +1292,7 @@ class TestClientConnection: c = tutils.tclient_conn() assert ClientConnection.from_state(c.get_state()).get_state() ==\ - c.get_state() + c.get_state() c2 = tutils.tclient_conn() c2.address.address = (c2.address.host, 4242) @@ -1295,7 +1409,6 @@ def test_setheaders(): h.run(f) assert f.request.content == "foo" - h.clear() h.add("~s", "one", "two") h.add("~s", "one", "three") diff --git a/test/test_fuzzing.py b/test/test_fuzzing.py index 3e5fc100..5e5115c9 100644 --- a/test/test_fuzzing.py +++ b/test/test_fuzzing.py @@ -5,16 +5,17 @@ import tservers after being fixed to check for regressions. """ + class TestFuzzy(tservers.HTTPProxTest): def test_idna_err(self): req = r'get:"http://localhost:%s":i10,"\xc6"' p = self.pathoc() - assert p.request(req%self.server.port).status_code == 400 + assert p.request(req % self.server.port).status_code == 400 def test_nullbytes(self): req = r'get:"http://localhost:%s":i19,"\x00"' p = self.pathoc() - assert p.request(req%self.server.port).status_code == 400 + assert p.request(req % self.server.port).status_code == 400 def test_invalid_ports(self): req = 'get:"http://localhost:999999"' @@ -24,12 +25,12 @@ class TestFuzzy(tservers.HTTPProxTest): def test_invalid_ipv6_url(self): req = 'get:"http://localhost:%s":i13,"["' p = self.pathoc() - assert p.request(req%self.server.port).status_code == 400 + assert p.request(req % self.server.port).status_code == 400 def test_invalid_upstream(self): req = r"get:'http://localhost:%s/p/200:i10,\x27+\x27'" p = self.pathoc() - assert p.request(req%self.server.port).status_code == 502 + assert p.request(req % self.server.port).status_code == 502 def test_upstream_disconnect(self): req = r'200:d0' diff --git a/test/test_platform_pf.py b/test/test_platform_pf.py index 2c4870f9..3250b035 100644 --- a/test/test_platform_pf.py +++ b/test/test_platform_pf.py @@ -1,4 +1,5 @@ -import tutils, sys +import tutils +import sys from libmproxy.platform import pf @@ -6,10 +7,20 @@ class TestLookup: def test_simple(self): if sys.platform == "freebsd10": p = tutils.test_data.path("data/pf02") - d = open(p,"rb").read() + d = open(p, "rb").read() else: p = tutils.test_data.path("data/pf01") - d = open(p,"rb").read() + d = open(p, "rb").read() assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80) - tutils.raises("Could not resolve original destination", pf.lookup, "192.168.1.112", 40000, d) - tutils.raises("Could not resolve original destination", pf.lookup, "192.168.1.111", 40001, d) + tutils.raises( + "Could not resolve original destination", + pf.lookup, + "192.168.1.112", + 40000, + d) + tutils.raises( + "Could not resolve original destination", + pf.lookup, + "192.168.1.111", + 40001, + d) diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py index c39f9abb..884a528e 100644 --- a/test/test_protocol_http.py +++ b/test/test_protocol_http.py @@ -61,7 +61,8 @@ class TestHTTPRequest: assert "Host" in r.headers def test_expect_header(self): - s = StringIO("GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + s = StringIO( + "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") w = StringIO() r = HTTPRequest.from_stream(s, wfile=w) assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" @@ -84,7 +85,8 @@ class TestHTTPRequest: tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) s = StringIO("GET http://address:22/ HTTP/1.1") r = HTTPRequest.from_stream(s) - assert r.assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" + assert r.assemble( + ) == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" def test_http_options_relative_form_in(self): """ @@ -105,10 +107,10 @@ class TestHTTPRequest: r.host = 'address' r.port = 80 r.scheme = "http" - assert r.assemble() == ("OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - "Host: address\r\n" - "Content-Length: 0\r\n\r\n") - + assert r.assemble() == ( + "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + "Host: address\r\n" + "Content-Length: 0\r\n\r\n") def test_assemble_unknown_form(self): r = tutils.treq() @@ -257,7 +259,8 @@ class TestHTTPResponse: def test_get_cookies_with_parameters(self): h = odict.ODictCaseless() - h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + h["Set-Cookie"] = [ + "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] resp = tutils.tresp() resp.headers = h result = resp.get_cookies() diff --git a/test/test_proxy.py b/test/test_proxy.py index 641b4f47..a618ae6c 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -78,7 +78,6 @@ class TestProcessProxyOptions: def test_no_transparent(self): self.assert_err("transparent mode not supported", "-T") - @mock.patch("libmproxy.platform.resolver") def test_modes(self, _): self.assert_noerr("-R", "http://localhost") @@ -96,28 +95,42 @@ class TestProcessProxyOptions: def test_client_certs(self): with tutils.tmpdir() as cadir: self.assert_noerr("--client-certs", cadir) - self.assert_err("directory does not exist", "--client-certs", "nonexistent") + self.assert_err( + "directory does not exist", + "--client-certs", + "nonexistent") def test_certs(self): with tutils.tmpdir() as cadir: - self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem")) + self.assert_noerr( + "--cert", + tutils.test_data.path("data/testkey.pem")) self.assert_err("does not exist", "--cert", "nonexistent") def test_auth(self): p = self.assert_noerr("--nonanonymous") assert p.authenticator - p = self.assert_noerr("--htpasswd", tutils.test_data.path("data/htpasswd")) + p = self.assert_noerr( + "--htpasswd", + tutils.test_data.path("data/htpasswd")) assert p.authenticator - self.assert_err("malformed htpasswd file", "--htpasswd", tutils.test_data.path("data/htpasswd.invalid")) + self.assert_err( + "malformed htpasswd file", + "--htpasswd", + tutils.test_data.path("data/htpasswd.invalid")) p = self.assert_noerr("--singleuser", "test:test") assert p.authenticator - self.assert_err("invalid single-user specification", "--singleuser", "test") + self.assert_err( + "invalid single-user specification", + "--singleuser", + "test") class TestProxyServer: - @tutils.SkipWindows # binding to 0.0.0.0:1 works without special permissions on Windows + # binding to 0.0.0.0:1 works without special permissions on Windows + @tutils.SkipWindows def test_err(self): conf = ProxyConfig( port=1 @@ -142,6 +155,12 @@ class TestConnectionHandler: def test_fatal_error(self): config = mock.Mock() config.mode.get_upstream_server.side_effect = RuntimeError - c = ConnectionHandler(config, mock.MagicMock(), ("127.0.0.1", 8080), None, mock.MagicMock()) + c = ConnectionHandler( + config, + mock.MagicMock(), + ("127.0.0.1", + 8080), + None, + mock.MagicMock()) with tutils.capture_stderr(c.handle) as output: assert "mitmproxy has crashed" in output diff --git a/test/test_script.py b/test/test_script.py index aed7def1..0a063740 100644 --- a/test/test_script.py +++ b/test/test_script.py @@ -11,7 +11,7 @@ class TestScript: s = flow.State() fm = flow.FlowMaster(None, s) sp = tutils.test_data.path("scripts/a.py") - p = script.Script("%s --var 40"%sp, fm) + p = script.Script("%s --var 40" % sp, fm) assert "here" in p.ns assert p.run("here") == (True, 41) @@ -79,7 +79,9 @@ class TestScript: def test_concurrent2(self): s = flow.State() fm = flow.FlowMaster(None, s) - s = script.Script(tutils.test_data.path("scripts/concurrent_decorator.py"), fm) + s = script.Script( + tutils.test_data.path("scripts/concurrent_decorator.py"), + fm) s.load() m = mock.Mock() @@ -110,8 +112,9 @@ class TestScript: fm = flow.FlowMaster(None, s) tutils.raises( "decorator not supported for this method", - script.Script, tutils.test_data.path("scripts/concurrent_decorator_err.py"), fm - ) + script.Script, + tutils.test_data.path("scripts/concurrent_decorator_err.py"), + fm) def test_command_parsing(): @@ -120,5 +123,3 @@ def test_command_parsing(): absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py")) s = script.Script(absfilepath, fm) assert os.path.isfile(s.argv[0]) - - diff --git a/test/test_server.py b/test/test_server.py index 7f93c729..2ab48422 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -1,10 +1,12 @@ -import socket, time +import socket +import time from libmproxy.proxy.config import HostMatcher import libpathod from netlib import tcp, http_auth, http from libpathod import pathoc, pathod from netlib.certutils import SSLCert -import tutils, tservers +import tutils +import tservers from libmproxy.protocol import KILL, Error from libmproxy.protocol.http import CONTENT_MISSING @@ -16,9 +18,10 @@ from libmproxy.protocol.http import CONTENT_MISSING for a 200 response. """ + class CommonMixin: def test_large(self): - assert len(self.pathod("200:b@50k").content) == 1024*50 + assert len(self.pathod("200:b@50k").content) == 1024 * 50 @staticmethod def wait_until_not_live(flow): @@ -56,7 +59,8 @@ class CommonMixin: # Port error l.request.port = 1 # In upstream mode, we get a 502 response from the upstream proxy server. - # In upstream mode with ssl, the replay will fail as we cannot establish SSL with the upstream proxy. + # In upstream mode with ssl, the replay will fail as we cannot establish + # SSL with the upstream proxy. rt = self.master.replay_request(l, block=True) assert not rt if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl: @@ -68,7 +72,9 @@ class CommonMixin: f = self.pathod("304") assert f.status_code == 304 - l = self.master.state.view[-1] # In Upstream mode with SSL, we may already have a previous CONNECT request. + # In Upstream mode with SSL, we may already have a previous CONNECT + # request. + l = self.master.state.view[-1] assert l.client_conn.address assert "host" in l.request.headers assert l.response.code == 304 @@ -90,11 +96,13 @@ class CommonMixin: log = self.server.last_log() assert log["request"]["sni"] == "testserver.com" + class TcpMixin: def _ignore_on(self): assert not hasattr(self, "_ignore_backup") self._ignore_backup = self.config.check_ignore - self.config.check_ignore = HostMatcher([".+:%s" % self.server.port] + self.config.check_ignore.patterns) + self.config.check_ignore = HostMatcher( + [".+:%s" % self.server.port] + self.config.check_ignore.patterns) def _ignore_off(self): assert hasattr(self, "_ignore_backup") @@ -125,22 +133,26 @@ class TcpMixin: # Test Non-HTTP traffic spec = "200:i0,@100:d0" # this results in just 100 random bytes - assert self.pathod(spec).status_code == 502 # mitmproxy responds with bad gateway + # mitmproxy responds with bad gateway + assert self.pathod(spec).status_code == 502 self._ignore_on() - tutils.raises("invalid server response", self.pathod, spec) # pathoc tries to parse answer as HTTP + tutils.raises( + "invalid server response", + self.pathod, + spec) # pathoc tries to parse answer as HTTP self._ignore_off() def _tcpproxy_on(self): assert not hasattr(self, "_tcpproxy_backup") self._tcpproxy_backup = self.config.check_tcp - self.config.check_tcp = HostMatcher([".+:%s" % self.server.port] + self.config.check_tcp.patterns) + self.config.check_tcp = HostMatcher( + [".+:%s" % self.server.port] + self.config.check_tcp.patterns) def _tcpproxy_off(self): assert hasattr(self, "_tcpproxy_backup") self.config.check_ignore = self._tcpproxy_backup del self._tcpproxy_backup - def test_tcp(self): spec = '304:h"Alternate-Protocol"="mitmproxy-will-remove-this"' n = self.pathod(spec) @@ -165,6 +177,7 @@ class TcpMixin: # Make sure that TCP messages are in the event log. assert any("mitmproxy-will-remove-this" in m for m in self.master.log) + class AppMixin: def test_app(self): ret = self.app("/") @@ -188,30 +201,30 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): def test_upstream_ssl_error(self): p = self.pathoc() - ret = p.request("get:'https://localhost:%s/'"%self.server.port) + ret = p.request("get:'https://localhost:%s/'" % self.server.port) assert ret.status_code == 400 def test_connection_close(self): # Add a body, so we have a content-length header, which combined with # HTTP1.1 means the connection is kept alive. - response = '%s/p/200:b@1'%self.server.urlbase + response = '%s/p/200:b@1' % self.server.urlbase # Lets sanity check that the connection does indeed stay open by # issuing two requests over the same connection p = self.pathoc() - assert p.request("get:'%s'"%response) - assert p.request("get:'%s'"%response) + assert p.request("get:'%s'" % response) + assert p.request("get:'%s'" % response) # Now check that the connection is closed as the client specifies p = self.pathoc() - assert p.request("get:'%s':h'Connection'='close'"%response) + assert p.request("get:'%s':h'Connection'='close'" % response) # There's a race here, which means we can get any of a number of errors. # Rather than introduce yet another sleep into the test suite, we just # relax the Exception specification. - tutils.raises(Exception, p.request, "get:'%s'"%response) + tutils.raises(Exception, p.request, "get:'%s'" % response) def test_reconnect(self): - req = "get:'%s/p/200:b@1:da'"%self.server.urlbase + req = "get:'%s/p/200:b@1:da'" % self.server.urlbase p = self.pathoc() assert p.request(req) # Server has disconnected. Mitmproxy should detect this, and reconnect. @@ -225,8 +238,8 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): return True req = "get:'%s/p/200:b@1'" p = self.pathoc() - assert p.request(req%self.server.urlbase) - assert p.request(req%self.server2.urlbase) + assert p.request(req % self.server.urlbase) + assert p.request(req % self.server2.urlbase) assert switched(self.proxy.log) def test_get_connection_err(self): @@ -237,7 +250,7 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): def test_blank_leading_line(self): p = self.pathoc() req = "get:'%s/p/201':i0,'\r\n'" - assert p.request(req%self.server.urlbase).status_code == 201 + assert p.request(req % self.server.urlbase).status_code == 201 def test_invalid_headers(self): p = self.pathoc() @@ -251,7 +264,9 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection.connect(("127.0.0.1", self.proxy.port)) spec = '301:h"Transfer-Encoding"="chunked":r:b"0\\r\\n\\r\\n"' - connection.send("GET http://localhost:%d/p/%s HTTP/1.1\r\n"%(self.server.port, spec)) + connection.send( + "GET http://localhost:%d/p/%s HTTP/1.1\r\n" % + (self.server.port, spec)) connection.send("\r\n") resp = connection.recv(50000) connection.close() @@ -270,13 +285,20 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): self.master.set_stream_large_bodies(None) def test_stream_modify(self): - self.master.load_script(tutils.test_data.path("scripts/stream_modify.py")) + self.master.load_script( + tutils.test_data.path("scripts/stream_modify.py")) d = self.pathod('200:b"foo"') assert d.content == "bar" self.master.unload_scripts() + class TestHTTPAuth(tservers.HTTPProxTest): - authenticator = http_auth.BasicProxyAuth(http_auth.PassManSingleUser("test", "test"), "realm") + authenticator = http_auth.BasicProxyAuth( + http_auth.PassManSingleUser( + "test", + "test"), + "realm") + def test_auth(self): assert self.pathod("202").status_code == 407 p = self.pathoc() @@ -284,7 +306,7 @@ class TestHTTPAuth(tservers.HTTPProxTest): get 'http://localhost:%s/p/202' h'%s'='%s' - """%( + """ % ( self.server.port, http_auth.BasicProxyAuth.AUTH_HEADER, http.assemble_http_basic_auth("basic", "test", "test") @@ -294,6 +316,7 @@ class TestHTTPAuth(tservers.HTTPProxTest): class TestHTTPConnectSSLError(tservers.HTTPProxTest): certfile = True + def test_go(self): self.config.ssl_ports.append(self.proxy.port) p = self.pathoc_raw() @@ -306,6 +329,7 @@ class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin): ssl = True ssloptions = pathod.SSLOptions(request_client_cert=True) clientcerts = True + def test_clientcert(self): f = self.pathod("304") assert f.status_code == 304 @@ -319,6 +343,7 @@ class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin): class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin): ssl = True certfile = True + def test_certfile(self): assert self.pathod("304") @@ -328,11 +353,12 @@ class TestHTTPSNoCommonName(tservers.HTTPProxTest): Test what happens if we get a cert without common name back. """ ssl = True - ssloptions=pathod.SSLOptions( - certs = [ - ("*", tutils.test_data.path("data/no_common_name.pem")) - ] - ) + ssloptions = pathod.SSLOptions( + certs = [ + ("*", tutils.test_data.path("data/no_common_name.pem")) + ] + ) + def test_http(self): f = self.pathod("202") assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1" @@ -373,7 +399,6 @@ class TestHttps2Http(tservers.ReverseProxTest): assert p.request("get:'/p/200'").status_code == 400 - class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin): ssl = False @@ -413,15 +438,19 @@ class TestProxy(tservers.HTTPProxTest): connection.connect(("127.0.0.1", self.proxy.port)) # call pathod server, wait a second to complete the request - connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port) + connection.send( + "GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" % + self.server.port) time.sleep(1) connection.send("\r\n") connection.recv(50000) connection.close() - request, response = self.master.state.view[0].request, self.master.state.view[0].response + request, response = self.master.state.view[ + 0].request, self.master.state.view[0].response assert response.code == 304 # sanity test for our low level request - assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2 #time.sleep might be a little bit shorter than a second + # time.sleep might be a little bit shorter than a second + assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2 def test_request_timestamps_not_affected_by_client_time(self): # test that don't include user wait time in request's timestamps @@ -441,10 +470,14 @@ class TestProxy(tservers.HTTPProxTest): # tests that the client_conn a tcp connection has a tcp_setup_timestamp connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection.connect(("localhost", self.proxy.port)) - connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port) + connection.send( + "GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" % + self.server.port) connection.send("\r\n") connection.recv(5000) - connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port) + connection.send( + "GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" % + self.server.port) connection.send("\r\n") connection.recv(5000) connection.close() @@ -462,8 +495,10 @@ class TestProxy(tservers.HTTPProxTest): f = self.master.state.view[0] assert f.server_conn.address == ("127.0.0.1", self.server.port) + class TestProxySSL(tservers.HTTPProxTest): - ssl=True + ssl = True + def test_request_ssl_setup_timestamp_presence(self): # tests that the ssl timestamp is present when ssl is used f = self.pathod("304:b@10k") @@ -479,16 +514,24 @@ class MasterRedirectRequest(tservers.TestMaster): request = f.request if request.path == "/p/201": addr = f.live.c.server_conn.address - assert f.live.change_server(("127.0.0.1", self.redirect_port), ssl=False) - assert not f.live.change_server(("127.0.0.1", self.redirect_port), ssl=False) - tutils.raises("SSL handshake error", f.live.change_server, ("127.0.0.1", self.redirect_port), ssl=True) + assert f.live.change_server( + ("127.0.0.1", self.redirect_port), ssl=False) + assert not f.live.change_server( + ("127.0.0.1", self.redirect_port), ssl=False) + tutils.raises( + "SSL handshake error", + f.live.change_server, + ("127.0.0.1", + self.redirect_port), + ssl=True) assert f.live.change_server(addr, ssl=False) request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port tservers.TestMaster.handle_request(self, f) def handle_response(self, f): f.response.content = str(f.client_conn.address.port) - f.response.headers["server-conn-id"] = [str(f.server_conn.source_address.port)] + f.response.headers[ + "server-conn-id"] = [str(f.server_conn.source_address.port)] tservers.TestMaster.handle_response(self, f) @@ -502,37 +545,41 @@ class TestRedirectRequest(tservers.HTTPProxTest): self.server.clear_log() self.server2.clear_log() - r1 = p.request("get:'%s/p/200'"%self.server.urlbase) + r1 = p.request("get:'%s/p/200'" % self.server.urlbase) assert r1.status_code == 200 assert self.server.last_log() assert not self.server2.last_log() self.server.clear_log() self.server2.clear_log() - r2 = p.request("get:'%s/p/201'"%self.server.urlbase) + r2 = p.request("get:'%s/p/201'" % self.server.urlbase) assert r2.status_code == 201 assert not self.server.last_log() assert self.server2.last_log() self.server.clear_log() self.server2.clear_log() - r3 = p.request("get:'%s/p/202'"%self.server.urlbase) + r3 = p.request("get:'%s/p/202'" % self.server.urlbase) assert r3.status_code == 202 assert self.server.last_log() assert not self.server2.last_log() assert r1.content == r2.content == r3.content - assert r1.headers.get_first("server-conn-id") == r3.headers.get_first("server-conn-id") + assert r1.headers.get_first( + "server-conn-id") == r3.headers.get_first("server-conn-id") # Make sure that we actually use the same connection in this test case + class MasterStreamRequest(tservers.TestMaster): """ Enables the stream flag on the flow for all requests """ + def handle_responseheaders(self, f): f.response.stream = True f.reply() + class TestStreamRequest(tservers.HTTPProxTest): masterclass = MasterStreamRequest @@ -541,7 +588,7 @@ class TestStreamRequest(tservers.HTTPProxTest): # a request with 100k of data but without content-length self.server.clear_log() - r1 = p.request("get:'%s/p/200:r:b@100k:d102400'"%self.server.urlbase) + r1 = p.request("get:'%s/p/200:r:b@100k:d102400'" % self.server.urlbase) assert r1.status_code == 200 assert len(r1.content) > 100000 assert self.server.last_log() @@ -551,13 +598,13 @@ class TestStreamRequest(tservers.HTTPProxTest): # simple request with streaming turned on self.server.clear_log() - r1 = p.request("get:'%s/p/200'"%self.server.urlbase) + r1 = p.request("get:'%s/p/200'" % self.server.urlbase) assert r1.status_code == 200 assert self.server.last_log() # now send back 100k of data, streamed but not chunked self.server.clear_log() - r1 = p.request("get:'%s/p/200:b@100k'"%self.server.urlbase) + r1 = p.request("get:'%s/p/200:b@100k'" % self.server.urlbase) assert r1.status_code == 200 assert self.server.last_log() @@ -567,15 +614,27 @@ class TestStreamRequest(tservers.HTTPProxTest): connection.connect(("127.0.0.1", self.proxy.port)) fconn = connection.makefile() spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n7\\r\\nisatest\\r\\n0\\r\\n\\r\\n"' - connection.send("GET %s/p/%s HTTP/1.1\r\n"%(self.server.urlbase, spec)) + connection.send( + "GET %s/p/%s HTTP/1.1\r\n" % + (self.server.urlbase, spec)) connection.send("\r\n") - httpversion, code, msg, headers, content = http.read_response(fconn, "GET", None, include_body=False) + httpversion, code, msg, headers, content = http.read_response( + fconn, "GET", None, include_body=False) assert headers["Transfer-Encoding"][0] == 'chunked' assert code == 200 - chunks = list(content for _, content, _ in http.read_http_body_chunked(fconn, headers, None, "GET", 200, False)) + chunks = list( + content for _, + content, + _ in http.read_http_body_chunked( + fconn, + headers, + None, + "GET", + 200, + False)) assert chunks == ["this", "isatest", ""] connection.close() @@ -589,6 +648,7 @@ class MasterFakeResponse(tservers.TestMaster): class TestFakeResponse(tservers.HTTPProxTest): masterclass = MasterFakeResponse + def test_fake(self): f = self.pathod("200") assert "header_response" in f.headers.keys() @@ -601,6 +661,7 @@ class MasterKillRequest(tservers.TestMaster): class TestKillRequest(tservers.HTTPProxTest): masterclass = MasterKillRequest + def test_kill(self): tutils.raises("server disconnect", self.pathod, "200") # Nothing should have hit the server @@ -614,6 +675,7 @@ class MasterKillResponse(tservers.TestMaster): class TestKillResponse(tservers.HTTPProxTest): masterclass = MasterKillResponse + def test_kill(self): tutils.raises("server disconnect", self.pathod, "200") # The server should have seen a request @@ -627,6 +689,7 @@ class EResolver(tservers.TResolver): class TestTransparentResolveError(tservers.TransparentProxTest): resolver = EResolver + def test_resolve_error(self): assert self.pathod("304").status_code == 502 @@ -640,6 +703,7 @@ class MasterIncomplete(tservers.TestMaster): class TestIncompleteResponse(tservers.HTTPProxTest): masterclass = MasterIncomplete + def test_incomplete(self): assert self.pathod("200").status_code == 502 @@ -656,10 +720,16 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin): ssl = False def test_order(self): - self.proxy.tmaster.replacehooks.add("~q", "foo", "bar") # replace in request + self.proxy.tmaster.replacehooks.add( + "~q", + "foo", + "bar") # replace in request self.chain[0].tmaster.replacehooks.add("~q", "bar", "baz") self.chain[1].tmaster.replacehooks.add("~q", "foo", "oh noes!") - self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY") # replace in response + self.chain[0].tmaster.replacehooks.add( + "~s", + "baz", + "ORLY") # replace in response p = self.pathoc() req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase) @@ -667,7 +737,10 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin): assert req.status_code == 418 -class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin): +class TestUpstreamProxySSL( + tservers.HTTPUpstreamProxTest, + CommonMixin, + TcpMixin): ssl = True def _host_pattern_on(self, attr): @@ -677,7 +750,10 @@ class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin) assert not hasattr(self, "_ignore_%s_backup" % attr) backup = [] for proxy in self.chain: - old_matcher = getattr(proxy.tmaster.server.config, "check_%s" % attr) + old_matcher = getattr( + proxy.tmaster.server.config, + "check_%s" % + attr) backup.append(old_matcher) setattr( proxy.tmaster.server.config, @@ -721,11 +797,14 @@ class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin) assert req.content == "content" assert req.status_code == 418 - assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT from pathoc to chain[0], - # request from pathoc to chain[0] - assert self.chain[0].tmaster.state.flow_count() == 2 # CONNECT from proxy to chain[1], - # request from proxy to chain[1] - assert self.chain[1].tmaster.state.flow_count() == 1 # request from chain[0] (regular proxy doesn't store CONNECTs) + # CONNECT from pathoc to chain[0], + assert self.proxy.tmaster.state.flow_count() == 2 + # request from pathoc to chain[0] + # CONNECT from proxy to chain[1], + assert self.chain[0].tmaster.state.flow_count() == 2 + # request from proxy to chain[1] + # request from chain[0] (regular proxy doesn't store CONNECTs) + assert self.chain[1].tmaster.state.flow_count() == 1 def test_closing_connect_response(self): """ @@ -755,6 +834,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): def kill_requests(master, attr, exclude): k = [0] # variable scope workaround: put into array _func = getattr(master, attr) + def handler(f): k[0] += 1 if not (k[0] in exclude): @@ -766,9 +846,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): kill_requests(self.chain[1].tmaster, "handle_request", exclude=[ - # fail first request + # fail first request 2, # allow second request - ]) + ]) kill_requests(self.chain[0].tmaster, "handle_request", exclude=[ @@ -776,16 +856,18 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): # fail first request 3, # reCONNECT 4, # request - ]) + ]) p = self.pathoc() req = p.request("get:'/p/418:b\"content\"'") assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request - assert self.chain[0].tmaster.state.flow_count() == 4 # CONNECT, failing request, - # reCONNECT, request - assert self.chain[1].tmaster.state.flow_count() == 2 # failing request, request - # (doesn't store (repeated) CONNECTs from chain[0] - # as it is a regular proxy) + # CONNECT, failing request, + assert self.chain[0].tmaster.state.flow_count() == 4 + # reCONNECT, request + # failing request, request + assert self.chain[1].tmaster.state.flow_count() == 2 + # (doesn't store (repeated) CONNECTs from chain[0] + # as it is a regular proxy) assert req.content == "content" assert req.status_code == 418 @@ -795,18 +877,26 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): assert self.proxy.tmaster.state.flows[0].request.form_in == "authority" assert self.proxy.tmaster.state.flows[1].request.form_in == "relative" - assert self.chain[0].tmaster.state.flows[0].request.form_in == "authority" - assert self.chain[0].tmaster.state.flows[1].request.form_in == "relative" - assert self.chain[0].tmaster.state.flows[2].request.form_in == "authority" - assert self.chain[0].tmaster.state.flows[3].request.form_in == "relative" + assert self.chain[0].tmaster.state.flows[ + 0].request.form_in == "authority" + assert self.chain[0].tmaster.state.flows[ + 1].request.form_in == "relative" + assert self.chain[0].tmaster.state.flows[ + 2].request.form_in == "authority" + assert self.chain[0].tmaster.state.flows[ + 3].request.form_in == "relative" - assert self.chain[1].tmaster.state.flows[0].request.form_in == "relative" - assert self.chain[1].tmaster.state.flows[1].request.form_in == "relative" + assert self.chain[1].tmaster.state.flows[ + 0].request.form_in == "relative" + assert self.chain[1].tmaster.state.flows[ + 1].request.form_in == "relative" req = p.request("get:'/p/418:b\"content2\"'") assert req.status_code == 502 assert self.proxy.tmaster.state.flow_count() == 3 # + new request - assert self.chain[0].tmaster.state.flow_count() == 6 # + new request, repeated CONNECT from chain[1] - # (both terminated) - assert self.chain[1].tmaster.state.flow_count() == 2 # nothing happened here + # + new request, repeated CONNECT from chain[1] + assert self.chain[0].tmaster.state.flow_count() == 6 + # (both terminated) + # nothing happened here + assert self.chain[1].tmaster.state.flow_count() == 2 diff --git a/test/test_utils.py b/test/test_utils.py index 6b9262a0..0c514f5d 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -48,9 +48,11 @@ def test_urldecode(): s = "one=two&three=four" assert len(utils.urldecode(s)) == 2 + def test_multipartdecode(): boundary = 'somefancyboundary' - headers = odict.ODict([('content-type', ('multipart/form-data; boundary=%s' % boundary))]) + headers = odict.ODict( + [('content-type', ('multipart/form-data; boundary=%s' % boundary))]) content = "--{0}\n" \ "Content-Disposition: form-data; name=\"field1\"\n\n" \ "value1\n" \ @@ -65,6 +67,7 @@ def test_multipartdecode(): assert form[0] == ('field1', 'value1') assert form[1] == ('field2', 'value2') + def test_pretty_duration(): assert utils.pretty_duration(0.00001) == "0ms" assert utils.pretty_duration(0.0001) == "0ms" @@ -79,10 +82,13 @@ def test_pretty_duration(): assert utils.pretty_duration(1.123) == "1.12s" assert utils.pretty_duration(0.123) == "123ms" + def test_LRUCache(): cache = utils.LRUCache(2) + class Foo: ran = False + def gen(self, x): self.ran = True return x diff --git a/test/tools/bench.py b/test/tools/bench.py index 1028f61d..8127d083 100644 --- a/test/tools/bench.py +++ b/test/tools/bench.py @@ -1,5 +1,6 @@ from __future__ import print_function -import requests, time +import requests +import time n = 100 url = "http://192.168.1.1/" @@ -7,18 +8,17 @@ proxy = "http://192.168.1.115:8080/" start = time.time() for _ in range(n): - requests.get(url, allow_redirects=False, proxies=dict(http=proxy)) - print(".", end="") -t_mitmproxy = time.time()-start + requests.get(url, allow_redirects=False, proxies=dict(http=proxy)) + print(".", end="") +t_mitmproxy = time.time() - start print("\r\nTotal time with mitmproxy: {}".format(t_mitmproxy)) - start = time.time() for _ in range(n): - requests.get(url, allow_redirects=False) - print(".", end="") -t_without = time.time()-start + requests.get(url, allow_redirects=False) + print(".", end="") +t_without = time.time() - start -print("\r\nTotal time without mitmproxy: {}".format(t_without))
\ No newline at end of file +print("\r\nTotal time without mitmproxy: {}".format(t_without)) diff --git a/test/tools/getcert b/test/tools/getcert index 8fabefb7..3bd2bec8 100755 --- a/test/tools/getcert +++ b/test/tools/getcert @@ -1,7 +1,10 @@ #!/usr/bin/env python import sys sys.path.insert(0, "../..") -import socket, tempfile, ssl, subprocess +import socket +import tempfile +import ssl +import subprocess addr = socket.gethostbyname(sys.argv[1]) print ssl.get_server_certificate((addr, 443)) diff --git a/test/tools/passive_close.py b/test/tools/passive_close.py index d0b36e7f..7199ea70 100644 --- a/test/tools/passive_close.py +++ b/test/tools/passive_close.py @@ -2,12 +2,14 @@ import SocketServer from threading import Thread from time import sleep + class service(SocketServer.BaseRequestHandler): def handle(self): data = 'dummy' print "Client connected with ", self.client_address while True: - self.request.send("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 7\r\n\r\ncontent") + self.request.send( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 7\r\n\r\ncontent") data = self.request.recv(1024) if not len(data): print "Connection closed by remote: ", self.client_address @@ -17,5 +19,5 @@ class service(SocketServer.BaseRequestHandler): class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): pass -server = ThreadedTCPServer(('',1520), service) +server = ThreadedTCPServer(('', 1520), service) server.serve_forever() diff --git a/test/tools/yappibench.py b/test/tools/yappibench.py index b9e4e41d..bae8da44 100644 --- a/test/tools/yappibench.py +++ b/test/tools/yappibench.py @@ -2,7 +2,7 @@ # yappi (https://code.google.com/p/yappi/) # # Requirements: -# - Apache Bench "ab" binary +# - Apache Bench "ab" binary # - pip install click yappi from libmproxy.main import mitmdump @@ -13,14 +13,17 @@ import time import yappi import click + class ApacheBenchThread(Thread): - def __init__(self, concurrency): - self.concurrency = concurrency - super(ApacheBenchThread, self).__init__() + def __init__(self, concurrency): + self.concurrency = concurrency + super(ApacheBenchThread, self).__init__() + + def run(self): + time.sleep(2) + system( + "ab -n 1024 -c {} -X 127.0.0.1:8080 http://example.com/".format(self.concurrency)) - def run(self): - time.sleep(2) - system("ab -n 1024 -c {} -X 127.0.0.1:8080 http://example.com/".format(self.concurrency)) @click.command() @click.option('--profiler', default="yappi", type=click.Choice(['yappi'])) @@ -28,24 +31,24 @@ class ApacheBenchThread(Thread): @click.option('--concurrency', default=1, type=click.INT) def main(profiler, clock_type, concurrency): - outfile = "callgrind.mitmdump-{}-c{}".format(clock_type, concurrency) - a = ApacheBenchThread(concurrency) - a.start() - - if profiler == "yappi": - yappi.set_clock_type(clock_type) - yappi.start(builtins=True) - - print("Start mitmdump...") - mitmdump(["-k","-q","-S", "1024example"]) - print("mitmdump stopped.") - - print("Save profile information...") - if profiler == "yappi": - yappi.stop() - stats = yappi.get_func_stats() - stats.save(outfile, type='callgrind') - print("Done.") + outfile = "callgrind.mitmdump-{}-c{}".format(clock_type, concurrency) + a = ApacheBenchThread(concurrency) + a.start() + + if profiler == "yappi": + yappi.set_clock_type(clock_type) + yappi.start(builtins=True) + + print("Start mitmdump...") + mitmdump(["-k", "-q", "-S", "1024example"]) + print("mitmdump stopped.") + + print("Save profile information...") + if profiler == "yappi": + yappi.stop() + stats = yappi.get_func_stats() + stats.save(outfile, type='callgrind') + print("Done.") if __name__ == '__main__': - main()
\ No newline at end of file + main() diff --git a/test/tservers.py b/test/tservers.py index bcda0295..dc14fb37 100644 --- a/test/tservers.py +++ b/test/tservers.py @@ -1,23 +1,28 @@ import os.path -import threading, Queue -import shutil, tempfile +import threading +import Queue +import shutil +import tempfile import flask import mock from libmproxy.proxy.config import ProxyConfig from libmproxy.proxy.server import ProxyServer from libmproxy.proxy.primitives import TransparentProxyMode -import libpathod.test, libpathod.pathoc +import libpathod.test +import libpathod.pathoc from libmproxy import flow, controller from libmproxy.cmdline import APP_HOST, APP_PORT import tutils testapp = flask.Flask(__name__) + @testapp.route("/") def hello(): return "testapp" + @testapp.route("/error") def error(): raise ValueError("An exception...") @@ -57,7 +62,8 @@ class ProxyThread(threading.Thread): def __init__(self, tmaster): threading.Thread.__init__(self) self.tmaster = tmaster - self.name = "ProxyThread (%s:%s)" % (tmaster.server.address.host, tmaster.server.address.port) + self.name = "ProxyThread (%s:%s)" % ( + tmaster.server.address.host, tmaster.server.address.port) controller.should_exit = False @property @@ -87,8 +93,12 @@ class ProxTestBase(object): @classmethod def setupAll(cls): - cls.server = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions) - cls.server2 = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions) + cls.server = libpathod.test.Daemon( + ssl=cls.ssl, + ssloptions=cls.ssloptions) + cls.server2 = libpathod.test.Daemon( + ssl=cls.ssl, + ssloptions=cls.ssloptions) cls.config = ProxyConfig(**cls.get_proxy_config()) @@ -151,9 +161,9 @@ class HTTPProxTest(ProxTestBase): p = self.pathoc(sni=sni) spec = spec.encode("string_escape") if self.ssl: - q = "get:'/p/%s'"%spec + q = "get:'/p/%s'" % spec else: - q = "get:'%s/p/%s'"%(self.server.urlbase, spec) + q = "get:'%s/p/%s'" % (self.server.urlbase, spec) return p.request(q) def app(self, page): @@ -162,10 +172,10 @@ class HTTPProxTest(ProxTestBase): ("127.0.0.1", self.proxy.port), True, fp=None ) p.connect((APP_HOST, APP_PORT)) - return p.request("get:'%s'"%page) + return p.request("get:'%s'" % page) else: p = self.pathoc() - return p.request("get:'http://%s%s'"%(APP_HOST, page)) + return p.request("get:'http://%s%s'" % (APP_HOST, page)) class TResolver: @@ -188,7 +198,10 @@ class TransparentProxTest(ProxTestBase): ports = [cls.server.port, cls.server2.port] else: ports = [] - cls.config.mode = TransparentProxyMode(cls.resolver(cls.server.port), ports) + cls.config.mode = TransparentProxyMode( + cls.resolver( + cls.server.port), + ports) @classmethod def get_proxy_config(cls): @@ -202,10 +215,10 @@ class TransparentProxTest(ProxTestBase): """ if self.ssl: p = self.pathoc(sni=sni) - q = "get:'/p/%s'"%spec + q = "get:'/p/%s'" % spec else: p = self.pathoc() - q = "get:'/p/%s'"%spec + q = "get:'/p/%s'" % spec return p.request(q) def pathoc(self, sni=None): @@ -221,6 +234,7 @@ class TransparentProxTest(ProxTestBase): class ReverseProxTest(ProxTestBase): ssl = None + @classmethod def get_proxy_config(cls): d = ProxTestBase.get_proxy_config() @@ -249,10 +263,10 @@ class ReverseProxTest(ProxTestBase): """ if self.ssl: p = self.pathoc(sni=sni) - q = "get:'/p/%s'"%spec + q = "get:'/p/%s'" % spec else: p = self.pathoc() - q = "get:'/p/%s'"%spec + q = "get:'/p/%s'" % spec return p.request(q) @@ -278,8 +292,8 @@ class ChainProxTest(ProxTestBase): cls.chain.insert(0, proxy) # Patch the orginal proxy to upstream mode - cls.config = cls.proxy.tmaster.config = cls.proxy.tmaster.server.config = ProxyConfig(**cls.get_proxy_config()) - + cls.config = cls.proxy.tmaster.config = cls.proxy.tmaster.server.config = ProxyConfig( + **cls.get_proxy_config()) @classmethod def teardownAll(cls): @@ -303,5 +317,6 @@ class ChainProxTest(ProxTestBase): ) return d + class HTTPUpstreamProxTest(ChainProxTest, HTTPProxTest): pass diff --git a/test/tutils.py b/test/tutils.py index 499efc6e..aeaeb0de 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -1,5 +1,8 @@ from cStringIO import StringIO -import os, shutil, tempfile, argparse +import os +import shutil +import tempfile +import argparse from contextlib import contextmanager import sys from libmproxy import flow, utils, controller @@ -14,8 +17,11 @@ from nose.plugins.skip import SkipTest from mock import Mock from time import time + def _SkipWindows(): raise SkipTest("Skipped on Windows.") + + def SkipWindows(fn): if os.name == "nt": return _SkipWindows @@ -83,10 +89,23 @@ def treq(content="content", scheme="http", host="address", port=22): """ headers = odict.ODictCaseless() headers["header"] = ["qvalue"] - req = http.HTTPRequest("relative", "GET", scheme, host, port, "/path", (1, 1), headers, content, - None, None, None) + req = http.HTTPRequest( + "relative", + "GET", + scheme, + host, + port, + "/path", + (1, + 1), + headers, + content, + None, + None, + None) return req + def treq_absolute(content="content"): """ @return: libmproxy.protocol.http.HTTPRequest @@ -107,7 +126,15 @@ def tresp(content="message"): headers = odict.ODictCaseless() headers["header_response"] = ["svalue"] - resp = http.HTTPResponse((1, 1), 200, "OK", headers, content, time(), time()) + resp = http.HTTPResponse( + (1, + 1), + 200, + "OK", + headers, + content, + time(), + time()) return resp @@ -118,10 +145,11 @@ def terr(content="error"): err = Error(content) return err + def tflowview(request_contents=None): m = Mock() cs = ConsoleState() - if request_contents == None: + if request_contents is None: flow = tflow() else: flow = tflow(req=treq(request_contents)) @@ -129,9 +157,11 @@ def tflowview(request_contents=None): fv = FlowView(m, cs, flow) return fv + def get_body_line(last_displayed_body, line_nb): return last_displayed_body.contents()[line_nb + 2] + @contextmanager def tmpdir(*args, **kwargs): orig_workdir = os.getcwd() @@ -149,6 +179,7 @@ class MockParser(argparse.ArgumentParser): argparse.ArgumentParser sys.exits() by default. Make it more testable by throwing an exception instead. """ + def error(self, message): raise Exception(message) @@ -169,14 +200,14 @@ def raises(exc, obj, *args, **kwargs): :kwargs Arguments to be passed to the callable. """ try: - apply(obj, args, kwargs) - except Exception, v: + obj(*args, **kwargs) + except Exception as v: if isinstance(exc, basestring): if exc.lower() in str(v).lower(): return else: raise AssertionError( - "Expected %s, but caught %s"%( + "Expected %s, but caught %s" % ( repr(str(exc)), v ) ) @@ -185,7 +216,7 @@ def raises(exc, obj, *args, **kwargs): return else: raise AssertionError( - "Expected %s, but caught %s %s"%( + "Expected %s, but caught %s %s" % ( exc.__name__, v.__class__.__name__, str(v) ) ) |