aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_server.py')
-rw-r--r--test/test_server.py156
1 files changed, 54 insertions, 102 deletions
diff --git a/test/test_server.py b/test/test_server.py
index 77ba4576..a1259b7f 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,6 +1,7 @@
import socket
import time
from OpenSSL import SSL
+from netlib.tcp import Address
import netlib.tutils
from netlib import tcp, http, socks
@@ -10,7 +11,9 @@ from netlib.http.semantics import CONTENT_MISSING
from libpathod import pathoc, pathod
from libmproxy.proxy.config import HostMatcher
-from libmproxy.protocol import KILL, Error, http_wrappers
+from libmproxy.protocol import Kill
+from libmproxy.models import Error, HTTPResponse
+
import tutils
import tservers
@@ -67,7 +70,7 @@ class CommonMixin:
# SSL with the upstream proxy.
rt = self.master.replay_request(l, block=True)
assert not rt
- if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl:
+ if isinstance(self, tservers.HTTPUpstreamProxTest):
assert l.response.code == 502
else:
assert l.error
@@ -319,17 +322,6 @@ class TestHTTPAuth(tservers.HTTPProxTest):
assert ret.status_code == 202
-class TestHTTPConnectSSLError(tservers.HTTPProxTest):
- certfile = True
-
- def test_go(self):
- self.config.ssl_ports.append(self.proxy.port)
- p = self.pathoc_raw()
- dst = ("localhost", self.proxy.port)
- p.connect(connect_to=dst)
- tutils.raises("502 - Bad Gateway", p.http_connect, dst)
-
-
class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True)
@@ -390,26 +382,31 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest):
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
])
+ def _request(self):
+ p = self.pathoc()
+ # We need to make an actual request because the upstream connection is lazy-loaded.
+ return p.request("get:/p/242")
+
def test_default_verification_w_bad_cert(self):
"""Should use no verification."""
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_no_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_NONE
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- self.pathoc()
+ assert self._request().status_code == 242
def test_verification_w_bad_cert(self):
self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
self.config.openssl_trusted_ca_server = tutils.test_data.path(
"data/trusted-cadir/trusted-ca.pem")
- tutils.raises("SSL handshake error", self.pathoc)
+ assert self._request().status_code == 502
class TestHTTPSNoCommonName(tservers.HTTPProxTest):
@@ -469,60 +466,11 @@ class TestSocks5(tservers.SocksModeTest):
assert "SOCKS5 mode failure" in f.content
-class TestSpoof(tservers.SpoofModeTest):
- def test_http(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- p = self.pathoc()
- f = p.request("get:/p/304:h'Host'='%s:%s'" % a)
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_http_without_host(self):
- p = self.pathoc()
- f = p.request("get:/p/304:r")
- assert f.status_code == 400
-
-
-class TestSSLSpoof(tservers.SSLSpoofModeTest):
- def test_https(self):
- alist = (
- ("localhost", self.server.port),
- ("127.0.0.1", self.server.port)
- )
- for a in alist:
- self.server.clear_log()
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=a[0])
- f = p.request("get:/p/304")
- assert self.server.last_log()
- assert f.status_code == 304
- l = self.master.state.view[-1]
- assert l.server_conn.address
- assert l.server_conn.address.host == a[0]
- assert l.server_conn.address.port == a[1]
-
- def test_https_without_sni(self):
- a = ("localhost", self.server.port)
- self.config.mode.sslport = a[1]
- p = self.pathoc(sni=None)
- f = p.request("get:/p/304")
- assert f.status_code == 400
-
-
class TestHttps2Http(tservers.ReverseProxTest):
@classmethod
def get_proxy_config(cls):
d = super(TestHttps2Http, cls).get_proxy_config()
- d["upstream_server"][0] = True
+ d["upstream_server"] = ("http", d["upstream_server"][1])
return d
def pathoc(self, ssl, sni=None):
@@ -530,7 +478,7 @@ class TestHttps2Http(tservers.ReverseProxTest):
Returns a connected Pathoc instance.
"""
p = pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None
+ ("localhost", self.proxy.port), ssl=True, sni=sni, fp=None
)
p.connect()
return p
@@ -546,7 +494,7 @@ class TestHttps2Http(tservers.ReverseProxTest):
def test_http(self):
p = self.pathoc(ssl=False)
- assert p.request("get:'/p/200'").status_code == 400
+ assert p.request("get:'/p/200'").status_code == 200
class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
@@ -560,7 +508,7 @@ class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin):
p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None)
p.connect()
r = p.request("get:/")
- assert r.status_code == 400
+ assert r.status_code == 502
class TestProxy(tservers.HTTPProxTest):
@@ -661,63 +609,65 @@ class MasterRedirectRequest(tservers.TestMaster):
redirect_port = None # Set by TestRedirectRequest
def handle_request(self, f):
- request = f.request
- if request.path == "/p/201":
- addr = f.live.c.server_conn.address
- assert f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- assert not f.live.change_server(
- ("127.0.0.1", self.redirect_port), ssl=False)
- tutils.raises(
- "SSL handshake error",
- f.live.change_server,
- ("127.0.0.1",
- self.redirect_port),
- ssl=True)
- assert f.live.change_server(addr, ssl=False)
- request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port
- tservers.TestMaster.handle_request(self, f)
+ if f.request.path == "/p/201":
+
+ # This part should have no impact, but it should also not cause any exceptions.
+ addr = f.live.server_conn.address
+ addr2 = Address(("127.0.0.1", self.redirect_port))
+ f.live.set_server(addr2)
+ f.live.set_server(addr)
+
+ # This is the actual redirection.
+ f.request.port = self.redirect_port
+ super(MasterRedirectRequest, self).handle_request(f)
def handle_response(self, f):
f.response.content = str(f.client_conn.address.port)
f.response.headers[
"server-conn-id"] = [str(f.server_conn.source_address.port)]
- tservers.TestMaster.handle_response(self, f)
+ super(MasterRedirectRequest, self).handle_response(f)
class TestRedirectRequest(tservers.HTTPProxTest):
masterclass = MasterRedirectRequest
+ ssl = True
def test_redirect(self):
+ """
+ Imagine a single HTTPS connection with three requests:
+
+ 1. First request should pass through unmodified
+ 2. Second request will be redirected to a different host by an inline script
+ 3. Third request should pass through unmodified
+
+ This test verifies that the original destination is restored for the third request.
+ """
self.master.redirect_port = self.server2.port
p = self.pathoc()
self.server.clear_log()
self.server2.clear_log()
- r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
+ r1 = p.request("get:'/p/200'")
assert r1.status_code == 200
assert self.server.last_log()
assert not self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r2 = p.request("get:'%s/p/201'" % self.server.urlbase)
+ r2 = p.request("get:'/p/201'")
assert r2.status_code == 201
assert not self.server.last_log()
assert self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
- r3 = p.request("get:'%s/p/202'" % self.server.urlbase)
+ r3 = p.request("get:'/p/202'")
assert r3.status_code == 202
assert self.server.last_log()
assert not self.server2.last_log()
assert r1.content == r2.content == r3.content
- assert r1.headers.get_first(
- "server-conn-id") == r3.headers.get_first("server-conn-id")
- # Make sure that we actually use the same connection in this test case
class MasterStreamRequest(tservers.TestMaster):
@@ -774,9 +724,9 @@ class TestStreamRequest(tservers.HTTPProxTest):
assert resp.headers["Transfer-Encoding"][0] == 'chunked'
assert resp.status_code == 200
- chunks = list(
- content for _, content, _ in protocol.read_http_body_chunked(
- resp.headers, None, "GET", 200, False))
+ chunks = list(protocol.read_http_body_chunked(
+ resp.headers, None, "GET", 200, False
+ ))
assert chunks == ["this", "isatest", ""]
connection.close()
@@ -784,7 +734,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
class MasterFakeResponse(tservers.TestMaster):
def handle_request(self, f):
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
f.reply(resp)
@@ -809,7 +759,7 @@ class TestServerConnect(tservers.HTTPProxTest):
class MasterKillRequest(tservers.TestMaster):
def handle_request(self, f):
- f.reply(KILL)
+ f.reply(Kill)
class TestKillRequest(tservers.HTTPProxTest):
@@ -823,7 +773,7 @@ class TestKillRequest(tservers.HTTPProxTest):
class MasterKillResponse(tservers.TestMaster):
def handle_response(self, f):
- f.reply(KILL)
+ f.reply(Kill)
class TestKillResponse(tservers.HTTPProxTest):
@@ -849,7 +799,7 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
class MasterIncomplete(tservers.TestMaster):
def handle_request(self, f):
- resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
+ resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.content = CONTENT_MISSING
f.reply(resp)
@@ -988,7 +938,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
if not (k[0] in exclude):
f.client_conn.finish()
f.error = Error("terminated")
- f.reply(KILL)
+ f.reply(Kill)
return _func(f)
setattr(master, attr, handler)
@@ -1009,6 +959,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
+ assert req.content == "content"
+ assert req.status_code == 418
+
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
# CONNECT, failing request,
assert self.chain[0].tmaster.state.flow_count() == 4
@@ -1017,8 +970,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert self.chain[1].tmaster.state.flow_count() == 2
# (doesn't store (repeated) CONNECTs from chain[0]
# as it is a regular proxy)
- assert req.content == "content"
- assert req.status_code == 418
+
assert not self.chain[1].tmaster.state.flows[0].response # killed
assert self.chain[1].tmaster.state.flows[1].response