diff options
Diffstat (limited to 'test/test_server.py')
-rw-r--r-- | test/test_server.py | 156 |
1 files changed, 54 insertions, 102 deletions
diff --git a/test/test_server.py b/test/test_server.py index 77ba4576..a1259b7f 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -1,6 +1,7 @@ import socket import time from OpenSSL import SSL +from netlib.tcp import Address import netlib.tutils from netlib import tcp, http, socks @@ -10,7 +11,9 @@ from netlib.http.semantics import CONTENT_MISSING from libpathod import pathoc, pathod from libmproxy.proxy.config import HostMatcher -from libmproxy.protocol import KILL, Error, http_wrappers +from libmproxy.protocol import Kill +from libmproxy.models import Error, HTTPResponse + import tutils import tservers @@ -67,7 +70,7 @@ class CommonMixin: # SSL with the upstream proxy. rt = self.master.replay_request(l, block=True) assert not rt - if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl: + if isinstance(self, tservers.HTTPUpstreamProxTest): assert l.response.code == 502 else: assert l.error @@ -319,17 +322,6 @@ class TestHTTPAuth(tservers.HTTPProxTest): assert ret.status_code == 202 -class TestHTTPConnectSSLError(tservers.HTTPProxTest): - certfile = True - - def test_go(self): - self.config.ssl_ports.append(self.proxy.port) - p = self.pathoc_raw() - dst = ("localhost", self.proxy.port) - p.connect(connect_to=dst) - tutils.raises("502 - Bad Gateway", p.http_connect, dst) - - class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin): ssl = True ssloptions = pathod.SSLOptions(request_client_cert=True) @@ -390,26 +382,31 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest): ("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt")) ]) + def _request(self): + p = self.pathoc() + # We need to make an actual request because the upstream connection is lazy-loaded. + return p.request("get:/p/242") + def test_default_verification_w_bad_cert(self): """Should use no verification.""" self.config.openssl_trusted_ca_server = tutils.test_data.path( "data/trusted-cadir/trusted-ca.pem") - self.pathoc() + assert self._request().status_code == 242 def test_no_verification_w_bad_cert(self): self.config.openssl_verification_mode_server = SSL.VERIFY_NONE self.config.openssl_trusted_ca_server = tutils.test_data.path( "data/trusted-cadir/trusted-ca.pem") - self.pathoc() + assert self._request().status_code == 242 def test_verification_w_bad_cert(self): self.config.openssl_verification_mode_server = SSL.VERIFY_PEER self.config.openssl_trusted_ca_server = tutils.test_data.path( "data/trusted-cadir/trusted-ca.pem") - tutils.raises("SSL handshake error", self.pathoc) + assert self._request().status_code == 502 class TestHTTPSNoCommonName(tservers.HTTPProxTest): @@ -469,60 +466,11 @@ class TestSocks5(tservers.SocksModeTest): assert "SOCKS5 mode failure" in f.content -class TestSpoof(tservers.SpoofModeTest): - def test_http(self): - alist = ( - ("localhost", self.server.port), - ("127.0.0.1", self.server.port) - ) - for a in alist: - self.server.clear_log() - p = self.pathoc() - f = p.request("get:/p/304:h'Host'='%s:%s'" % a) - assert self.server.last_log() - assert f.status_code == 304 - l = self.master.state.view[-1] - assert l.server_conn.address - assert l.server_conn.address.host == a[0] - assert l.server_conn.address.port == a[1] - - def test_http_without_host(self): - p = self.pathoc() - f = p.request("get:/p/304:r") - assert f.status_code == 400 - - -class TestSSLSpoof(tservers.SSLSpoofModeTest): - def test_https(self): - alist = ( - ("localhost", self.server.port), - ("127.0.0.1", self.server.port) - ) - for a in alist: - self.server.clear_log() - self.config.mode.sslport = a[1] - p = self.pathoc(sni=a[0]) - f = p.request("get:/p/304") - assert self.server.last_log() - assert f.status_code == 304 - l = self.master.state.view[-1] - assert l.server_conn.address - assert l.server_conn.address.host == a[0] - assert l.server_conn.address.port == a[1] - - def test_https_without_sni(self): - a = ("localhost", self.server.port) - self.config.mode.sslport = a[1] - p = self.pathoc(sni=None) - f = p.request("get:/p/304") - assert f.status_code == 400 - - class TestHttps2Http(tservers.ReverseProxTest): @classmethod def get_proxy_config(cls): d = super(TestHttps2Http, cls).get_proxy_config() - d["upstream_server"][0] = True + d["upstream_server"] = ("http", d["upstream_server"][1]) return d def pathoc(self, ssl, sni=None): @@ -530,7 +478,7 @@ class TestHttps2Http(tservers.ReverseProxTest): Returns a connected Pathoc instance. """ p = pathoc.Pathoc( - ("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None + ("localhost", self.proxy.port), ssl=True, sni=sni, fp=None ) p.connect() return p @@ -546,7 +494,7 @@ class TestHttps2Http(tservers.ReverseProxTest): def test_http(self): p = self.pathoc(ssl=False) - assert p.request("get:'/p/200'").status_code == 400 + assert p.request("get:'/p/200'").status_code == 200 class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin): @@ -560,7 +508,7 @@ class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin): p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None) p.connect() r = p.request("get:/") - assert r.status_code == 400 + assert r.status_code == 502 class TestProxy(tservers.HTTPProxTest): @@ -661,63 +609,65 @@ class MasterRedirectRequest(tservers.TestMaster): redirect_port = None # Set by TestRedirectRequest def handle_request(self, f): - request = f.request - if request.path == "/p/201": - addr = f.live.c.server_conn.address - assert f.live.change_server( - ("127.0.0.1", self.redirect_port), ssl=False) - assert not f.live.change_server( - ("127.0.0.1", self.redirect_port), ssl=False) - tutils.raises( - "SSL handshake error", - f.live.change_server, - ("127.0.0.1", - self.redirect_port), - ssl=True) - assert f.live.change_server(addr, ssl=False) - request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port - tservers.TestMaster.handle_request(self, f) + if f.request.path == "/p/201": + + # This part should have no impact, but it should also not cause any exceptions. + addr = f.live.server_conn.address + addr2 = Address(("127.0.0.1", self.redirect_port)) + f.live.set_server(addr2) + f.live.set_server(addr) + + # This is the actual redirection. + f.request.port = self.redirect_port + super(MasterRedirectRequest, self).handle_request(f) def handle_response(self, f): f.response.content = str(f.client_conn.address.port) f.response.headers[ "server-conn-id"] = [str(f.server_conn.source_address.port)] - tservers.TestMaster.handle_response(self, f) + super(MasterRedirectRequest, self).handle_response(f) class TestRedirectRequest(tservers.HTTPProxTest): masterclass = MasterRedirectRequest + ssl = True def test_redirect(self): + """ + Imagine a single HTTPS connection with three requests: + + 1. First request should pass through unmodified + 2. Second request will be redirected to a different host by an inline script + 3. Third request should pass through unmodified + + This test verifies that the original destination is restored for the third request. + """ self.master.redirect_port = self.server2.port p = self.pathoc() self.server.clear_log() self.server2.clear_log() - r1 = p.request("get:'%s/p/200'" % self.server.urlbase) + r1 = p.request("get:'/p/200'") assert r1.status_code == 200 assert self.server.last_log() assert not self.server2.last_log() self.server.clear_log() self.server2.clear_log() - r2 = p.request("get:'%s/p/201'" % self.server.urlbase) + r2 = p.request("get:'/p/201'") assert r2.status_code == 201 assert not self.server.last_log() assert self.server2.last_log() self.server.clear_log() self.server2.clear_log() - r3 = p.request("get:'%s/p/202'" % self.server.urlbase) + r3 = p.request("get:'/p/202'") assert r3.status_code == 202 assert self.server.last_log() assert not self.server2.last_log() assert r1.content == r2.content == r3.content - assert r1.headers.get_first( - "server-conn-id") == r3.headers.get_first("server-conn-id") - # Make sure that we actually use the same connection in this test case class MasterStreamRequest(tservers.TestMaster): @@ -774,9 +724,9 @@ class TestStreamRequest(tservers.HTTPProxTest): assert resp.headers["Transfer-Encoding"][0] == 'chunked' assert resp.status_code == 200 - chunks = list( - content for _, content, _ in protocol.read_http_body_chunked( - resp.headers, None, "GET", 200, False)) + chunks = list(protocol.read_http_body_chunked( + resp.headers, None, "GET", 200, False + )) assert chunks == ["this", "isatest", ""] connection.close() @@ -784,7 +734,7 @@ class TestStreamRequest(tservers.HTTPProxTest): class MasterFakeResponse(tservers.TestMaster): def handle_request(self, f): - resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp()) + resp = HTTPResponse.wrap(netlib.tutils.tresp()) f.reply(resp) @@ -809,7 +759,7 @@ class TestServerConnect(tservers.HTTPProxTest): class MasterKillRequest(tservers.TestMaster): def handle_request(self, f): - f.reply(KILL) + f.reply(Kill) class TestKillRequest(tservers.HTTPProxTest): @@ -823,7 +773,7 @@ class TestKillRequest(tservers.HTTPProxTest): class MasterKillResponse(tservers.TestMaster): def handle_response(self, f): - f.reply(KILL) + f.reply(Kill) class TestKillResponse(tservers.HTTPProxTest): @@ -849,7 +799,7 @@ class TestTransparentResolveError(tservers.TransparentProxTest): class MasterIncomplete(tservers.TestMaster): def handle_request(self, f): - resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp()) + resp = HTTPResponse.wrap(netlib.tutils.tresp()) resp.content = CONTENT_MISSING f.reply(resp) @@ -988,7 +938,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): if not (k[0] in exclude): f.client_conn.finish() f.error = Error("terminated") - f.reply(KILL) + f.reply(Kill) return _func(f) setattr(master, attr, handler) @@ -1009,6 +959,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): p = self.pathoc() req = p.request("get:'/p/418:b\"content\"'") + assert req.content == "content" + assert req.status_code == 418 + assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request # CONNECT, failing request, assert self.chain[0].tmaster.state.flow_count() == 4 @@ -1017,8 +970,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): assert self.chain[1].tmaster.state.flow_count() == 2 # (doesn't store (repeated) CONNECTs from chain[0] # as it is a regular proxy) - assert req.content == "content" - assert req.status_code == 418 + assert not self.chain[1].tmaster.state.flows[0].response # killed assert self.chain[1].tmaster.state.flows[1].response |