From 33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Mon, 15 Feb 2016 14:58:46 +0100 Subject: move mitmproxy --- test/test_server.py | 1001 --------------------------------------------------- 1 file changed, 1001 deletions(-) delete mode 100644 test/test_server.py (limited to 'test/test_server.py') diff --git a/test/test_server.py b/test/test_server.py deleted file mode 100644 index 1b7e6966..00000000 --- a/test/test_server.py +++ /dev/null @@ -1,1001 +0,0 @@ -import os -import socket -import time -from OpenSSL import SSL -from netlib.exceptions import HttpReadDisconnect, HttpException -from netlib.tcp import Address - -import netlib.tutils -from netlib import tcp, http, socks -from netlib.certutils import SSLCert -from netlib.http import authentication, CONTENT_MISSING, http1 -from netlib.tutils import raises -from libpathod import pathoc, pathod - -from libmproxy.proxy.config import HostMatcher -from libmproxy.protocol import Kill -from libmproxy.models import Error, HTTPResponse - -from . import tutils, tservers - -""" - Note that the choice of response code in these tests matters more than you - might think. libcurl treats a 304 response code differently from, say, a - 200 response code - it will correctly terminate a 304 response with no - content-length header, whereas it will block forever waiting for content - for a 200 response. -""" - - -class CommonMixin: - - def test_large(self): - assert len(self.pathod("200:b@50k").content) == 1024 * 50 - - @staticmethod - def wait_until_not_live(flow): - """ - Race condition: We don't want to replay the flow while it is still live. - """ - s = time.time() - while flow.live: - time.sleep(0.001) - if time.time() - s > 5: - raise RuntimeError("Flow is live for too long.") - - def test_replay(self): - assert self.pathod("304").status_code == 304 - if isinstance(self, tservers.HTTPUpstreamProxTest) and self.ssl: - assert len(self.master.state.view) == 2 - else: - assert len(self.master.state.view) == 1 - l = self.master.state.view[-1] - assert l.response.status_code == 304 - l.request.path = "/p/305" - self.wait_until_not_live(l) - rt = self.master.replay_request(l, block=True) - assert l.response.status_code == 305 - - # Disconnect error - l.request.path = "/p/305:d0" - rt = self.master.replay_request(l, block=True) - assert not rt - if isinstance(self, tservers.HTTPUpstreamProxTest): - assert l.response.status_code == 502 - else: - assert l.error - - # Port error - l.request.port = 1 - # In upstream mode, we get a 502 response from the upstream proxy server. - # In upstream mode with ssl, the replay will fail as we cannot establish - # SSL with the upstream proxy. - rt = self.master.replay_request(l, block=True) - assert not rt - if isinstance(self, tservers.HTTPUpstreamProxTest): - assert l.response.status_code == 502 - else: - assert l.error - - def test_http(self): - f = self.pathod("304") - assert f.status_code == 304 - - # In Upstream mode with SSL, we may already have a previous CONNECT - # request. - l = self.master.state.view[-1] - assert l.client_conn.address - assert "host" in l.request.headers - assert l.response.status_code == 304 - - def test_invalid_http(self): - t = tcp.TCPClient(("127.0.0.1", self.proxy.port)) - t.connect() - t.wfile.write("invalid\r\n\r\n") - t.wfile.flush() - line = t.rfile.readline() - assert ("Bad Request" in line) or ("Bad Gateway" in line) - - def test_sni(self): - if not self.ssl: - return - - f = self.pathod("304", sni="testserver.com") - assert f.status_code == 304 - log = self.server.last_log() - assert log["request"]["sni"] == "testserver.com" - - -class TcpMixin: - - def _ignore_on(self): - assert not hasattr(self, "_ignore_backup") - self._ignore_backup = self.config.check_ignore - self.config.check_ignore = HostMatcher( - [".+:%s" % self.server.port] + self.config.check_ignore.patterns) - - def _ignore_off(self): - assert hasattr(self, "_ignore_backup") - self.config.check_ignore = self._ignore_backup - del self._ignore_backup - - def test_ignore(self): - n = self.pathod("304") - self._ignore_on() - i = self.pathod("305") - i2 = self.pathod("306") - self._ignore_off() - - self.master.masterq.join() - - assert n.status_code == 304 - assert i.status_code == 305 - assert i2.status_code == 306 - assert any(f.response.status_code == 304 for f in self.master.state.flows) - assert not any(f.response.status_code == 305 for f in self.master.state.flows) - assert not any(f.response.status_code == 306 for f in self.master.state.flows) - - # Test that we get the original SSL cert - if self.ssl: - i_cert = SSLCert(i.sslinfo.certchain[0]) - i2_cert = SSLCert(i2.sslinfo.certchain[0]) - n_cert = SSLCert(n.sslinfo.certchain[0]) - - assert i_cert == i2_cert - assert i_cert != n_cert - - # Test Non-HTTP traffic - spec = "200:i0,@100:d0" # this results in just 100 random bytes - # mitmproxy responds with bad gateway - assert self.pathod(spec).status_code == 502 - self._ignore_on() - with raises(HttpException): - self.pathod(spec) # pathoc tries to parse answer as HTTP - - self._ignore_off() - - def _tcpproxy_on(self): - assert not hasattr(self, "_tcpproxy_backup") - self._tcpproxy_backup = self.config.check_tcp - self.config.check_tcp = HostMatcher( - [".+:%s" % self.server.port] + self.config.check_tcp.patterns) - - def _tcpproxy_off(self): - assert hasattr(self, "_tcpproxy_backup") - self.config.check_tcp = self._tcpproxy_backup - del self._tcpproxy_backup - - def test_tcp(self): - n = self.pathod("304") - self._tcpproxy_on() - i = self.pathod("305") - i2 = self.pathod("306") - self._tcpproxy_off() - - self.master.masterq.join() - - assert n.status_code == 304 - assert i.status_code == 305 - assert i2.status_code == 306 - assert any(f.response.status_code == 304 for f in self.master.state.flows) - assert not any(f.response.status_code == 305 for f in self.master.state.flows) - assert not any(f.response.status_code == 306 for f in self.master.state.flows) - - # Test that we get the original SSL cert - if self.ssl: - i_cert = SSLCert(i.sslinfo.certchain[0]) - i2_cert = SSLCert(i2.sslinfo.certchain[0]) - n_cert = SSLCert(n.sslinfo.certchain[0]) - - assert i_cert == i2_cert == n_cert - - # Make sure that TCP messages are in the event log. - assert any("305" in m for m in self.master.log) - assert any("306" in m for m in self.master.log) - - -class AppMixin: - - def test_app(self): - ret = self.app("/") - assert ret.status_code == 200 - assert "mitmproxy" in ret.content - - -class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): - - def test_app_err(self): - p = self.pathoc() - ret = p.request("get:'http://errapp/'") - assert ret.status_code == 500 - assert "ValueError" in ret.content - - def test_invalid_connect(self): - t = tcp.TCPClient(("127.0.0.1", self.proxy.port)) - t.connect() - t.wfile.write("CONNECT invalid\n\n") - t.wfile.flush() - assert "Bad Request" in t.rfile.readline() - - def test_upstream_ssl_error(self): - p = self.pathoc() - ret = p.request("get:'https://localhost:%s/'" % self.server.port) - assert ret.status_code == 400 - - def test_connection_close(self): - # Add a body, so we have a content-length header, which combined with - # HTTP1.1 means the connection is kept alive. - response = '%s/p/200:b@1' % self.server.urlbase - - # Lets sanity check that the connection does indeed stay open by - # issuing two requests over the same connection - p = self.pathoc() - assert p.request("get:'%s'" % response) - assert p.request("get:'%s'" % response) - - # Now check that the connection is closed as the client specifies - p = self.pathoc() - assert p.request("get:'%s':h'Connection'='close'" % response) - # There's a race here, which means we can get any of a number of errors. - # Rather than introduce yet another sleep into the test suite, we just - # relax the Exception specification. - with raises(Exception): - p.request("get:'%s'" % response) - - def test_reconnect(self): - req = "get:'%s/p/200:b@1:da'" % self.server.urlbase - p = self.pathoc() - assert p.request(req) - # Server has disconnected. Mitmproxy should detect this, and reconnect. - assert p.request(req) - assert p.request(req) - - def test_get_connection_switching(self): - def switched(l): - for i in l: - if "serverdisconnect" in i: - return True - - req = "get:'%s/p/200:b@1'" - p = self.pathoc() - assert p.request(req % self.server.urlbase) - assert p.request(req % self.server2.urlbase) - assert switched(self.proxy.log) - - def test_blank_leading_line(self): - p = self.pathoc() - req = "get:'%s/p/201':i0,'\r\n'" - assert p.request(req % self.server.urlbase).status_code == 201 - - def test_invalid_headers(self): - p = self.pathoc() - resp = p.request("get:'http://foo':h':foo'='bar'") - assert resp.status_code == 400 - - def test_stream(self): - self.master.set_stream_large_bodies(1024 * 2) - - self.pathod("200:b@1k") - assert not self.master.state.view[-1].response.stream - assert len(self.master.state.view[-1].response.content) == 1024 * 1 - - self.pathod("200:b@3k") - assert self.master.state.view[-1].response.stream - assert self.master.state.view[-1].response.content == CONTENT_MISSING - self.master.set_stream_large_bodies(None) - - def test_stream_modify(self): - self.master.load_script( - tutils.test_data.path("scripts/stream_modify.py")) - d = self.pathod('200:b"foo"') - assert d.content == "bar" - self.master.unload_scripts() - - -class TestHTTPAuth(tservers.HTTPProxTest): - authenticator = http.authentication.BasicProxyAuth( - http.authentication.PassManSingleUser( - "test", - "test"), - "realm") - - def test_auth(self): - assert self.pathod("202").status_code == 407 - p = self.pathoc() - ret = p.request(""" - get - 'http://localhost:%s/p/202' - h'%s'='%s' - """ % ( - self.server.port, - http.authentication.BasicProxyAuth.AUTH_HEADER, - authentication.assemble_http_basic_auth("basic", "test", "test") - )) - assert ret.status_code == 202 - - -class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin): - ssl = True - ssloptions = pathod.SSLOptions(request_client_cert=True) - - def test_clientcert_file(self): - try: - self.config.clientcerts = os.path.join( - tutils.test_data.path("data/clientcert"), "client.pem") - f = self.pathod("304") - assert f.status_code == 304 - assert self.server.last_log()["request"]["clientcert"]["keyinfo"] - finally: - self.config.clientcerts = None - - def test_clientcert_dir(self): - try: - self.config.clientcerts = tutils.test_data.path("data/clientcert") - f = self.pathod("304") - assert f.status_code == 304 - assert self.server.last_log()["request"]["clientcert"]["keyinfo"] - finally: - self.config.clientcerts = None - - def test_error_post_connect(self): - p = self.pathoc() - assert p.request("get:/:i0,'invalid\r\n\r\n'").status_code == 400 - - -class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin): - ssl = True - certfile = True - - def test_certfile(self): - assert self.pathod("304") - - -class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxTest): - - """ - Test upstream server certificate verification with a trusted server cert. - """ - ssl = True - ssloptions = pathod.SSLOptions( - cn="trusted-cert", - certs=[ - ("trusted-cert", tutils.test_data.path("data/trusted-server.crt")) - ]) - - def test_verification_w_cadir(self): - self.config.openssl_verification_mode_server = SSL.VERIFY_PEER - self.config.openssl_trusted_cadir_server = tutils.test_data.path( - "data/trusted-cadir/") - - self.pathoc() - - def test_verification_w_pemfile(self): - self.config.openssl_verification_mode_server = SSL.VERIFY_PEER - self.config.openssl_trusted_ca_server = tutils.test_data.path( - "data/trusted-cadir/trusted-ca.pem") - - self.pathoc() - - -class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest): - - """ - Test upstream server certificate verification with an untrusted server cert. - """ - ssl = True - ssloptions = pathod.SSLOptions( - cn="untrusted-cert", - certs=[ - ("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt")) - ]) - - def _request(self): - p = self.pathoc() - # We need to make an actual request because the upstream connection is lazy-loaded. - return p.request("get:/p/242") - - def test_default_verification_w_bad_cert(self): - """Should use no verification.""" - self.config.openssl_trusted_ca_server = tutils.test_data.path( - "data/trusted-cadir/trusted-ca.pem") - - assert self._request().status_code == 242 - - def test_no_verification_w_bad_cert(self): - self.config.openssl_verification_mode_server = SSL.VERIFY_NONE - self.config.openssl_trusted_ca_server = tutils.test_data.path( - "data/trusted-cadir/trusted-ca.pem") - - assert self._request().status_code == 242 - - def test_verification_w_bad_cert(self): - self.config.openssl_verification_mode_server = SSL.VERIFY_PEER - self.config.openssl_trusted_ca_server = tutils.test_data.path( - "data/trusted-cadir/trusted-ca.pem") - - assert self._request().status_code == 502 - - -class TestHTTPSNoCommonName(tservers.HTTPProxTest): - - """ - Test what happens if we get a cert without common name back. - """ - ssl = True - ssloptions = pathod.SSLOptions( - certs=[ - ("*", tutils.test_data.path("data/no_common_name.pem")) - ] - ) - - def test_http(self): - f = self.pathod("202") - assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1" - - -class TestReverse(tservers.ReverseProxTest, CommonMixin, TcpMixin): - reverse = True - - -class TestSocks5(tservers.SocksModeTest): - - def test_simple(self): - p = self.pathoc() - p.socks_connect(("localhost", self.server.port)) - f = p.request("get:/p/200") - assert f.status_code == 200 - - def test_with_authentication_only(self): - p = self.pathoc() - f = p.request("get:/p/200") - assert f.status_code == 502 - assert "SOCKS5 mode failure" in f.content - - def test_no_connect(self): - """ - mitmproxy doesn't support UDP or BIND SOCKS CMDs - """ - p = self.pathoc() - - socks.ClientGreeting( - socks.VERSION.SOCKS5, - [socks.METHOD.NO_AUTHENTICATION_REQUIRED] - ).to_file(p.wfile) - socks.Message( - socks.VERSION.SOCKS5, - socks.CMD.BIND, - socks.ATYP.DOMAINNAME, - ("example.com", 8080) - ).to_file(p.wfile) - - p.wfile.flush() - p.rfile.read(2) # read server greeting - f = p.request("get:/p/200") # the request doesn't matter, error response from handshake will be read anyway. - assert f.status_code == 502 - assert "SOCKS5 mode failure" in f.content - - -class TestHttps2Http(tservers.ReverseProxTest): - - @classmethod - def get_proxy_config(cls): - d = super(TestHttps2Http, cls).get_proxy_config() - d["upstream_server"] = ("http", d["upstream_server"][1]) - return d - - def pathoc(self, ssl, sni=None): - """ - Returns a connected Pathoc instance. - """ - p = pathoc.Pathoc( - ("localhost", self.proxy.port), ssl=True, sni=sni, fp=None - ) - p.connect() - return p - - def test_all(self): - p = self.pathoc(ssl=True) - assert p.request("get:'/p/200'").status_code == 200 - - def test_sni(self): - p = self.pathoc(ssl=True, sni="example.com") - assert p.request("get:'/p/200'").status_code == 200 - assert all("Error in handle_sni" not in msg for msg in self.proxy.log) - - def test_http(self): - p = self.pathoc(ssl=False) - assert p.request("get:'/p/200'").status_code == 200 - - -class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin): - ssl = False - - def test_tcp_stream_modify(self): - self.master.load_script( - tutils.test_data.path("scripts/tcp_stream_modify.py")) - - self._tcpproxy_on() - d = self.pathod('200:b"foo"') - self._tcpproxy_off() - - assert d.content == "bar" - - self.master.unload_scripts() - - -class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin): - ssl = True - - def test_sslerr(self): - p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None) - p.connect() - r = p.request("get:/") - assert r.status_code == 502 - - -class TestProxy(tservers.HTTPProxTest): - - def test_http(self): - f = self.pathod("304") - assert f.status_code == 304 - - f = self.master.state.view[0] - assert f.client_conn.address - assert "host" in f.request.headers - assert f.response.status_code == 304 - - @tutils.skip_appveyor - def test_response_timestamps(self): - # test that we notice at least 1 sec delay between timestamps - # in response object - f = self.pathod("304:b@1k:p50,1") - assert f.status_code == 304 - - response = self.master.state.view[0].response - # timestamp_start might fire a bit late, so we play safe and only require 300ms. - assert 0.3 <= response.timestamp_end - response.timestamp_start - - @tutils.skip_appveyor - def test_request_timestamps(self): - # test that we notice a delay between timestamps in request object - connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connection.connect(("127.0.0.1", self.proxy.port)) - - # call pathod server, wait a second to complete the request - connection.send( - "GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" % - self.server.port) - time.sleep(1) - connection.send("\r\n") - connection.recv(50000) - connection.close() - - request, response = self.master.state.view[ - 0].request, self.master.state.view[0].response - assert response.status_code == 304 # sanity test for our low level request - # timestamp_start might fire a bit late, so we play safe and only require 300ms. - assert 0.3 <= request.timestamp_end - request.timestamp_start - - def test_request_tcp_setup_timestamp_presence(self): - # tests that the client_conn a tcp connection has a tcp_setup_timestamp - connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connection.connect(("localhost", self.proxy.port)) - connection.send( - "GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" % - self.server.port) - connection.send("\r\n") - # a bit hacky: make sure that we don't just read the headers only. - recvd = 0 - while recvd < 1024: - recvd += len(connection.recv(5000)) - connection.send( - "GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" % - self.server.port) - connection.send("\r\n") - recvd = 0 - while recvd < 1024: - recvd += len(connection.recv(5000)) - connection.close() - - first_flow = self.master.state.view[0] - second_flow = self.master.state.view[1] - assert first_flow.server_conn.timestamp_tcp_setup - assert first_flow.server_conn.timestamp_ssl_setup is None - assert second_flow.server_conn.timestamp_tcp_setup - assert first_flow.server_conn.timestamp_tcp_setup == second_flow.server_conn.timestamp_tcp_setup - - def test_request_ip(self): - f = self.pathod("200:b@100") - assert f.status_code == 200 - f = self.master.state.view[0] - assert f.server_conn.address == ("127.0.0.1", self.server.port) - - -class TestProxySSL(tservers.HTTPProxTest): - ssl = True - - def test_request_ssl_setup_timestamp_presence(self): - # tests that the ssl timestamp is present when ssl is used - f = self.pathod("304:b@10k") - assert f.status_code == 304 - first_flow = self.master.state.view[0] - assert first_flow.server_conn.timestamp_ssl_setup - - -class MasterRedirectRequest(tservers.TestMaster): - redirect_port = None # Set by TestRedirectRequest - - def handle_request(self, f): - if f.request.path == "/p/201": - - # This part should have no impact, but it should also not cause any exceptions. - addr = f.live.server_conn.address - addr2 = Address(("127.0.0.1", self.redirect_port)) - f.live.set_server(addr2) - f.live.set_server(addr) - - # This is the actual redirection. - f.request.port = self.redirect_port - super(MasterRedirectRequest, self).handle_request(f) - - def handle_response(self, f): - f.response.content = str(f.client_conn.address.port) - f.response.headers["server-conn-id"] = str(f.server_conn.source_address.port) - super(MasterRedirectRequest, self).handle_response(f) - - -class TestRedirectRequest(tservers.HTTPProxTest): - masterclass = MasterRedirectRequest - ssl = True - - def test_redirect(self): - """ - Imagine a single HTTPS connection with three requests: - - 1. First request should pass through unmodified - 2. Second request will be redirected to a different host by an inline script - 3. Third request should pass through unmodified - - This test verifies that the original destination is restored for the third request. - """ - self.master.redirect_port = self.server2.port - - p = self.pathoc() - - self.server.clear_log() - self.server2.clear_log() - r1 = p.request("get:'/p/200'") - assert r1.status_code == 200 - assert self.server.last_log() - assert not self.server2.last_log() - - self.server.clear_log() - self.server2.clear_log() - r2 = p.request("get:'/p/201'") - assert r2.status_code == 201 - assert not self.server.last_log() - assert self.server2.last_log() - - self.server.clear_log() - self.server2.clear_log() - r3 = p.request("get:'/p/202'") - assert r3.status_code == 202 - assert self.server.last_log() - assert not self.server2.last_log() - - assert r1.content == r2.content == r3.content - - -class MasterStreamRequest(tservers.TestMaster): - - """ - Enables the stream flag on the flow for all requests - """ - - def handle_responseheaders(self, f): - f.response.stream = True - f.reply() - - -class TestStreamRequest(tservers.HTTPProxTest): - masterclass = MasterStreamRequest - - def test_stream_simple(self): - p = self.pathoc() - - # a request with 100k of data but without content-length - r1 = p.request("get:'%s/p/200:r:b@100k:d102400'" % self.server.urlbase) - assert r1.status_code == 200 - assert len(r1.content) > 100000 - - def test_stream_multiple(self): - p = self.pathoc() - - # simple request with streaming turned on - r1 = p.request("get:'%s/p/200'" % self.server.urlbase) - assert r1.status_code == 200 - - # now send back 100k of data, streamed but not chunked - r1 = p.request("get:'%s/p/201:b@100k'" % self.server.urlbase) - assert r1.status_code == 201 - - def test_stream_chunked(self): - connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connection.connect(("127.0.0.1", self.proxy.port)) - fconn = connection.makefile() - spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"' - connection.send( - "GET %s/p/%s HTTP/1.1\r\n" % - (self.server.urlbase, spec)) - connection.send("\r\n") - - resp = http1.read_response_head(fconn) - - assert resp.headers["Transfer-Encoding"] == 'chunked' - assert resp.status_code == 200 - - chunks = list(http1.read_body(fconn, None)) - assert chunks == ["this", "isatest__reachhex"] - - connection.close() - - -class MasterFakeResponse(tservers.TestMaster): - - def handle_request(self, f): - resp = HTTPResponse.wrap(netlib.tutils.tresp()) - f.reply(resp) - - -class TestFakeResponse(tservers.HTTPProxTest): - masterclass = MasterFakeResponse - - def test_fake(self): - f = self.pathod("200") - assert "header-response" in f.headers - - -class TestServerConnect(tservers.HTTPProxTest): - masterclass = MasterFakeResponse - no_upstream_cert = True - ssl = True - - def test_unnecessary_serverconnect(self): - """A replayed/fake response with no_upstream_cert should not connect to an upstream server""" - assert self.pathod("200").status_code == 200 - for msg in self.proxy.tmaster.log: - assert "serverconnect" not in msg - - -class MasterKillRequest(tservers.TestMaster): - - def handle_request(self, f): - f.reply(Kill) - - -class TestKillRequest(tservers.HTTPProxTest): - masterclass = MasterKillRequest - - def test_kill(self): - with raises(HttpReadDisconnect): - self.pathod("200") - # Nothing should have hit the server - assert not self.server.last_log() - - -class MasterKillResponse(tservers.TestMaster): - - def handle_response(self, f): - f.reply(Kill) - - -class TestKillResponse(tservers.HTTPProxTest): - masterclass = MasterKillResponse - - def test_kill(self): - with raises(HttpReadDisconnect): - self.pathod("200") - # The server should have seen a request - assert self.server.last_log() - - -class EResolver(tservers.TResolver): - - def original_addr(self, sock): - raise RuntimeError("Could not resolve original destination.") - - -class TestTransparentResolveError(tservers.TransparentProxTest): - resolver = EResolver - - def test_resolve_error(self): - assert self.pathod("304").status_code == 502 - - -class MasterIncomplete(tservers.TestMaster): - - def handle_request(self, f): - resp = HTTPResponse.wrap(netlib.tutils.tresp()) - resp.content = CONTENT_MISSING - f.reply(resp) - - -class TestIncompleteResponse(tservers.HTTPProxTest): - masterclass = MasterIncomplete - - def test_incomplete(self): - assert self.pathod("200").status_code == 502 - - -class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin): - ssl = False - - def test_order(self): - self.proxy.tmaster.replacehooks.add( - "~q", - "foo", - "bar") # replace in request - self.chain[0].tmaster.replacehooks.add("~q", "bar", "baz") - self.chain[1].tmaster.replacehooks.add("~q", "foo", "oh noes!") - self.chain[0].tmaster.replacehooks.add( - "~s", - "baz", - "ORLY") # replace in response - - p = self.pathoc() - req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase) - assert req.content == "ORLY" - assert req.status_code == 418 - - -class TestUpstreamProxySSL( - tservers.HTTPUpstreamProxTest, - CommonMixin, - TcpMixin): - ssl = True - - def _host_pattern_on(self, attr): - """ - Updates config.check_tcp or check_ignore, depending on attr. - """ - assert not hasattr(self, "_ignore_%s_backup" % attr) - backup = [] - for proxy in self.chain: - old_matcher = getattr( - proxy.tmaster.server.config, - "check_%s" % - attr) - backup.append(old_matcher) - setattr( - proxy.tmaster.server.config, - "check_%s" % attr, - HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns) - ) - - setattr(self, "_ignore_%s_backup" % attr, backup) - - def _host_pattern_off(self, attr): - backup = getattr(self, "_ignore_%s_backup" % attr) - for proxy in reversed(self.chain): - setattr( - proxy.tmaster.server.config, - "check_%s" % attr, - backup.pop() - ) - - assert not backup - delattr(self, "_ignore_%s_backup" % attr) - - def _ignore_on(self): - super(TestUpstreamProxySSL, self)._ignore_on() - self._host_pattern_on("ignore") - - def _ignore_off(self): - super(TestUpstreamProxySSL, self)._ignore_off() - self._host_pattern_off("ignore") - - def _tcpproxy_on(self): - super(TestUpstreamProxySSL, self)._tcpproxy_on() - self._host_pattern_on("tcp") - - def _tcpproxy_off(self): - super(TestUpstreamProxySSL, self)._tcpproxy_off() - self._host_pattern_off("tcp") - - def test_simple(self): - p = self.pathoc() - req = p.request("get:'/p/418:b\"content\"'") - assert req.content == "content" - assert req.status_code == 418 - - # CONNECT from pathoc to chain[0], - assert self.proxy.tmaster.state.flow_count() == 2 - # request from pathoc to chain[0] - # CONNECT from proxy to chain[1], - assert self.chain[0].tmaster.state.flow_count() == 2 - # request from proxy to chain[1] - # request from chain[0] (regular proxy doesn't store CONNECTs) - assert self.chain[1].tmaster.state.flow_count() == 1 - - -class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): - ssl = True - - def test_reconnect(self): - """ - Tests proper functionality of ConnectionHandler.server_reconnect mock. - If we have a disconnect on a secure connection that's transparently proxified to - an upstream http proxy, we need to send the CONNECT request again. - """ - - def kill_requests(master, attr, exclude): - k = [0] # variable scope workaround: put into array - _func = getattr(master, attr) - - def handler(f): - k[0] += 1 - if not (k[0] in exclude): - f.client_conn.finish() - f.error = Error("terminated") - f.reply(Kill) - return _func(f) - - setattr(master, attr, handler) - - kill_requests(self.chain[1].tmaster, "handle_request", - exclude=[ - # fail first request - 2, # allow second request - ]) - - kill_requests(self.chain[0].tmaster, "handle_request", - exclude=[ - 1, # CONNECT - # fail first request - 3, # reCONNECT - 4, # request - ]) - - p = self.pathoc() - req = p.request("get:'/p/418:b\"content\"'") - assert req.content == "content" - assert req.status_code == 418 - - assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request - # CONNECT, failing request, - assert self.chain[0].tmaster.state.flow_count() == 4 - # reCONNECT, request - # failing request, request - assert self.chain[1].tmaster.state.flow_count() == 2 - # (doesn't store (repeated) CONNECTs from chain[0] - # as it is a regular proxy) - - assert not self.chain[1].tmaster.state.flows[0].response # killed - assert self.chain[1].tmaster.state.flows[1].response - - assert self.proxy.tmaster.state.flows[0].request.form_in == "authority" - assert self.proxy.tmaster.state.flows[1].request.form_in == "relative" - - assert self.chain[0].tmaster.state.flows[ - 0].request.form_in == "authority" - assert self.chain[0].tmaster.state.flows[ - 1].request.form_in == "relative" - assert self.chain[0].tmaster.state.flows[ - 2].request.form_in == "authority" - assert self.chain[0].tmaster.state.flows[ - 3].request.form_in == "relative" - - assert self.chain[1].tmaster.state.flows[ - 0].request.form_in == "relative" - assert self.chain[1].tmaster.state.flows[ - 1].request.form_in == "relative" - - req = p.request("get:'/p/418:b\"content2\"'") - - assert req.status_code == 502 - assert self.proxy.tmaster.state.flow_count() == 3 # + new request - # + new request, repeated CONNECT from chain[1] - assert self.chain[0].tmaster.state.flow_count() == 6 - # (both terminated) - # nothing happened here - assert self.chain[1].tmaster.state.flow_count() == 2 -- cgit v1.2.3