aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_flow.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flow.py')
-rw-r--r--test/test_flow.py89
1 files changed, 42 insertions, 47 deletions
diff --git a/test/test_flow.py b/test/test_flow.py
index 711688da..9cce26b3 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -3,20 +3,18 @@ import time
import os.path
from cStringIO import StringIO
import email.utils
+
import mock
import netlib.utils
from netlib import odict
-from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, HDR_FORM_MULTIPART
-
-from libmproxy import filt, protocol, controller, utils, tnetstring, flow
-from libmproxy.protocol import http_wrappers
-from libmproxy.protocol.primitives import Error, Flow
-from libmproxy.protocol.http import decoded
+from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED
+from libmproxy import filt, protocol, controller, tnetstring, flow
+from libmproxy.models import Error, Flow, HTTPRequest, HTTPResponse, HTTPFlow, decoded
from libmproxy.proxy.config import HostMatcher
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.server import DummyServer
-from libmproxy.proxy.connection import ClientConnection
+from libmproxy.models.connections import ClientConnection
import tutils
@@ -24,7 +22,7 @@ def test_app_registry():
ar = flow.AppRegistry()
ar.add("foo", "domain", 80)
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain"
r.port = 80
assert ar.get(r)
@@ -32,7 +30,7 @@ def test_app_registry():
r.port = 81
assert not ar.get(r)
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain2"
r.port = 80
assert not ar.get(r)
@@ -385,7 +383,7 @@ class TestFlow:
def test_backup(self):
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
f.request.content = "foo"
assert not f.modified()
f.backup()
@@ -404,13 +402,13 @@ class TestFlow:
def test_getset_state(self):
f = tutils.tflow(resp=True)
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ assert f.get_state() == HTTPFlow.from_state(
state).get_state()
f.response = None
f.error = Error("error")
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ assert f.get_state() == HTTPFlow.from_state(
state).get_state()
f2 = f.copy()
@@ -518,16 +516,16 @@ class TestState:
assert c.add_flow(newf)
assert c.active_flow_count() == 2
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(f)
assert c.flow_count() == 2
assert c.active_flow_count() == 1
- _ = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ _ = HTTPResponse.wrap(netlib.tutils.tresp())
assert not c.update_flow(None)
assert c.active_flow_count() == 1
- newf.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(newf)
assert c.active_flow_count() == 0
@@ -559,7 +557,7 @@ class TestState:
c.set_limit("~s")
assert c.limit_txt == "~s"
assert len(c.view) == 0
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
c.update_flow(f)
assert len(c.view) == 1
c.set_limit(None)
@@ -591,7 +589,7 @@ class TestState:
def _add_response(self, state):
f = tutils.tflow()
state.add_flow(f)
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
state.update_flow(f)
def _add_error(self, state):
@@ -672,11 +670,8 @@ class TestSerialize:
s = flow.State()
conf = ProxyConfig(
mode="reverse",
- upstream_server=[
- True,
- True,
- "use-this-domain",
- 80])
+ upstream_server=("https", ("use-this-domain", 80))
+ )
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
@@ -809,11 +804,11 @@ class TestFlowMaster:
fm.anticomp = True
f = tutils.tflow(req=None)
fm.handle_clientconnect(f.client_conn)
- f.request = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ f.request = HTTPRequest.wrap(netlib.tutils.treq())
fm.handle_request(f)
assert s.flow_count() == 1
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
fm.handle_response(f)
assert not fm.handle_response(None)
assert s.flow_count() == 1
@@ -858,7 +853,7 @@ class TestFlowMaster:
s = flow.State()
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
@@ -912,7 +907,7 @@ class TestFlowMaster:
def test_server_playback_kill(self):
s = flow.State()
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
@@ -1011,7 +1006,7 @@ class TestRequest:
assert r.get_state() == r2.get_state()
def test_get_url(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
assert r.url == "http://address:22/path"
@@ -1032,7 +1027,7 @@ class TestRequest:
assert r.pretty_url(True) == "https://foo.com:22/path"
def test_path_components(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/"
assert r.get_path_components() == []
r.path = "/foo/bar"
@@ -1052,7 +1047,7 @@ class TestRequest:
def test_getset_form_urlencoded(self):
d = odict.ODict([("one", "two"), ("three", "four")])
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
+ r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
r.headers["content-type"] = [HDR_FORM_URLENCODED]
assert r.get_form_urlencoded() == d
@@ -1066,7 +1061,7 @@ class TestRequest:
def test_getset_query(self):
h = odict.ODictCaseless()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/foo?x=y&a=b"
q = r.get_query()
assert q.lst == [("x", "y"), ("a", "b")]
@@ -1089,7 +1084,7 @@ class TestRequest:
def test_anticache(self):
h = odict.ODictCaseless()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers = h
h["if-modified-since"] = ["test"]
h["if-none-match"] = ["test"]
@@ -1098,7 +1093,7 @@ class TestRequest:
assert not "if-none-match" in r.headers
def test_replace(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "path/foo"
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
@@ -1108,31 +1103,31 @@ class TestRequest:
assert r.headers["boo"] == ["boo"]
def test_constrain_encoding(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["accept-encoding"] = ["gzip", "oink"]
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
def test_decodeencode(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.content = "falafel"
assert not r.decode()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1143,7 +1138,7 @@ class TestRequest:
assert r.content == "falafel"
def test_get_decoded_content(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.content = None
r.headers["content-encoding"] = ["identity"]
assert r.get_decoded_content() == None
@@ -1155,7 +1150,7 @@ class TestRequest:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1168,7 +1163,7 @@ class TestResponse:
assert resp2.get_state() == resp.get_state()
def test_refresh(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
n = time.time()
r.headers["date"] = [email.utils.formatdate(n)]
pre = r.headers["date"]
@@ -1186,7 +1181,7 @@ class TestResponse:
r.refresh()
def test_refresh_cookie(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
@@ -1196,7 +1191,7 @@ class TestResponse:
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_replace(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
@@ -1204,21 +1199,21 @@ class TestResponse:
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
assert r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1235,7 +1230,7 @@ class TestResponse:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1279,7 +1274,7 @@ class TestClientConnection:
def test_decoded():
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
assert r.content == "content"
assert not r.headers["content-encoding"]
r.encode("gzip")