diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_dump.py | 18 | ||||
-rw-r--r-- | test/test_filt.py | 43 | ||||
-rw-r--r-- | test/test_flow.py | 122 | ||||
-rw-r--r-- | test/tutils.py | 64 |
4 files changed, 131 insertions, 116 deletions
diff --git a/test/test_dump.py b/test/test_dump.py index 031a3f6a..f6688b1a 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -6,11 +6,11 @@ import mock def test_strfuncs(): t = tutils.tresp() - t._set_replay() + t.is_replay = True dump.str_response(t) t = tutils.treq() - t.client_conn = None + t.flow.client_conn = None t.stickycookie = True assert "stickycookie" in dump.str_request(t, False) assert "stickycookie" in dump.str_request(t, True) @@ -20,24 +20,20 @@ def test_strfuncs(): class TestDumpMaster: def _cycle(self, m, content): - req = tutils.treq() - req.content = content + req = tutils.treq(content=content) l = proxy.Log("connect") l.reply = mock.MagicMock() m.handle_log(l) - cc = req.client_conn - cc.connection_error = "error" - resp = tutils.tresp(req) - resp.content = content + cc = req.flow.client_conn + cc.reply = mock.MagicMock() + resp = tutils.tresp(req, content=content) m.handle_clientconnect(cc) sc = proxy.ServerConnection((req.host, req.port)) sc.reply = mock.MagicMock() m.handle_serverconnection(sc) m.handle_request(req) f = m.handle_response(resp) - cd = flow.ClientDisconnect(cc) - cd.reply = mock.MagicMock() - m.handle_clientdisconnect(cd) + m.handle_clientdisconnect(cc) return f def _dummy_cycle(self, n, filt, content, **options): diff --git a/test/test_filt.py b/test/test_filt.py index 4e059196..96fc58f9 100644 --- a/test/test_filt.py +++ b/test/test_filt.py @@ -1,6 +1,7 @@ import cStringIO from libmproxy import filt, flow - +from libmproxy.protocol import http +import tutils class TestParsing: def _dump(self, x): @@ -72,41 +73,37 @@ class TestParsing: class TestMatching: def req(self): - conn = flow.ClientConnect(("one", 2222)) headers = flow.ODictCaseless() headers["header"] = ["qvalue"] - req = flow.Request( - conn, - (1, 1), - "host", - 80, - "http", - "GET", - "/path", - headers, - "content_request" + req = http.HTTPRequest( + "absolute", + "GET", + "http", + "host", + 80, + "/path", + (1, 1), + headers, + "content_request", + None, + None ) - return flow.Flow(req) + f = http.HTTPFlow(tutils.tclient_conn(), None) + f.request = req + return f def resp(self): f = self.req() headers = flow.ODictCaseless() headers["header_response"] = ["svalue"] - f.response = flow.Response( - f.request, - (1, 1), - 200, - "message", - headers, - "content_response", - None - ) + f.response = http.HTTPResponse((1, 1), 200, "OK", headers, "content_response", None, None) + return f def err(self): f = self.req() - f.error = flow.Error(f.request, "msg") + f.error = flow.Error("msg") return f def q(self, q, o): diff --git a/test/test_flow.py b/test/test_flow.py index 42118e36..3bcb47d8 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -172,7 +172,9 @@ class TestFlow: def test_copy(self): f = tutils.tflow_full() f2 = f.copy() + assert f == f2 assert not f is f2 + assert f.request == f2.request assert not f.request is f2.request assert f.request.headers == f2.request.headers assert not f.request.headers is f2.request.headers @@ -189,9 +191,7 @@ class TestFlow: assert not f.error is f2.error def test_match(self): - f = tutils.tflow() - f.response = tutils.tresp() - f.request = f.response.request + f = tutils.tflow_full() assert not f.match("~b test") assert f.match(None) assert not f.match("~b test") @@ -201,7 +201,6 @@ class TestFlow: tutils.raises(ValueError, f.match, "~") - def test_backup(self): f = tutils.tflow() f.response = tutils.tresp() @@ -228,12 +227,12 @@ class TestFlow: assert f._get_state() == flow.Flow._from_state(state)._get_state() f.response = None - f.error = flow.Error(f.request, "error") + f.error = flow.Error("error") state = f._get_state() assert f._get_state() == flow.Flow._from_state(state)._get_state() f2 = tutils.tflow() - f2.error = flow.Error(f.request, "e2") + f2.error = flow.Error("e2") assert not f == f2 f._load_state(f2._get_state()) assert f._get_state() == f2._get_state() @@ -286,10 +285,6 @@ class TestFlow: f.accept_intercept() assert f.response.reply.acked - def test_serialization(self): - f = flow.Flow(None) - f.request = tutils.treq() - def test_replace_unicode(self): f = tutils.tflow_full() f.response.content = "\xc2foo" @@ -310,10 +305,6 @@ class TestFlow: assert f.response.headers["bar"] == ["bar"] assert f.response.content == "abarb" - f = tutils.tflow_err() - f.replace("error", "bar") - assert f.error.msg == "bar" - def test_replace_encoded(self): f = tutils.tflow_full() f.request.content = "afoob" @@ -378,16 +369,16 @@ class TestState: c = flow.State() req = tutils.treq() f = c.add_request(req) - e = flow.Error(f.request, "message") + e = flow.Error("message") assert c.add_error(e) - e = flow.Error(tutils.tflow().request, "message") + e = flow.Error("message") assert not c.add_error(e) c = flow.State() req = tutils.treq() f = c.add_request(req) - e = flow.Error(f.request, "message") + e = flow.Error("message") c.set_limit("~e") assert not c.view assert not c.view @@ -444,7 +435,7 @@ class TestState: def _add_error(self, state): req = tutils.treq() f = state.add_request(req) - f.error = flow.Error(f.request, "msg") + f.error = flow.Error("msg") def test_clear(self): c = flow.State() @@ -615,7 +606,7 @@ class TestFlowMaster: assert len(fm.scripts) == 0 assert not fm.load_script(tutils.test_data.path("scripts/all.py")) - err = flow.Error(f.request, "msg") + err = flow.Error("msg") err.reply = controller.DummyReply() fm.handle_error(err) assert fm.scripts[0].ns["log"][-1] == "error" @@ -637,7 +628,7 @@ class TestFlowMaster: fm.anticache = True fm.anticomp = True req = tutils.treq() - fm.handle_clientconnect(req.client_conn) + fm.handle_clientconnect(req.flow.client_conn) f = fm.handle_request(req) assert s.flow_count() == 1 @@ -649,12 +640,12 @@ class TestFlowMaster: rx = tutils.tresp() assert not fm.handle_response(rx) - dc = flow.ClientDisconnect(req.client_conn) + dc = flow.ClientDisconnect(req.flow.client_conn) dc.reply = controller.DummyReply() req.client_conn.requestcount = 1 fm.handle_clientdisconnect(dc) - err = flow.Error(f.request, "msg") + err = flow.Error("msg") err.reply = controller.DummyReply() fm.handle_error(err) @@ -675,7 +666,7 @@ class TestFlowMaster: fm.tick(q) assert fm.state.flow_count() - err = flow.Error(f.request, "error") + err = flow.Error("error") err.reply = controller.DummyReply() fm.handle_error(err) @@ -886,7 +877,8 @@ class TestRequest: def test_anticache(self): h = flow.ODictCaseless() - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h h["if-modified-since"] = ["test"] h["if-none-match"] = ["test"] r.anticache() @@ -896,8 +888,8 @@ class TestRequest: def test_getset_state(self): h = flow.ODictCaseless() h["test"] = ["test"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h state = r._get_state() assert flow.Request._from_state(state) == r @@ -905,7 +897,8 @@ class TestRequest: state = r._get_state() assert flow.Request._from_state(state) == r - r2 = flow.Request(c, (1, 1), "testing", 20, "http", "PUT", "/foo", h, "test") + r2 = tutils.treq() + r2.headers = h assert not r == r2 r._load_state(r2._get_state()) assert r == r2 @@ -971,15 +964,15 @@ class TestRequest: def test_get_cookies_none(self): h = flow.ODictCaseless() - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") - assert r.get_cookies() == None + r = tutils.treq() + r.headers = h + assert r.get_cookies() is None def test_get_cookies_single(self): h = flow.ODictCaseless() h["Cookie"] = ["cookiename=cookievalue"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h result = r.get_cookies() assert len(result)==1 assert result['cookiename']==('cookievalue',{}) @@ -987,8 +980,8 @@ class TestRequest: def test_get_cookies_double(self): h = flow.ODictCaseless() h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h result = r.get_cookies() assert len(result)==2 assert result['cookiename']==('cookievalue',{}) @@ -997,26 +990,28 @@ class TestRequest: def test_get_cookies_withequalsign(self): h = flow.ODictCaseless() h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h result = r.get_cookies() assert len(result)==2 assert result['cookiename']==('coo=kievalue',{}) assert result['othercookiename']==('othercookievalue',{}) - def test_get_header_size(self): + def test_header_size(self): h = flow.ODictCaseless() h["headername"] = ["headervalue"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") - result = r.get_header_size() - assert result==43 + r = tutils.treq() + r.headers = h + result = len(r._assemble_headers()) + print result + print r._assemble_headers() + assert result == 62 def test_get_transmitted_size(self): h = flow.ODictCaseless() h["headername"] = ["headervalue"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.headers = h result = r.get_transmitted_size() assert result==len("content") r.content = None @@ -1025,9 +1020,9 @@ class TestRequest: def test_get_content_type(self): h = flow.ODictCaseless() h["Content-Type"] = ["text/plain"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") - assert r.get_content_type()=="text/plain" + resp = tutils.tresp() + resp.headers = h + assert resp.get_content_type()=="text/plain" class TestResponse: def test_simple(self): @@ -1125,20 +1120,22 @@ class TestResponse: assert not r.headers["content-encoding"] assert r.content == "falafel" - def test_get_header_size(self): + def test_header_size(self): r = tutils.tresp() - result = r.get_header_size() - assert result==49 + result = len(r._assemble_headers()) + assert result==44 def test_get_cookies_none(self): h = flow.ODictCaseless() - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h assert not resp.get_cookies() def test_get_cookies_simple(self): h = flow.ODictCaseless() h["Set-Cookie"] = ["cookiename=cookievalue"] - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h result = resp.get_cookies() assert len(result)==1 assert "cookiename" in result @@ -1147,7 +1144,8 @@ class TestResponse: def test_get_cookies_with_parameters(self): h = flow.ODictCaseless() h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h result = resp.get_cookies() assert len(result)==1 assert "cookiename" in result @@ -1161,7 +1159,8 @@ class TestResponse: def test_get_cookies_no_value(self): h = flow.ODictCaseless() h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"] - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h result = resp.get_cookies() assert len(result)==1 assert "cookiename" in result @@ -1171,7 +1170,8 @@ class TestResponse: def test_get_cookies_twocookies(self): h = flow.ODictCaseless() h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"] - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h result = resp.get_cookies() assert len(result)==2 assert "cookiename" in result @@ -1182,19 +1182,20 @@ class TestResponse: def test_get_content_type(self): h = flow.ODictCaseless() h["Content-Type"] = ["text/plain"] - resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + resp = tutils.tresp() + resp.headers = h assert resp.get_content_type()=="text/plain" class TestError: def test_getset_state(self): - e = flow.Error(None, "Error") + e = flow.Error("Error") state = e._get_state() - assert flow.Error._from_state(None, state) == e + assert flow.Error._from_state(state) == e assert e.copy() - e2 = flow.Error(None, "bar") + e2 = flow.Error("bar") assert not e == e2 e._load_state(e2._get_state()) assert e == e2 @@ -1203,11 +1204,6 @@ class TestError: e3 = e.copy() assert e3 == e - def test_replace(self): - e = flow.Error(None, "amoop") - e.replace("moo", "bar") - assert e.msg == "abarp" - class TestClientConnect: def test_state(self): diff --git a/test/tutils.py b/test/tutils.py index fb41d77a..78bf5909 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -1,6 +1,7 @@ import os, shutil, tempfile from contextlib import contextmanager -from libmproxy import flow, utils, controller +from libmproxy import flow, utils, controller, proxy +from libmproxy.protocol import http if os.name != "nt": from libmproxy.console.flowview import FlowView from libmproxy.console import ConsoleState @@ -16,40 +17,65 @@ def SkipWindows(fn): else: return fn + +def tclient_conn(): + return proxy.ClientConnection._from_state(dict( + address=dict(address=("address", 22), use_ipv6=True), + clientcert=None + )) + +def tserver_conn(): + return proxy.ServerConnection._from_state(dict( + address=dict(address=("address", 22), use_ipv6=True), + source_address=dict(address=("address", 22), use_ipv6=True), + cert=None + )) + + def treq(conn=None, content="content"): if not conn: - conn = flow.ClientConnect(("address", 22)) - conn.reply = controller.DummyReply() + conn = tclient_conn() + server_conn = tserver_conn() headers = flow.ODictCaseless() headers["header"] = ["qvalue"] - r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers, - content) - r.reply = controller.DummyReply() - return r + + f = http.HTTPFlow(conn, server_conn) + f.request = http.HTTPRequest("origin", "GET", None, None, None, "/path", (1, 1), headers, content, + None, None, None) + f.request.reply = controller.DummyReply() + return f.request -def tresp(req=None): +def tresp(req=None, content="message"): if not req: req = treq() + f = req.flow + headers = flow.ODictCaseless() headers["header_response"] = ["svalue"] - cert = certutils.SSLCert.from_der(file(test_data.path("data/dercert"),"rb").read()) - resp = flow.Response(req, (1, 1), 200, "message", headers, "content_response", cert) - resp.reply = controller.DummyReply() - return resp + cert = certutils.SSLCert.from_der(file(test_data.path("data/dercert"), "rb").read()) + f.server_conn = proxy.ServerConnection._from_state(dict( + address=dict(address=("address", 22), use_ipv6=True), + source_address=None, + cert=cert.to_pem())) + f.response = http.HTTPResponse((1, 1), 200, "OK", headers, content, None, None) + f.response.reply = controller.DummyReply() + return f.response + def terr(req=None): if not req: req = treq() - err = flow.Error(req, "error") - err.reply = controller.DummyReply() - return err + f = req.flow + f.error = flow.Error("error") + f.error.reply = controller.DummyReply() + return f.error -def tflow(r=None): - if r == None: - r = treq() - return flow.Flow(r) +def tflow(req=None): + if not req: + req = treq() + return req.flow def tflow_full(): |