diff options
Diffstat (limited to 'test/test_protocol_http.py')
-rw-r--r-- | test/test_protocol_http.py | 170 |
1 files changed, 45 insertions, 125 deletions
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py index 3b922c06..ea6cf3fd 100644 --- a/test/test_protocol_http.py +++ b/test/test_protocol_http.py @@ -1,5 +1,4 @@ from libmproxy.protocol.http import * -from libmproxy.protocol import KILL from cStringIO import StringIO import tutils, tservers @@ -26,42 +25,66 @@ def test_stripped_chunked_encoding_no_content(): class TestHTTPRequest: def test_asterisk_form(self): s = StringIO("OPTIONS * HTTP/1.1") - f = tutils.tflow_noreq() + f = tutils.tflow(req=None) f.request = HTTPRequest.from_stream(s) assert f.request.form_in == "relative" - x = f.request._assemble() - assert f.request._assemble() == "OPTIONS * HTTP/1.1\r\nHost: address:22\r\n\r\n" + f.request.host = f.server_conn.address.host + f.request.port = f.server_conn.address.port + f.request.scheme = "http" + assert f.request.assemble() == "OPTIONS * HTTP/1.1\r\nHost: address:22\r\n\r\n" def test_origin_form(self): s = StringIO("GET /foo\xff HTTP/1.1") tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) + s = StringIO("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") + r = HTTPRequest.from_stream(s) + assert r.headers["Upgrade"] == ["h2c"] + + raw = r._assemble_headers() + assert "Upgrade" not in raw + assert "Host" not in raw + + r.url = "http://example.com/foo" + + raw = r._assemble_headers() + assert "Host" in raw + assert not "Host" in r.headers + r.update_host_header() + assert "Host" in r.headers + def test_authority_form(self): s = StringIO("CONNECT oops-no-port.com HTTP/1.1") tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) s = StringIO("CONNECT address:22 HTTP/1.1") r = HTTPRequest.from_stream(s) - assert r._assemble() == "CONNECT address:22 HTTP/1.1\r\nHost: address:22\r\n\r\n" + r.scheme, r.host, r.port = "http", "address", 22 + assert r.assemble() == "CONNECT address:22 HTTP/1.1\r\nHost: address:22\r\n\r\n" + assert r.pretty_url(False) == "address:22" def test_absolute_form(self): s = StringIO("GET oops-no-protocol.com HTTP/1.1") tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) s = StringIO("GET http://address:22/ HTTP/1.1") r = HTTPRequest.from_stream(s) - assert r._assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\n\r\n" + assert r.assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\n\r\n" def test_assemble_unknown_form(self): r = tutils.treq() - tutils.raises("Invalid request form", r._assemble, "antiauthority") + tutils.raises("Invalid request form", r.assemble, "antiauthority") def test_set_url(self): r = tutils.treq_absolute() - r.set_url("https://otheraddress:42/ORLY") + r.url = "https://otheraddress:42/ORLY" assert r.scheme == "https" assert r.host == "otheraddress" assert r.port == 42 assert r.path == "/ORLY" + def test_repr(self): + r = tutils.treq() + assert repr(r) + class TestHTTPResponse: def test_read_from_stringio(self): @@ -83,6 +106,19 @@ class TestHTTPResponse: assert r.content == "" tutils.raises("Invalid server response: 'content", HTTPResponse.from_stream, s, "GET") + def test_repr(self): + r = tutils.tresp() + assert "unknown content type" in repr(r) + r.headers["content-type"] = ["foo"] + assert "foo" in repr(r) + assert repr(tutils.tresp(content=CONTENT_MISSING)) + + +class TestHTTPFlow(object): + def test_repr(self): + f = tutils.tflow(resp=True, err=True) + assert repr(f) + class TestInvalidRequests(tservers.HTTPProxTest): ssl = True @@ -97,120 +133,4 @@ class TestInvalidRequests(tservers.HTTPProxTest): p.connect() r = p.request("get:/p/200") assert r.status_code == 400 - assert "Invalid HTTP request form" in r.content - - -class TestProxyChaining(tservers.HTTPChainProxyTest): - def test_all(self): - self.chain[1].tmaster.replacehooks.add("~q", "foo", "bar") # replace in request - self.chain[0].tmaster.replacehooks.add("~q", "foo", "oh noes!") - self.proxy.tmaster.replacehooks.add("~q", "bar", "baz") - self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY") # replace in response - - p = self.pathoc() - req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase) - assert req.content == "ORLY" - assert req.status_code == 418 - -class TestProxyChainingSSL(tservers.HTTPChainProxyTest): - ssl = True - def test_simple(self): - p = self.pathoc() - req = p.request("get:'/p/418:b\"content\"'") - assert req.content == "content" - assert req.status_code == 418 - - assert self.chain[1].tmaster.state.flow_count() == 2 # CONNECT from pathoc to chain[0], - # request from pathoc to chain[0] - assert self.chain[0].tmaster.state.flow_count() == 2 # CONNECT from chain[1] to proxy, - # request from chain[1] to proxy - assert self.proxy.tmaster.state.flow_count() == 1 # request from chain[0] (regular proxy doesn't store CONNECTs) - - def test_closing_connect_response(self): - """ - https://github.com/mitmproxy/mitmproxy/issues/313 - """ - def handle_request(r): - r.httpversion = (1,0) - del r.headers["Content-Length"] - r.reply() - _handle_request = self.chain[0].tmaster.handle_request - self.chain[0].tmaster.handle_request = handle_request - try: - assert self.pathoc().request("get:/p/418").status_code == 418 - finally: - self.chain[0].tmaster.handle_request = _handle_request - - def test_sni(self): - p = self.pathoc(sni="foo.com") - req = p.request("get:'/p/418:b\"content\"'") - assert req.content == "content" - assert req.status_code == 418 - -class TestProxyChainingSSLReconnect(tservers.HTTPChainProxyTest): - ssl = True - - def test_reconnect(self): - """ - Tests proper functionality of ConnectionHandler.server_reconnect mock. - If we have a disconnect on a secure connection that's transparently proxified to - an upstream http proxy, we need to send the CONNECT request again. - """ - def kill_requests(master, attr, exclude): - k = [0] # variable scope workaround: put into array - _func = getattr(master, attr) - def handler(r): - k[0] += 1 - if not (k[0] in exclude): - r.flow.client_conn.finish() - r.flow.error = Error("terminated") - r.reply(KILL) - return _func(r) - setattr(master, attr, handler) - - kill_requests(self.proxy.tmaster, "handle_request", - exclude=[ - # fail first request - 2, # allow second request - ]) - - kill_requests(self.chain[0].tmaster, "handle_request", - exclude=[ - 1, # CONNECT - # fail first request - 3, # reCONNECT - 4, # request - ]) - - p = self.pathoc() - req = p.request("get:'/p/418:b\"content\"'") - assert self.chain[1].tmaster.state.flow_count() == 2 # CONNECT and request - assert self.chain[0].tmaster.state.flow_count() == 4 # CONNECT, failing request, - # reCONNECT, request - assert self.proxy.tmaster.state.flow_count() == 2 # failing request, request - # (doesn't store (repeated) CONNECTs from chain[0] - # as it is a regular proxy) - assert req.content == "content" - assert req.status_code == 418 - - assert not self.proxy.tmaster.state._flow_list[0].response # killed - assert self.proxy.tmaster.state._flow_list[1].response - - assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "authority" - assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative" - - assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority" - assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative" - assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority" - assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative" - - assert self.proxy.tmaster.state._flow_list[0].request.form_in == "relative" - assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative" - - req = p.request("get:'/p/418:b\"content2\"'") - - assert req.status_code == 502 - assert self.chain[1].tmaster.state.flow_count() == 3 # + new request - assert self.chain[0].tmaster.state.flow_count() == 6 # + new request, repeated CONNECT from chain[1] - # (both terminated) - assert self.proxy.tmaster.state.flow_count() == 2 # nothing happened here + assert "Invalid HTTP request form" in r.content
\ No newline at end of file |