aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/scripts/stream_modify.py4
-rw-r--r--test/test_cmdline.py14
-rw-r--r--test/test_dump.py4
-rw-r--r--test/test_flow.py8
-rw-r--r--test/test_protocol_http.py3
-rw-r--r--test/test_proxy.py31
-rw-r--r--test/test_server.py144
-rw-r--r--test/tservers.py79
8 files changed, 89 insertions, 198 deletions
diff --git a/test/scripts/stream_modify.py b/test/scripts/stream_modify.py
index e5c323be..e26d83f1 100644
--- a/test/scripts/stream_modify.py
+++ b/test/scripts/stream_modify.py
@@ -1,6 +1,6 @@
def modify(chunks):
- for prefix, content, suffix in chunks:
- yield prefix, content.replace("foo", "bar"), suffix
+ for chunk in chunks:
+ yield chunk.replace("foo", "bar")
def responseheaders(context, flow):
diff --git a/test/test_cmdline.py b/test/test_cmdline.py
index eafcbde4..1443ee1c 100644
--- a/test/test_cmdline.py
+++ b/test/test_cmdline.py
@@ -38,15 +38,11 @@ def test_parse_replace_hook():
def test_parse_server_spec():
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "")
assert cmdline.parse_server_spec(
- "http://foo.com:88") == [False, False, "foo.com", 88]
+ "http://foo.com:88") == ("http", ("foo.com", 88))
assert cmdline.parse_server_spec(
- "http://foo.com") == [False, False, "foo.com", 80]
+ "http://foo.com") == ("http", ("foo.com", 80))
assert cmdline.parse_server_spec(
- "https://foo.com") == [True, True, "foo.com", 443]
- assert cmdline.parse_server_spec_special(
- "https2http://foo.com") == [True, False, "foo.com", 80]
- assert cmdline.parse_server_spec_special(
- "http2https://foo.com") == [False, True, "foo.com", 443]
+ "https://foo.com") == ("https", ("foo.com", 443))
tutils.raises(
"Invalid server specification",
cmdline.parse_server_spec,
@@ -55,6 +51,10 @@ def test_parse_server_spec():
"Invalid server specification",
cmdline.parse_server_spec,
"http://")
+ tutils.raises(
+ "Invalid server specification",
+ cmdline.parse_server_spec,
+ "https2http://foo.com")
def test_parse_setheaders():
diff --git a/test/test_dump.py b/test/test_dump.py
index b3d724a5..b05f6a0f 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -5,8 +5,8 @@ import netlib.tutils
from netlib.http.semantics import CONTENT_MISSING
from libmproxy import dump, flow
-from libmproxy.protocol import http, http_wrappers
-from libmproxy.proxy.primitives import Log
+from libmproxy.protocol import http_wrappers
+from libmproxy.proxy import Log
import tutils
import mock
diff --git a/test/test_flow.py b/test/test_flow.py
index 711688da..5c49deed 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -4,6 +4,7 @@ import os.path
from cStringIO import StringIO
import email.utils
import mock
+from libmproxy.cmdline import parse_server_spec
import netlib.utils
from netlib import odict
@@ -672,11 +673,8 @@ class TestSerialize:
s = flow.State()
conf = ProxyConfig(
mode="reverse",
- upstream_server=[
- True,
- True,
- "use-this-domain",
- 80])
+ upstream_server=("https", ("use-this-domain", 80))
+ )
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
index 2da54093..cd0f77fa 100644
--- a/test/test_protocol_http.py
+++ b/test/test_protocol_http.py
@@ -4,6 +4,7 @@ from cStringIO import StringIO
from mock import MagicMock
from libmproxy.protocol.http import *
+import netlib.http
from netlib import odict
from netlib.http import http1
from netlib.http.semantics import CONTENT_MISSING
@@ -56,7 +57,7 @@ class TestInvalidRequests(tservers.HTTPProxTest):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
assert r.status_code == 400
- assert "Must not CONNECT on already encrypted connection" in r.body
+ assert "Invalid HTTP request form" in r.body
def test_relative_request(self):
p = self.pathoc_raw()
diff --git a/test/test_proxy.py b/test/test_proxy.py
index 6ab19e02..301ce2ca 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -1,8 +1,7 @@
-import argparse
from libmproxy import cmdline
-from libmproxy.proxy import ProxyConfig, process_proxy_options
+from libmproxy.proxy import ProxyConfig
+from libmproxy.proxy.config import process_proxy_options
from libmproxy.proxy.connection import ServerConnection
-from libmproxy.proxy.primitives import ProxyError
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
import tutils
from libpathod import test
@@ -12,11 +11,6 @@ import mock
from OpenSSL import SSL
-def test_proxy_error():
- p = ProxyError(111, "msg")
- assert str(p)
-
-
class TestServerConnection:
def setUp(self):
self.d = test.Daemon()
@@ -97,13 +91,7 @@ class TestProcessProxyOptions:
self.assert_err("expected one argument", "-U")
self.assert_err("Invalid server specification", "-U", "upstream")
- self.assert_noerr("--spoof")
- self.assert_noerr("--ssl-spoof")
-
- self.assert_noerr("--spoofed-port", "443")
- self.assert_err("expected one argument", "--spoofed-port")
-
- self.assert_err("mutually exclusive", "-R", "http://localhost", "-T")
+ self.assert_err("not allowed with", "-R", "http://localhost", "-T")
def test_client_certs(self):
with tutils.tmpdir() as cadir:
@@ -181,13 +169,14 @@ class TestDummyServer:
class TestConnectionHandler:
def test_fatal_error(self):
config = mock.Mock()
- config.mode.get_upstream_server.side_effect = RuntimeError
+ root_layer = mock.Mock()
+ root_layer.side_effect = RuntimeError
+ config.mode.return_value = root_layer
c = ConnectionHandler(
- config,
mock.MagicMock(),
- ("127.0.0.1",
- 8080),
- None,
- mock.MagicMock())
+ ("127.0.0.1", 8080),
+ config,
+ mock.MagicMock()
+ )
with tutils.capture_stderr(c.handle) as output:
assert "mitmproxy has crashed" in output
diff --git a/test/test_server.py b/test/test_server.py
index 77ba4576..b691804b 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,6 +1,7 @@
import socket
import time
from OpenSSL import SSL
+from netlib.tcp import Address
import netlib.tutils
from netlib import tcp, http, socks
@@ -67,7 +68,7 @@ class CommonMixin:
# SSL with the upstream proxy.
rt = self.master.replay_request(l, block=True)
assert not rt
- if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl:
+ if isinstance(self, tservers.HTTPUpstreamProxTest):
assert l.response.code == 502
else:
assert l.error
@@ -319,17 +320,6 @@ class TestHTTPAuth(tservers.HTTPProxTest):
assert ret.status_code == 202
-class TestHTTPConnectSSLError(tservers.HTTPProxTest):
- certfile = True
-
- def test_go(self):
- self.config.ssl_ports.append(self.proxy.port)
- p = self.pathoc_raw()
- dst = ("localhost", self.proxy.port)
- p.connect(connect_to=dst)
- tutils.raises("502 - Bad Gateway", p.http_connect, dst)
-
-
class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True)
@@ -390,26 +380,31 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest):
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
])
+ def _request(self):
+ p = self.pathoc()
+ # We need to make an actual request because the upstream connection is lazy-loaded.
+ return p.request("get:/p/242")
+
def test_default_verification_w_bad_cert(self):
"""Should use no verification."""
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_no_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_NONE
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- tutils.raises("SSL handshake error", self.pathoc)
+ assert self._request().status_code == 502
class TestHTTPSNoCommonName(tservers.HTTPProxTest):
@@ -469,60 +464,11 @@ class TestSocks5(tservers.SocksModeTest):
assert "SOCKS5 mode failure" in f.content
-class TestSpoof(tservers.SpoofModeTest):
- def test_http(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- p = self.pathoc()
- f = p.request("get:/p/304:h'Host'='%s:%s'" % a)
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_http_without_host(self):
- p = self.pathoc()
- f = p.request("get:/p/304:r")
- assert f.status_code == 400
-
-
-class TestSSLSpoof(tservers.SSLSpoofModeTest):
- def test_https(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=a[0])
- f = p.request("get:/p/304")
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_https_without_sni(self):
- a = ("localhost", self.server.port)
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=None)
- f = p.request("get:/p/304")
- assert f.status_code == 400
-
-
class TestHttps2Http(tservers.ReverseProxTest):
@classmethod
def get_proxy_config(cls):
d = super(TestHttps2Http, cls).get_proxy_config()
- d["upstream_server"][0] = True
+ d["upstream_server"] = ("https2http", d["upstream_server"][1])
return d
def pathoc(self, ssl, sni=None):
@@ -544,10 +490,6 @@ class TestHttps2Http(tservers.ReverseProxTest):
assert p.request("get:'/p/200'").status_code == 200
assert all("Error in handle_sni" not in msg for msg in self.proxy.log)
- def test_http(self):
- p = self.pathoc(ssl=False)
- assert p.request("get:'/p/200'").status_code == 400
-
class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
ssl = False
@@ -560,7 +502,7 @@ class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin):
p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None)
p.connect()
r = p.request("get:/")
- assert r.status_code == 400
+ assert r.status_code == 502
class TestProxy(tservers.HTTPProxTest):
@@ -661,63 +603,67 @@ class MasterRedirectRequest(tservers.TestMaster):
redirect_port = None # Set by TestRedirectRequest
def handle_request(self, f):
- request = f.request
- if request.path == "/p/201":
- addr = f.live.c.server_conn.address
- assert f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- assert not f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- tutils.raises(
- "SSL handshake error",
- f.live.change_server,
- ("127.0.0.1",
- self.redirect_port),
- ssl=True)
- assert f.live.change_server(addr, ssl=False)
- request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port
- tservers.TestMaster.handle_request(self, f)
+ if f.request.path == "/p/201":
+
+ # This part should have no impact, but it should not cause any exceptions.
+ addr = f.live.server_conn.address
+ addr2 = Address(("127.0.0.1", self.redirect_port))
+ f.live.set_server(addr2)
+ f.live.connect()
+ f.live.set_server(addr)
+ f.live.connect()
+
+ # This is the actual redirection.
+ f.request.port = self.redirect_port
+ super(MasterRedirectRequest, self).handle_request(f)
def handle_response(self, f):
f.response.content = str(f.client_conn.address.port)
f.response.headers[
"server-conn-id"] = [str(f.server_conn.source_address.port)]
- tservers.TestMaster.handle_response(self, f)
+ super(MasterRedirectRequest, self).handle_response(f)
class TestRedirectRequest(tservers.HTTPProxTest):
masterclass = MasterRedirectRequest
+ ssl = True
def test_redirect(self):
+ """
+ Imagine a single HTTPS connection with three requests:
+
+ 1. First request should pass through unmodified
+ 2. Second request will be redirected to a different host by an inline script
+ 3. Third request should pass through unmodified
+
+ This test verifies that the original destination is restored for the third request.
+ """
self.master.redirect_port = self.server2.port
p = self.pathoc()
self.server.clear_log()
self.server2.clear_log()
- r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
+ r1 = p.request("get:'/p/200'")
assert r1.status_code == 200
assert self.server.last_log()
assert not self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r2 = p.request("get:'%s/p/201'" % self.server.urlbase)
+ r2 = p.request("get:'/p/201'")
assert r2.status_code == 201
assert not self.server.last_log()
assert self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r3 = p.request("get:'%s/p/202'" % self.server.urlbase)
+ r3 = p.request("get:'/p/202'")
assert r3.status_code == 202
assert self.server.last_log()
assert not self.server2.last_log()
assert r1.content == r2.content == r3.content
- assert r1.headers.get_first(
- "server-conn-id") == r3.headers.get_first("server-conn-id")
- # Make sure that we actually use the same connection in this test case
class MasterStreamRequest(tservers.TestMaster):
@@ -774,9 +720,9 @@ class TestStreamRequest(tservers.HTTPProxTest):
assert resp.headers["Transfer-Encoding"][0] == 'chunked'
assert resp.status_code == 200
- chunks = list(
- content for _, content, _ in protocol.read_http_body_chunked(
- resp.headers, None, "GET", 200, False))
+ chunks = list(protocol.read_http_body_chunked(
+ resp.headers, None, "GET", 200, False
+ ))
assert chunks == ["this", "isatest", ""]
connection.close()
@@ -1009,6 +955,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
+ assert req.content == "content"
+ assert req.status_code == 418
+
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
# CONNECT, failing request,
assert self.chain[0].tmaster.state.flow_count() == 4
@@ -1017,8 +966,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert self.chain[1].tmaster.state.flow_count() == 2
# (doesn't store (repeated) CONNECTs from chain[0]
# as it is a regular proxy)
- assert req.content == "content"
- assert req.status_code == 418
+
assert not self.chain[1].tmaster.state.flows[0].response # killed
assert self.chain[1].tmaster.state.flows[1].response
diff --git a/test/tservers.py b/test/tservers.py
index 3c73b262..c5256e53 100644
--- a/test/tservers.py
+++ b/test/tservers.py
@@ -1,6 +1,5 @@
import os.path
import threading
-import Queue
import shutil
import tempfile
import flask
@@ -8,7 +7,6 @@ import mock
from libmproxy.proxy.config import ProxyConfig
from libmproxy.proxy.server import ProxyServer
-from libmproxy.proxy.primitives import TransparentProxyMode
import libpathod.test
import libpathod.pathoc
from libmproxy import flow, controller
@@ -130,7 +128,6 @@ class ProxTestBase(object):
no_upstream_cert = cls.no_upstream_cert,
cadir = cls.cadir,
authenticator = cls.authenticator,
- ssl_ports=([cls.server.port, cls.server2.port] if cls.ssl else []),
clientcerts = tutils.test_data.path("data/clientcert") if cls.clientcerts else None
)
@@ -183,22 +180,24 @@ class TResolver:
def original_addr(self, sock):
return ("127.0.0.1", self.port)
-
class TransparentProxTest(ProxTestBase):
ssl = None
resolver = TResolver
@classmethod
- @mock.patch("libmproxy.platform.resolver")
- def setupAll(cls, _):
+ def setupAll(cls):
super(TransparentProxTest, cls).setupAll()
- if cls.ssl:
- ports = [cls.server.port, cls.server2.port]
- else:
- ports = []
- cls.config.mode = TransparentProxyMode(
- cls.resolver(cls.server.port),
- ports)
+
+ cls._resolver = mock.patch(
+ "libmproxy.platform.resolver",
+ new=lambda: cls.resolver(cls.server.port)
+ )
+ cls._resolver.start()
+
+ @classmethod
+ def teardownAll(cls):
+ cls._resolver.stop()
+ super(TransparentProxTest, cls).teardownAll()
@classmethod
def get_proxy_config(cls):
@@ -235,12 +234,10 @@ class ReverseProxTest(ProxTestBase):
@classmethod
def get_proxy_config(cls):
d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = [
- True if cls.ssl else False,
- True if cls.ssl else False,
- "127.0.0.1",
- cls.server.port
- ]
+ d["upstream_server"] = (
+ "https" if cls.ssl else "http",
+ ("127.0.0.1", cls.server.port)
+ )
d["mode"] = "reverse"
return d
@@ -274,48 +271,6 @@ class SocksModeTest(HTTPProxTest):
d["mode"] = "socks5"
return d
-class SpoofModeTest(ProxTestBase):
- ssl = None
-
- @classmethod
- def get_proxy_config(cls):
- d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = None
- d["mode"] = "spoof"
- return d
-
- def pathoc(self, sni=None):
- """
- Returns a connected Pathoc instance.
- """
- p = libpathod.pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None
- )
- p.connect()
- return p
-
-
-class SSLSpoofModeTest(ProxTestBase):
- ssl = True
-
- @classmethod
- def get_proxy_config(cls):
- d = ProxTestBase.get_proxy_config()
- d["upstream_server"] = None
- d["mode"] = "sslspoof"
- d["spoofed_ssl_port"] = 443
- return d
-
- def pathoc(self, sni=None):
- """
- Returns a connected Pathoc instance.
- """
- p = libpathod.pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None
- )
- p.connect()
- return p
-
class ChainProxTest(ProxTestBase):
"""
@@ -360,7 +315,7 @@ class ChainProxTest(ProxTestBase):
if cls.chain: # First proxy is in normal mode.
d.update(
mode="upstream",
- upstream_server=(False, False, "127.0.0.1", cls.chain[0].port)
+ upstream_server=("http", ("127.0.0.1", cls.chain[0].port))
)
return d