aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_flow.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flow.py')
-rw-r--r--test/test_flow.py347
1 files changed, 188 insertions, 159 deletions
diff --git a/test/test_flow.py b/test/test_flow.py
index b41eb630..2609b7cb 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -1,7 +1,10 @@
-import Queue, time, os.path
+import Queue
+import time
+import os.path
from cStringIO import StringIO
import email.utils
import mock
+from netlib import odict
from libmproxy import filt, protocol, controller, utils, tnetstring, flow
from libmproxy.protocol.primitives import Error, Flow
from libmproxy.protocol.http import decoded, CONTENT_MISSING
@@ -32,7 +35,6 @@ def test_app_registry():
assert ar.get(r)
-
class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
@@ -114,7 +116,15 @@ class TestClientPlaybackState:
class TestServerPlaybackState:
def test_hash(self):
- s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False)
+ s = flow.ServerPlaybackState(
+ None,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
r = tutils.tflow()
r2 = tutils.tflow()
@@ -125,8 +135,20 @@ class TestServerPlaybackState:
r.request.path = "voing"
assert s._hash(r) != s._hash(r2)
+ r.request.path = "path?blank_value"
+ r2.request.path = "path?"
+ assert s._hash(r) != s._hash(r2)
+
def test_headers(self):
- s = flow.ServerPlaybackState(["foo"], [], False, False, None, False, None, False)
+ s = flow.ServerPlaybackState(
+ ["foo"],
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
r = tutils.tflow(resp=True)
r.request.headers["foo"] = ["bar"]
r2 = tutils.tflow(resp=True)
@@ -147,7 +169,9 @@ class TestServerPlaybackState:
r2 = tutils.tflow(resp=True)
r2.request.headers["key"] = ["two"]
- s = flow.ServerPlaybackState(None, [r, r2], False, False, None, False, None, False)
+ s = flow.ServerPlaybackState(
+ None, [
+ r, r2], False, False, None, False, None, False)
assert s.count() == 2
assert len(s.fmap.keys()) == 1
@@ -168,81 +192,102 @@ class TestServerPlaybackState:
r2 = tutils.tflow(resp=True)
r2.request.headers["key"] = ["two"]
- s = flow.ServerPlaybackState(None, [r, r2], False, True, None, False, None, False)
+ s = flow.ServerPlaybackState(
+ None, [
+ r, r2], False, True, None, False, None, False)
assert s.count() == 2
s.next_flow(r)
assert s.count() == 2
-
def test_ignore_params(self):
- s = flow.ServerPlaybackState(None, [], False, False, ["param1", "param2"], False, None, False)
+ s = flow.ServerPlaybackState(
+ None, [], False, False, [
+ "param1", "param2"], False, None, False)
r = tutils.tflow(resp=True)
- r.request.path="/test?param1=1"
+ r.request.path = "/test?param1=1"
r2 = tutils.tflow(resp=True)
- r2.request.path="/test"
+ r2.request.path = "/test"
assert s._hash(r) == s._hash(r2)
- r2.request.path="/test?param1=2"
+ r2.request.path = "/test?param1=2"
assert s._hash(r) == s._hash(r2)
- r2.request.path="/test?param2=1"
+ r2.request.path = "/test?param2=1"
assert s._hash(r) == s._hash(r2)
- r2.request.path="/test?param3=2"
+ r2.request.path = "/test?param3=2"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params(self):
- s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False)
+ s = flow.ServerPlaybackState(
+ None, [], False, False, None, False, [
+ "param1", "param2"], False)
r = tutils.tflow(resp=True)
- r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
+ r.request.headers[
+ "Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.content = "paramx=x&param1=1"
r2 = tutils.tflow(resp=True)
- r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
+ r2.request.headers[
+ "Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.content = "paramx=x&param1=1"
- # same parameters
+ # same parameters
assert s._hash(r) == s._hash(r2)
- # ignored parameters !=
+ # ignored parameters !=
r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
- # missing parameter
- r2.request.content="paramx=x"
+ # missing parameter
+ r2.request.content = "paramx=x"
assert s._hash(r) == s._hash(r2)
# ignorable parameter added
- r2.request.content="paramx=x&param1=2"
+ r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
# not ignorable parameter changed
- r2.request.content="paramx=y&param1=1"
+ r2.request.content = "paramx=y&param1=1"
assert not s._hash(r) == s._hash(r2)
# not ignorable parameter missing
- r2.request.content="param1=1"
+ r2.request.content = "param1=1"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params_other_content_type(self):
- s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False)
+ s = flow.ServerPlaybackState(
+ None, [], False, False, None, False, [
+ "param1", "param2"], False)
r = tutils.tflow(resp=True)
r.request.headers["Content-Type"] = ["application/json"]
r.request.content = '{"param1":"1"}'
r2 = tutils.tflow(resp=True)
r2.request.headers["Content-Type"] = ["application/json"]
r2.request.content = '{"param1":"1"}'
- # same content
+ # same content
assert s._hash(r) == s._hash(r2)
# distint content (note only x-www-form-urlencoded payload is analysed)
r2.request.content = '{"param1":"2"}'
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_wins_over_params(self):
- #NOTE: parameters are mutually exclusive in options
- s = flow.ServerPlaybackState(None, [], False, False, None, True, ["param1", "param2"], False)
+ # NOTE: parameters are mutually exclusive in options
+ s = flow.ServerPlaybackState(
+ None, [], False, False, None, True, [
+ "param1", "param2"], False)
r = tutils.tflow(resp=True)
- r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
+ r.request.headers[
+ "Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.content = "paramx=y"
r2 = tutils.tflow(resp=True)
- r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
+ r2.request.headers[
+ "Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.content = "paramx=x"
- # same parameters
+ # same parameters
assert s._hash(r) == s._hash(r2)
def test_ignore_content(self):
- s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False)
+ s = flow.ServerPlaybackState(
+ None,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
@@ -252,8 +297,16 @@ class TestServerPlaybackState:
r2.request.content = "bar"
assert not s._hash(r) == s._hash(r2)
- #now ignoring content
- s = flow.ServerPlaybackState(None, [], False, False, None, True, None, False)
+ # now ignoring content
+ s = flow.ServerPlaybackState(
+ None,
+ [],
+ False,
+ False,
+ None,
+ True,
+ None,
+ False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
r.request.content = "foo"
@@ -267,14 +320,22 @@ class TestServerPlaybackState:
assert s._hash(r) == s._hash(r2)
def test_ignore_host(self):
- s = flow.ServerPlaybackState(None, [], False, False, None, False, None, True)
+ s = flow.ServerPlaybackState(
+ None,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ True)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
- r.request.host="address"
- r2.request.host="address"
+ r.request.host = "address"
+ r2.request.host = "address"
assert s._hash(r) == s._hash(r2)
- r2.request.host="wrong_address"
+ r2.request.host = "wrong_address"
assert s._hash(r) == s._hash(r2)
@@ -338,12 +399,14 @@ class TestFlow:
def test_getset_state(self):
f = tutils.tflow(resp=True)
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state()
+ assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ state).get_state()
f.response = None
f.error = Error("error")
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state()
+ assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ state).get_state()
f2 = f.copy()
f2.id = f.id # copy creates a different uuid
@@ -425,7 +488,6 @@ class TestFlow:
assert f.response.content == "abarb"
-
class TestState:
def test_backup(self):
c = flow.State()
@@ -514,7 +576,7 @@ class TestState:
assert c.intercept_txt == "~q"
assert "Invalid" in c.set_intercept("~")
assert not c.set_intercept(None)
- assert c.intercept_txt == None
+ assert c.intercept_txt is None
def _add_request(self, state):
f = tutils.tflow()
@@ -603,7 +665,13 @@ class TestSerialize:
def test_load_flows_reverse(self):
r = self._treader()
s = flow.State()
- conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80])
+ conf = ProxyConfig(
+ mode="reverse",
+ upstream_server=[
+ True,
+ True,
+ "use-this-domain",
+ 80])
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
@@ -625,7 +693,6 @@ class TestSerialize:
r = flow.FlowReader(sio)
assert len(list(r.stream()))
-
def test_error(self):
sio = StringIO()
sio.write("bogus")
@@ -656,7 +723,8 @@ class TestFlowMaster:
assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
assert not fm.unload_scripts()
assert fm.load_script("nonexistent")
- assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py"))
+ assert "ValueError" in fm.load_script(
+ tutils.test_data.path("scripts/starterr.py"))
assert len(fm.scripts) == 0
def test_getset_ignore(self):
@@ -702,14 +770,14 @@ class TestFlowMaster:
assert fm.scripts[0].ns["log"][-1] == "request"
fm.handle_response(f)
assert fm.scripts[0].ns["log"][-1] == "response"
- #load second script
+ # load second script
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
assert len(fm.scripts) == 2
fm.handle_clientdisconnect(f.server_conn)
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
- #unload first script
+ # unload first script
fm.unload_scripts()
assert len(fm.scripts) == 0
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
@@ -760,7 +828,16 @@ class TestFlowMaster:
f = tutils.tflow(resp=True)
pb = [tutils.tflow(resp=True), f]
fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
- assert not fm.start_server_playback(pb, False, [], False, False, None, False, None, False)
+ assert not fm.start_server_playback(
+ pb,
+ False,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
assert not fm.start_client_playback(pb, False)
fm.client_playback.testing = True
@@ -783,16 +860,43 @@ class TestFlowMaster:
fm.refresh_server_playback = True
assert not fm.do_server_playback(tutils.tflow())
- fm.start_server_playback(pb, False, [], False, False, None, False, None, False)
+ fm.start_server_playback(
+ pb,
+ False,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
assert fm.do_server_playback(tutils.tflow())
- fm.start_server_playback(pb, False, [], True, False, None, False, None, False)
+ fm.start_server_playback(
+ pb,
+ False,
+ [],
+ True,
+ False,
+ None,
+ False,
+ None,
+ False)
r = tutils.tflow()
r.request.content = "gibble"
assert not fm.do_server_playback(r)
assert fm.do_server_playback(tutils.tflow())
- fm.start_server_playback(pb, False, [], True, False, None, False, None, False)
+ fm.start_server_playback(
+ pb,
+ False,
+ [],
+ True,
+ False,
+ None,
+ False,
+ None,
+ False)
q = Queue.Queue()
fm.tick(q, 0)
assert fm.should_exit.is_set()
@@ -807,7 +911,16 @@ class TestFlowMaster:
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
- fm.start_server_playback(pb, True, [], False, False, None, False, None, False)
+ fm.start_server_playback(
+ pb,
+ True,
+ [],
+ False,
+ False,
+ None,
+ False,
+ None,
+ False)
f = tutils.tflow()
f.request.host = "nonexistent"
@@ -857,8 +970,9 @@ class TestFlowMaster:
def test_stream(self):
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
+
def r():
- r = flow.FlowReader(open(p,"rb"))
+ r = flow.FlowReader(open(p, "rb"))
return list(r.stream())
s = flow.State()
@@ -879,6 +993,7 @@ class TestFlowMaster:
assert not r()[1].response
+
class TestRequest:
def test_simple(self):
f = tutils.tflow()
@@ -914,7 +1029,7 @@ class TestRequest:
r.host = "address"
r.port = 22
- assert r.url== "https://address:22/path"
+ assert r.url == "https://address:22/path"
assert r.pretty_url(True) == "https://address:22/path"
r.headers["Host"] = ["foo.com"]
@@ -927,7 +1042,7 @@ class TestRequest:
assert r.get_path_components() == []
r.path = "/foo/bar"
assert r.get_path_components() == ["foo", "bar"]
- q = flow.ODict()
+ q = odict.ODict()
q["test"] = ["123"]
r.set_query(q)
assert r.get_path_components() == ["foo", "bar"]
@@ -941,12 +1056,12 @@ class TestRequest:
assert "%2F" in r.path
def test_getset_form_urlencoded(self):
- d = flow.ODict([("one", "two"), ("three", "four")])
+ d = odict.ODict([("one", "two"), ("three", "four")])
r = tutils.treq(content=utils.urlencode(d.lst))
r.headers["content-type"] = [protocol.http.HDR_FORM_URLENCODED]
assert r.get_form_urlencoded() == d
- d = flow.ODict([("x", "y")])
+ d = odict.ODict([("x", "y")])
r.set_form_urlencoded(d)
assert r.get_form_urlencoded() == d
@@ -954,7 +1069,7 @@ class TestRequest:
assert not r.get_form_urlencoded()
def test_getset_query(self):
- h = flow.ODictCaseless()
+ h = odict.ODictCaseless()
r = tutils.treq()
r.path = "/foo?x=y&a=b"
@@ -971,14 +1086,14 @@ class TestRequest:
r.path = "/foo?x=y&a=b"
assert r.get_query()
- r.set_query(flow.ODict([]))
+ r.set_query(odict.ODict([]))
assert not r.get_query()
- qv = flow.ODict([("a", "b"), ("c", "d")])
+ qv = odict.ODict([("a", "b"), ("c", "d")])
r.set_query(qv)
assert r.get_query() == qv
def test_anticache(self):
- h = flow.ODictCaseless()
+ h = odict.ODictCaseless()
r = tutils.treq()
r.headers = h
h["if-modified-since"] = ["test"]
@@ -1042,43 +1157,8 @@ class TestRequest:
r.encode("gzip")
assert r.get_decoded_content() == "falafel"
- def test_get_cookies_none(self):
- h = flow.ODictCaseless()
- r = tutils.treq()
- r.headers = h
- assert r.get_cookies() is None
-
- def test_get_cookies_single(self):
- h = flow.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue"]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result)==1
- assert result['cookiename']==('cookievalue',{})
-
- def test_get_cookies_double(self):
- h = flow.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result)==2
- assert result['cookiename']==('cookievalue',{})
- assert result['othercookiename']==('othercookievalue',{})
-
- def test_get_cookies_withequalsign(self):
- h = flow.ODictCaseless()
- h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result)==2
- assert result['cookiename']==('coo=kievalue',{})
- assert result['othercookiename']==('othercookievalue',{})
-
def test_header_size(self):
- h = flow.ODictCaseless()
+ h = odict.ODictCaseless()
h["headername"] = ["headervalue"]
r = tutils.treq()
r.headers = h
@@ -1086,12 +1166,13 @@ class TestRequest:
assert len(raw) == 62
def test_get_content_type(self):
- h = flow.ODictCaseless()
+ h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
resp = tutils.tresp()
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
+
class TestResponse:
def test_simple(self):
f = tutils.tflow(resp=True)
@@ -1107,7 +1188,9 @@ class TestResponse:
assert resp.size() == len(resp.assemble())
resp.content = CONTENT_MISSING
- tutils.raises("Cannot assemble flow with CONTENT_MISSING", resp.assemble)
+ tutils.raises(
+ "Cannot assemble flow with CONTENT_MISSING",
+ resp.assemble)
def test_refresh(self):
r = tutils.tresp()
@@ -1116,14 +1199,15 @@ class TestResponse:
pre = r.headers["date"]
r.refresh(n)
assert pre == r.headers["date"]
- r.refresh(n+60)
+ r.refresh(n + 60)
d = email.utils.parsedate_tz(r.headers["date"][0])
d = email.utils.mktime_tz(d)
# Weird that this is not exact...
- assert abs(60-(d-n)) <= 1
+ assert abs(60 - (d - n)) <= 1
- r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
+ r.headers[
+ "set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
r.refresh()
def test_refresh_cookie(self):
@@ -1176,64 +1260,10 @@ class TestResponse:
def test_header_size(self):
r = tutils.tresp()
result = len(r._assemble_headers())
- assert result==44
-
- def test_get_cookies_none(self):
- h = flow.ODictCaseless()
- resp = tutils.tresp()
- resp.headers = h
- assert not resp.get_cookies()
-
- def test_get_cookies_simple(self):
- h = flow.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result)==1
- assert "cookiename" in result
- assert result["cookiename"] == ("cookievalue", {})
-
- def test_get_cookies_with_parameters(self):
- h = flow.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result)==1
- assert "cookiename" in result
- assert result["cookiename"][0] == "cookievalue"
- assert len(result["cookiename"][1])==4
- assert result["cookiename"][1]["domain"]=="example.com"
- assert result["cookiename"][1]["expires"]=="Wed Oct 21 16:29:41 2015"
- assert result["cookiename"][1]["path"]=="/"
- assert result["cookiename"][1]["httponly"]==""
-
- def test_get_cookies_no_value(self):
- h = flow.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result)==1
- assert "cookiename" in result
- assert result["cookiename"][0] == ""
- assert len(result["cookiename"][1])==2
-
- def test_get_cookies_twocookies(self):
- h = flow.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result)==2
- assert "cookiename" in result
- assert result["cookiename"] == ("cookievalue", {})
- assert "othercookie" in result
- assert result["othercookie"] == ("othervalue", {})
+ assert result == 44
def test_get_content_type(self):
- h = flow.ODictCaseless()
+ h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
resp = tutils.tresp()
resp.headers = h
@@ -1262,7 +1292,7 @@ class TestClientConnection:
c = tutils.tclient_conn()
assert ClientConnection.from_state(c.get_state()).get_state() ==\
- c.get_state()
+ c.get_state()
c2 = tutils.tclient_conn()
c2.address.address = (c2.address.host, 4242)
@@ -1379,7 +1409,6 @@ def test_setheaders():
h.run(f)
assert f.request.content == "foo"
-
h.clear()
h.add("~s", "one", "two")
h.add("~s", "one", "three")