aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/scripts/stream_modify.py4
-rw-r--r--test/test_cmdline.py10
-rw-r--r--test/test_dump.py10
-rw-r--r--test/test_filt.py2
-rw-r--r--test/test_flow.py89
-rw-r--r--test/test_protocol_http.py3
-rw-r--r--test/test_proxy.py45
-rw-r--r--test/test_server.py156
-rw-r--r--test/tservers.py79
-rw-r--r--test/tutils.py23
10 files changed, 157 insertions, 264 deletions
diff --git a/test/scripts/stream_modify.py b/test/scripts/stream_modify.py
index e5c323be..e26d83f1 100644
--- a/test/scripts/stream_modify.py
+++ b/test/scripts/stream_modify.py
@@ -1,6 +1,6 @@
def modify(chunks):
- for prefix, content, suffix in chunks:
- yield prefix, content.replace("foo", "bar"), suffix
+ for chunk in chunks:
+ yield chunk.replace("foo", "bar")
def responseheaders(context, flow):
diff --git a/test/test_cmdline.py b/test/test_cmdline.py
index eafcbde4..bb54d011 100644
--- a/test/test_cmdline.py
+++ b/test/test_cmdline.py
@@ -38,15 +38,11 @@ def test_parse_replace_hook():
def test_parse_server_spec():
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "")
assert cmdline.parse_server_spec(
- "http://foo.com:88") == [False, False, "foo.com", 88]
+ "http://foo.com:88") == ("http", ("foo.com", 88))
assert cmdline.parse_server_spec(
- "http://foo.com") == [False, False, "foo.com", 80]
+ "http://foo.com") == ("http", ("foo.com", 80))
assert cmdline.parse_server_spec(
- "https://foo.com") == [True, True, "foo.com", 443]
- assert cmdline.parse_server_spec_special(
- "https2http://foo.com") == [True, False, "foo.com", 80]
- assert cmdline.parse_server_spec_special(
- "http2https://foo.com") == [False, True, "foo.com", 443]
+ "https://foo.com") == ("https", ("foo.com", 443))
tutils.raises(
"Invalid server specification",
cmdline.parse_server_spec,
diff --git a/test/test_dump.py b/test/test_dump.py
index b3d724a5..a0ad6cb4 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -1,18 +1,18 @@
import os
from cStringIO import StringIO
+from libmproxy.models import HTTPResponse
import netlib.tutils
from netlib.http.semantics import CONTENT_MISSING
from libmproxy import dump, flow
-from libmproxy.protocol import http, http_wrappers
-from libmproxy.proxy.primitives import Log
+from libmproxy.protocol import Log
import tutils
import mock
def test_strfuncs():
- t = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ t = HTTPResponse.wrap(netlib.tutils.tresp())
t.is_replay = True
dump.str_response(t)
@@ -34,7 +34,7 @@ class TestDumpMaster:
m.handle_clientconnect(f.client_conn)
m.handle_serverconnect(f.server_conn)
m.handle_request(f)
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(content))
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp(content))
f = m.handle_response(f)
m.handle_clientdisconnect(f.client_conn)
return f
@@ -71,7 +71,7 @@ class TestDumpMaster:
f = tutils.tflow()
f.request.content = CONTENT_MISSING
m.handle_request(f)
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
f.response.content = CONTENT_MISSING
m.handle_response(f)
assert "content missing" in cs.getvalue()
diff --git a/test/test_filt.py b/test/test_filt.py
index bcdf6e4c..aeec2485 100644
--- a/test/test_filt.py
+++ b/test/test_filt.py
@@ -2,7 +2,7 @@ import cStringIO
from netlib import odict
from libmproxy import filt, flow
from libmproxy.protocol import http
-from libmproxy.protocol.primitives import Error
+from libmproxy.models import Error
import tutils
diff --git a/test/test_flow.py b/test/test_flow.py
index 711688da..9cce26b3 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -3,20 +3,18 @@ import time
import os.path
from cStringIO import StringIO
import email.utils
+
import mock
import netlib.utils
from netlib import odict
-from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, HDR_FORM_MULTIPART
-
-from libmproxy import filt, protocol, controller, utils, tnetstring, flow
-from libmproxy.protocol import http_wrappers
-from libmproxy.protocol.primitives import Error, Flow
-from libmproxy.protocol.http import decoded
+from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED
+from libmproxy import filt, protocol, controller, tnetstring, flow
+from libmproxy.models import Error, Flow, HTTPRequest, HTTPResponse, HTTPFlow, decoded
from libmproxy.proxy.config import HostMatcher
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.server import DummyServer
-from libmproxy.proxy.connection import ClientConnection
+from libmproxy.models.connections import ClientConnection
import tutils
@@ -24,7 +22,7 @@ def test_app_registry():
ar = flow.AppRegistry()
ar.add("foo", "domain", 80)
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain"
r.port = 80
assert ar.get(r)
@@ -32,7 +30,7 @@ def test_app_registry():
r.port = 81
assert not ar.get(r)
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain2"
r.port = 80
assert not ar.get(r)
@@ -385,7 +383,7 @@ class TestFlow:
def test_backup(self):
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
f.request.content = "foo"
assert not f.modified()
f.backup()
@@ -404,13 +402,13 @@ class TestFlow:
def test_getset_state(self):
f = tutils.tflow(resp=True)
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ assert f.get_state() == HTTPFlow.from_state(
state).get_state()
f.response = None
f.error = Error("error")
state = f.get_state()
- assert f.get_state() == protocol.http.HTTPFlow.from_state(
+ assert f.get_state() == HTTPFlow.from_state(
state).get_state()
f2 = f.copy()
@@ -518,16 +516,16 @@ class TestState:
assert c.add_flow(newf)
assert c.active_flow_count() == 2
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(f)
assert c.flow_count() == 2
assert c.active_flow_count() == 1
- _ = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ _ = HTTPResponse.wrap(netlib.tutils.tresp())
assert not c.update_flow(None)
assert c.active_flow_count() == 1
- newf.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(newf)
assert c.active_flow_count() == 0
@@ -559,7 +557,7 @@ class TestState:
c.set_limit("~s")
assert c.limit_txt == "~s"
assert len(c.view) == 0
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
c.update_flow(f)
assert len(c.view) == 1
c.set_limit(None)
@@ -591,7 +589,7 @@ class TestState:
def _add_response(self, state):
f = tutils.tflow()
state.add_flow(f)
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
state.update_flow(f)
def _add_error(self, state):
@@ -672,11 +670,8 @@ class TestSerialize:
s = flow.State()
conf = ProxyConfig(
mode="reverse",
- upstream_server=[
- True,
- True,
- "use-this-domain",
- 80])
+ upstream_server=("https", ("use-this-domain", 80))
+ )
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
@@ -809,11 +804,11 @@ class TestFlowMaster:
fm.anticomp = True
f = tutils.tflow(req=None)
fm.handle_clientconnect(f.client_conn)
- f.request = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ f.request = HTTPRequest.wrap(netlib.tutils.treq())
fm.handle_request(f)
assert s.flow_count() == 1
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp())
fm.handle_response(f)
assert not fm.handle_response(None)
assert s.flow_count() == 1
@@ -858,7 +853,7 @@ class TestFlowMaster:
s = flow.State()
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
@@ -912,7 +907,7 @@ class TestFlowMaster:
def test_server_playback_kill(self):
s = flow.State()
f = tutils.tflow()
- f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
+ f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
@@ -1011,7 +1006,7 @@ class TestRequest:
assert r.get_state() == r2.get_state()
def test_get_url(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
assert r.url == "http://address:22/path"
@@ -1032,7 +1027,7 @@ class TestRequest:
assert r.pretty_url(True) == "https://foo.com:22/path"
def test_path_components(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/"
assert r.get_path_components() == []
r.path = "/foo/bar"
@@ -1052,7 +1047,7 @@ class TestRequest:
def test_getset_form_urlencoded(self):
d = odict.ODict([("one", "two"), ("three", "four")])
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
+ r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
r.headers["content-type"] = [HDR_FORM_URLENCODED]
assert r.get_form_urlencoded() == d
@@ -1066,7 +1061,7 @@ class TestRequest:
def test_getset_query(self):
h = odict.ODictCaseless()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/foo?x=y&a=b"
q = r.get_query()
assert q.lst == [("x", "y"), ("a", "b")]
@@ -1089,7 +1084,7 @@ class TestRequest:
def test_anticache(self):
h = odict.ODictCaseless()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers = h
h["if-modified-since"] = ["test"]
h["if-none-match"] = ["test"]
@@ -1098,7 +1093,7 @@ class TestRequest:
assert not "if-none-match" in r.headers
def test_replace(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "path/foo"
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
@@ -1108,31 +1103,31 @@ class TestRequest:
assert r.headers["boo"] == ["boo"]
def test_constrain_encoding(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["accept-encoding"] = ["gzip", "oink"]
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
def test_decodeencode(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.content = "falafel"
assert not r.decode()
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1143,7 +1138,7 @@ class TestRequest:
assert r.content == "falafel"
def test_get_decoded_content(self):
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
r.content = None
r.headers["content-encoding"] = ["identity"]
assert r.get_decoded_content() == None
@@ -1155,7 +1150,7 @@ class TestRequest:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1168,7 +1163,7 @@ class TestResponse:
assert resp2.get_state() == resp.get_state()
def test_refresh(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
n = time.time()
r.headers["date"] = [email.utils.formatdate(n)]
pre = r.headers["date"]
@@ -1186,7 +1181,7 @@ class TestResponse:
r.refresh()
def test_refresh_cookie(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
@@ -1196,7 +1191,7 @@ class TestResponse:
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_replace(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
@@ -1204,21 +1199,21 @@ class TestResponse:
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
assert r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ r = HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1235,7 +1230,7 @@ class TestResponse:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1279,7 +1274,7 @@ class TestClientConnection:
def test_decoded():
- r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
+ r = HTTPRequest.wrap(netlib.tutils.treq())
assert r.content == "content"
assert not r.headers["content-encoding"]
r.encode("gzip")
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
index 2da54093..cd0f77fa 100644
--- a/test/test_protocol_http.py
+++ b/test/test_protocol_http.py
@@ -4,6 +4,7 @@ from cStringIO import StringIO
from mock import MagicMock
from libmproxy.protocol.http import *
+import netlib.http
from netlib import odict
from netlib.http import http1
from netlib.http.semantics import CONTENT_MISSING
@@ -56,7 +57,7 @@ class TestInvalidRequests(tservers.HTTPProxTest):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
assert r.status_code == 400
- assert "Must not CONNECT on already encrypted connection" in r.body
+ assert "Invalid HTTP request form" in r.body
def test_relative_request(self):
p = self.pathoc_raw()
diff --git a/test/test_proxy.py b/test/test_proxy.py
index 6ab19e02..3707fabe 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -1,20 +1,14 @@
-import argparse
+import mock
+from OpenSSL import SSL
+
from libmproxy import cmdline
-from libmproxy.proxy import ProxyConfig, process_proxy_options
-from libmproxy.proxy.connection import ServerConnection
-from libmproxy.proxy.primitives import ProxyError
+from libmproxy.proxy import ProxyConfig
+from libmproxy.proxy.config import process_proxy_options
+from libmproxy.models.connections import ServerConnection
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
import tutils
from libpathod import test
from netlib import http, tcp
-import mock
-
-from OpenSSL import SSL
-
-
-def test_proxy_error():
- p = ProxyError(111, "msg")
- assert str(p)
class TestServerConnection:
@@ -97,13 +91,10 @@ class TestProcessProxyOptions:
self.assert_err("expected one argument", "-U")
self.assert_err("Invalid server specification", "-U", "upstream")
- self.assert_noerr("--spoof")
- self.assert_noerr("--ssl-spoof")
-
- self.assert_noerr("--spoofed-port", "443")
- self.assert_err("expected one argument", "--spoofed-port")
+ self.assert_err("not allowed with", "-R", "http://localhost", "-T")
- self.assert_err("mutually exclusive", "-R", "http://localhost", "-T")
+ def test_socks_auth(self):
+ self.assert_err("Proxy Authentication not supported in SOCKS mode.", "--socks", "--nonanonymous")
def test_client_certs(self):
with tutils.tmpdir() as cadir:
@@ -181,13 +172,19 @@ class TestDummyServer:
class TestConnectionHandler:
def test_fatal_error(self):
config = mock.Mock()
- config.mode.get_upstream_server.side_effect = RuntimeError
+ root_layer = mock.Mock()
+ root_layer.side_effect = RuntimeError
+ config.mode.return_value = root_layer
+ channel = mock.Mock()
+
+ def ask(_, x):
+ return x
+ channel.ask = ask
c = ConnectionHandler(
- config,
mock.MagicMock(),
- ("127.0.0.1",
- 8080),
- None,
- mock.MagicMock())
+ ("127.0.0.1", 8080),
+ config,
+ channel
+ )
with tutils.capture_stderr(c.handle) as output:
assert "mitmproxy has crashed" in output
diff --git a/test/test_server.py b/test/test_server.py
index 77ba4576..a1259b7f 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,6 +1,7 @@
import socket
import time
from OpenSSL import SSL
+from netlib.tcp import Address
import netlib.tutils
from netlib import tcp, http, socks
@@ -10,7 +11,9 @@ from netlib.http.semantics import CONTENT_MISSING
from libpathod import pathoc, pathod
from libmproxy.proxy.config import HostMatcher
-from libmproxy.protocol import KILL, Error, http_wrappers
+from libmproxy.protocol import Kill
+from libmproxy.models import Error, HTTPResponse
+
import tutils
import tservers
@@ -67,7 +70,7 @@ class CommonMixin:
# SSL with the upstream proxy.
rt = self.master.replay_request(l, block=True)
assert not rt
- if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl:
+ if isinstance(self, tservers.HTTPUpstreamProxTest):
assert l.response.code == 502
else:
assert l.error
@@ -319,17 +322,6 @@ class TestHTTPAuth(tservers.HTTPProxTest):
assert ret.status_code == 202
-class TestHTTPConnectSSLError(tservers.HTTPProxTest):
- certfile = True
-
- def test_go(self):
- self.config.ssl_ports.append(self.proxy.port)
- p = self.pathoc_raw()
- dst = ("localhost", self.proxy.port)
- p.connect(connect_to=dst)
- tutils.raises("502 - Bad Gateway", p.http_connect, dst)
-
-
class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True)
@@ -390,26 +382,31 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest):
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
])
+ def _request(self):
+ p = self.pathoc()
+ # We need to make an actual request because the upstream connection is lazy-loaded.
+ return p.request("get:/p/242")
+
def test_default_verification_w_bad_cert(self):
"""Should use no verification."""
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_no_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_NONE
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- tutils.raises("SSL handshake error", self.pathoc)
+ assert self._request().status_code == 502
class TestHTTPSNoCommonName(tservers.HTTPProxTest):
@@ -469,60 +466,11 @@ class TestSocks5(tservers.SocksModeTest):
assert "SOCKS5 mode failure" in f.content
-class TestSpoof(tservers.SpoofModeTest):
- def test_http(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- p = self.pathoc()
- f = p.request("get:/p/304:h'Host'='%s:%s'" % a)
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_http_without_host(self):
- p = self.pathoc()
- f = p.request("get:/p/304:r")
- assert f.status_code == 400
-
-
-class TestSSLSpoof(tservers.SSLSpoofModeTest):
- def test_https(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=a[0])
- f = p.request("get:/p/304")
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_https_without_sni(self):
- a = ("localhost", self.server.port)
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=None)
- f = p.request("get:/p/304")
- assert f.status_code == 400
-
-
class TestHttps2Http(tservers.ReverseProxTest):
@classmethod
def get_proxy_config(cls):
d = super(TestHttps2Http, cls).get_proxy_config()
- d["upstream_server"][0] = True
+ d["upstream_server"] = ("http", d["upstream_server"][1])
return d
def pathoc(self, ssl, sni=None):
@@ -530,7 +478,7 @@ class TestHttps2Http(tservers.ReverseProxTest):
Returns a connected Pathoc instance.
"""
p = pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None
+ ("localhost", self.proxy.port), ssl=True, sni=sni, fp=None
)
p.connect()
return p
@@ -546,7 +494,7 @@ class TestHttps2Http(tservers.ReverseProxTest):
def test_http(self):
p = self.pathoc(ssl=False)
- assert p.request("get:'/p/200'").status_code == 400
+ assert p.request("get:'/p/200'").status_code == 200
class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
@@ -560,7 +508,7 @@ class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin):
p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None)
p.connect()
r = p.request("get:/")
- assert r.status_code == 400
+ assert r.status_code == 502
class TestProxy(tservers.HTTPProxTest):
@@ -661,63 +609,65 @@ class MasterRedirectRequest(tservers.TestMaster):
redirect_port = None # Set by TestRedirectRequest
def handle_request(self, f):
- request = f.request
- if request.path == "/p/201":
- addr = f.live.c.server_conn.address
- assert f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- assert not f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- tutils.raises(
- "SSL handshake error",
- f.live.change_server,
- ("127.0.0.1",
- self.redirect_port),
- ssl=True)
- assert f.live.change_server(addr, ssl=False)
- request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port
- tservers.TestMaster.handle_request(self, f)
+ if f.request.path == "/p/201":
+
+ # This part should have no impact, but it should also not cause any exceptions.
+ addr = f.live.server_conn.address
+ addr2 = Address(("127.0.0.1", self.redirect_port))
+ f.live.set_server(addr2)
+ f.live.set_server(addr)
+
+ # This is the actual redirection.
+ f.request.port = self.redirect_port
+ super(MasterRedirectRequest, self).handle_request(f)
def handle_response(self, f):
f.response.content = str(f.client_conn.address.port)
f.response.headers[
"server-conn-id"] = [str(f.server_conn.source_address.port)]
- tservers.TestMaster.handle_response(self, f)
+ super(MasterRedirectRequest, self).handle_response(f)
class TestRedirectRequest(tservers.HTTPProxTest):
masterclass = MasterRedirectRequest
+ ssl = True
def test_redirect(self):
+ """
+ Imagine a single HTTPS connection with three requests:
+
+ 1. First request should pass through unmodified
+ 2. Second request will be redirected to a different host by an inline script
+ 3. Third request should pass through unmodified
+
+ This test verifies that the original destination is restored for the third request.
+ """
self.master.redirect_port = self.server2.port
p = self.pathoc()
self.server.clear_log()
self.server2.clear_log()
- r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
+ r1 = p.request("get:'/p/200'")
assert r1.status_code == 200
assert self.server.last_log()
assert not self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r2 = p.request("get:'%s/p/201'" % self.server.urlbase)
+ r2 = p.request("get:'/p/201'")
assert r2.status_code == 201
assert not self.server.last_log()
assert self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r3 = p.request("get:'%s/p/202'" % self.server.urlbase)
+ r3 = p.request("get:'/p/202'")
assert r3.status_code == 202
assert self.server.last_log()
assert not self.server2.last_log()
assert r1.content == r2.content == r3.content
- assert r1.headers.get_first(
- "server-conn-id") == r3.headers.get_first("server-conn-id")
- # Make sure that we actually use the same connection in this test case
class MasterStreamRequest(tservers.TestMaster):
@@ -774,9 +724,9 @@ class TestStreamRequest(tservers.HTTPProxTest):
assert resp.headers["Transfer-Encoding"][0] == 'chunked'
assert resp.status_code == 200
- chunks = list(
- content for _, content, _ in protocol.read_http_body_chunked(
- resp.headers, None, "GET", 200, False))
+ chunks = list(protocol.read_http_body_chunked(
+ resp.headers, None, "GET", 200, False
+ ))
assert chunks == ["this", "isatest", ""]
connection.close()
@@ -784,7 +734,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
class MasterFakeResponse(tservers.TestMaster):
def handle_request(self, f):
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
f.reply(resp)
@@ -809,7 +759,7 @@ class TestServerConnect(tservers.HTTPProxTest):
class MasterKillRequest(tservers.TestMaster):
def handle_request(self, f):
- f.reply(KILL)
+ f.reply(Kill)
class TestKillRequest(tservers.HTTPProxTest):
@@ -823,7 +773,7 @@ class TestKillRequest(tservers.HTTPProxTest):
class MasterKillResponse(tservers.TestMaster):
def handle_response(self, f):
- f.reply(KILL)
+ f.reply(Kill)
class TestKillResponse(tservers.HTTPProxTest):
@@ -849,7 +799,7 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
class MasterIncomplete(tservers.TestMaster):
def handle_request(self, f):
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.content = CONTENT_MISSING
f.reply(resp)
@@ -988,7 +938,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
if not (k[0] in exclude):
f.client_conn.finish()
f.error = Error("terminated")
- f.reply(KILL)
+ f.reply(Kill)
return _func(f)
setattr(master, attr, handler)
@@ -1009,6 +959,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
+ assert req.content == "content"
+ assert req.status_code == 418
+
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
# CONNECT, failing request,
assert self.chain[0].tmaster.state.flow_count() == 4
@@ -1017,8 +970,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert self.chain[1].tmaster.state.flow_count() == 2
# (doesn't store (repeated) CONNECTs from chain[0]
# as it is a regular proxy)
- assert req.content == "content"
- assert req.status_code == 418
+
assert not self.chain[1].tmaster.state.flows[0].response # killed
assert self.chain[1].tmaster.state.flows[1].response
diff --git a/test/tservers.py b/test/tservers.py
index 3c73b262..c5256e53 100644
--- a/test/tservers.py
+++ b/test/tservers.py
@@ -1,6 +1,5 @@
import os.path
import threading
-import Queue
import shutil
import tempfile
import flask
@@ -8,7 +7,6 @@ import mock
from libmproxy.proxy.config import ProxyConfig
from libmproxy.proxy.server import ProxyServer
-from libmproxy.proxy.primitives import TransparentProxyMode
import libpathod.test
import libpathod.pathoc
from libmproxy import flow, controller
@@ -130,7 +128,6 @@ class ProxTestBase(object):
no_upstream_cert = cls.no_upstream_cert,
cadir = cls.cadir,
authenticator = cls.authenticator,
- ssl_ports=([cls.server.port, cls.server2.port] if cls.ssl else []),
clientcerts = tutils.test_data.path("data/clientcert") if cls.clientcerts else None
)
@@ -183,22 +180,24 @@ class TResolver:
def original_addr(self, sock):
return ("127.0.0.1", self.port)
-
class TransparentProxTest(ProxTestBase):
ssl = None
resolver = TResolver
@classmethod
- @mock.patch("libmproxy.platform.resolver")
- def setupAll(cls, _):
+ def setupAll(cls):
super(TransparentProxTest, cls).setupAll()
- if cls.ssl:
- ports = [cls.server.port, cls.server2.port]
- else:
- ports = []
- cls.config.mode = TransparentProxyMode(
- cls.resolver(cls.server.port),
- ports)
+
+ cls._resolver = mock.patch(
+ "libmproxy.platform.resolver",
+ new=lambda: cls.resolver(cls.server.port)
+ )
+ cls._resolver.start()
+
+ @classmethod
+ def teardownAll(cls):
+ cls._resolver.stop()
+ super(TransparentProxTest, cls).teardownAll()
@classmethod
def get_proxy_config(cls):
@@ -235,12 +234,10 @@ class ReverseProxTest(ProxTestBase):
@classmethod
def get_proxy_config(cls):
d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = [
- True if cls.ssl else False,
- True if cls.ssl else False,
- "127.0.0.1",
- cls.server.port
- ]
+ d["upstream_server"] = (
+ "https" if cls.ssl else "http",
+ ("127.0.0.1", cls.server.port)
+ )
d["mode"] = "reverse"
return d
@@ -274,48 +271,6 @@ class SocksModeTest(HTTPProxTest):
d["mode"] = "socks5"
return d
-class SpoofModeTest(ProxTestBase):
- ssl = None
-
- @classmethod
- def get_proxy_config(cls):
- d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = None
- d["mode"] = "spoof"
- return d
-
- def pathoc(self, sni=None):
- """
- Returns a connected Pathoc instance.
- """
- p = libpathod.pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None
- )
- p.connect()
- return p
-
-
-class SSLSpoofModeTest(ProxTestBase):
- ssl = True
-
- @classmethod
- def get_proxy_config(cls):
- d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = None
- d["mode"] = "sslspoof"
- d["spoofed_ssl_port"] = 443
- return d
-
- def pathoc(self, sni=None):
- """
- Returns a connected Pathoc instance.
- """
- p = libpathod.pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None
- )
- p.connect()
- return p
-
class ChainProxTest(ProxTestBase):
"""
@@ -360,7 +315,7 @@ class ChainProxTest(ProxTestBase):
if cls.chain: # First proxy is in normal mode.
d.update(
mode="upstream",
- upstream_server=(False, False, "127.0.0.1", cls.chain[0].port)
+ upstream_server=("http", ("127.0.0.1", cls.chain[0].port))
)
return d
diff --git a/test/tutils.py b/test/tutils.py
index 61b1154c..d64388f3 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -3,22 +3,19 @@ import shutil
import tempfile
import argparse
import sys
-import mock_urwid
from cStringIO import StringIO
from contextlib import contextmanager
+
from nose.plugins.skip import SkipTest
from mock import Mock
-from time import time
-from netlib import certutils, odict
import netlib.tutils
-
-from libmproxy import flow, utils, controller
-from libmproxy.protocol import http, http_wrappers
-from libmproxy.proxy.connection import ClientConnection, ServerConnection
+from libmproxy import utils, controller
+from libmproxy.models import (
+ ClientConnection, ServerConnection, Error, HTTPRequest, HTTPResponse, HTTPFlow
+)
from libmproxy.console.flowview import FlowView
from libmproxy.console import ConsoleState
-from libmproxy.protocol.primitives import Error
def _SkipWindows():
@@ -53,11 +50,11 @@ def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
err = terr()
if req:
- req = http_wrappers.HTTPRequest.wrap(req)
+ req = HTTPRequest.wrap(req)
if resp:
- resp = http_wrappers.HTTPResponse.wrap(resp)
+ resp = HTTPResponse.wrap(resp)
- f = http.HTTPFlow(client_conn, server_conn)
+ f = HTTPFlow(client_conn, server_conn)
f.request = req
f.response = resp
f.error = err
@@ -91,7 +88,6 @@ def tserver_conn():
return c
-
def terr(content="error"):
"""
@return: libmproxy.protocol.primitives.Error
@@ -106,7 +102,7 @@ def tflowview(request_contents=None):
if request_contents is None:
flow = tflow()
else:
- flow = tflow(req=treq(request_contents))
+ flow = tflow(req=netlib.tutils.treq(request_contents))
fv = FlowView(m, cs, flow)
return fv
@@ -184,4 +180,5 @@ def capture_stderr(command, *args, **kwargs):
yield sys.stderr.getvalue()
sys.stderr = out
+
test_data = utils.Data(__name__)