aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_protocol_http.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_protocol_http.py')
-rw-r--r--test/test_protocol_http.py201
1 files changed, 201 insertions, 0 deletions
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
new file mode 100644
index 00000000..3bf5af22
--- /dev/null
+++ b/test/test_protocol_http.py
@@ -0,0 +1,201 @@
+from libmproxy import proxy # FIXME: Remove
+from libmproxy.protocol.http import *
+from libmproxy.protocol import KILL
+from cStringIO import StringIO
+import tutils, tservers
+
+
+def test_HttpAuthenticationError():
+ x = HttpAuthenticationError({"foo": "bar"})
+ assert str(x)
+ assert "foo" in x.auth_headers
+
+
+def test_stripped_chunked_encoding_no_content():
+ """
+ https://github.com/mitmproxy/mitmproxy/issues/186
+ """
+ r = tutils.tresp(content="")
+ r.headers["Transfer-Encoding"] = ["chunked"]
+ assert "Content-Length" in r._assemble_headers()
+
+ r = tutils.treq(content="")
+ r.headers["Transfer-Encoding"] = ["chunked"]
+ assert "Content-Length" in r._assemble_headers()
+
+
+class TestHTTPRequest:
+ def test_asterisk_form(self):
+ s = StringIO("OPTIONS * HTTP/1.1")
+ f = tutils.tflow_noreq()
+ f.request = HTTPRequest.from_stream(s)
+ assert f.request.form_in == "asterisk"
+ x = f.request._assemble()
+ assert f.request._assemble() == "OPTIONS * HTTP/1.1\r\nHost: address:22\r\n\r\n"
+
+ def test_origin_form(self):
+ s = StringIO("GET /foo\xff HTTP/1.1")
+ tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
+
+ def test_authority_form(self):
+ s = StringIO("CONNECT oops-no-port.com HTTP/1.1")
+ tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
+ s = StringIO("CONNECT address:22 HTTP/1.1")
+ r = HTTPRequest.from_stream(s)
+ assert r._assemble() == "CONNECT address:22 HTTP/1.1\r\nHost: address:22\r\n\r\n"
+
+
+ def test_absolute_form(self):
+ s = StringIO("GET oops-no-protocol.com HTTP/1.1")
+ tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
+ s = StringIO("GET http://address:22/ HTTP/1.1")
+ r = HTTPRequest.from_stream(s)
+ assert r._assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\n\r\n"
+
+ def test_assemble_unknown_form(self):
+ r = tutils.treq()
+ tutils.raises("Invalid request form", r._assemble, "antiauthority")
+
+
+ def test_set_url(self):
+ r = tutils.treq_absolute()
+ r.set_url("https://otheraddress:42/ORLY")
+ assert r.scheme == "https"
+ assert r.host == "otheraddress"
+ assert r.port == 42
+ assert r.path == "/ORLY"
+
+
+class TestHTTPResponse:
+ def test_read_from_stringio(self):
+ _s = "HTTP/1.1 200 OK\r\n" \
+ "Content-Length: 7\r\n" \
+ "\r\n"\
+ "content\r\n" \
+ "HTTP/1.1 204 OK\r\n" \
+ "\r\n"
+ s = StringIO(_s)
+ r = HTTPResponse.from_stream(s, "GET")
+ assert r.code == 200
+ assert r.content == "content"
+ assert HTTPResponse.from_stream(s, "GET").code == 204
+
+ s = StringIO(_s)
+ r = HTTPResponse.from_stream(s, "HEAD") # HEAD must not have content by spec. We should leave it on the pipe.
+ assert r.code == 200
+ assert r.content == ""
+ tutils.raises("Invalid server response: 'content", HTTPResponse.from_stream, s, "GET")
+
+
+class TestInvalidRequests(tservers.HTTPProxTest):
+ ssl = True
+
+ def test_double_connect(self):
+ p = self.pathoc()
+ r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
+ assert r.status_code == 502
+ assert "Must not CONNECT on already encrypted connection" in r.content
+
+ def test_origin_request(self):
+ p = self.pathoc_raw()
+ p.connect()
+ r = p.request("get:/p/200")
+ assert r.status_code == 400
+ assert "Invalid request form" in r.content
+
+
+class TestProxyChaining(tservers.HTTPChainProxyTest):
+ def test_all(self):
+ self.chain[1].tmaster.replacehooks.add("~q", "foo", "bar") # replace in request
+ self.chain[0].tmaster.replacehooks.add("~q", "foo", "oh noes!")
+ self.proxy.tmaster.replacehooks.add("~q", "bar", "baz")
+ self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY") # replace in response
+
+ p = self.pathoc()
+ req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
+ assert req.content == "ORLY"
+ assert req.status_code == 418
+
+class TestProxyChainingSSL(tservers.HTTPChainProxyTest):
+ ssl = True
+
+ def test_simple(self):
+
+ p = self.pathoc()
+ req = p.request("get:'/p/418:b\"content\"'")
+ assert req.content == "content"
+ assert req.status_code == 418
+
+ assert self.chain[1].tmaster.state.flow_count() == 2 # CONNECT from pathoc to chain[0],
+ # request from pathoc to chain[0]
+ assert self.chain[0].tmaster.state.flow_count() == 2 # CONNECT from chain[1] to proxy,
+ # request from chain[1] to proxy
+ assert self.proxy.tmaster.state.flow_count() == 1 # request from chain[0] (regular proxy doesn't store CONNECTs)
+
+class TestProxyChainingSSLReconnect(tservers.HTTPChainProxyTest):
+ ssl = True
+
+ def test_reconnect(self):
+ """
+ Tests proper functionality of ConnectionHandler.server_reconnect mock.
+ If we have a disconnect on a secure connection that's transparently proxified to
+ an upstream http proxy, we need to send the CONNECT request again.
+ """
+ def kill_requests(master, attr, exclude):
+ k = [0] # variable scope workaround: put into array
+ _func = getattr(master, attr)
+ def handler(r):
+ k[0] += 1
+ if not (k[0] in exclude):
+ r.flow.client_conn.finish()
+ r.flow.error = Error("terminated")
+ r.reply(KILL)
+ return _func(r)
+ setattr(master, attr, handler)
+
+ kill_requests(self.proxy.tmaster, "handle_request",
+ exclude=[
+ # fail first request
+ 2, # allow second request
+ ])
+
+ kill_requests(self.chain[0].tmaster, "handle_request",
+ exclude=[
+ 1, # CONNECT
+ # fail first request
+ 3, # reCONNECT
+ 4, # request
+ ])
+
+ p = self.pathoc()
+ req = p.request("get:'/p/418:b\"content\"'")
+ assert self.chain[1].tmaster.state.flow_count() == 2 # CONNECT and request
+ assert self.chain[0].tmaster.state.flow_count() == 4 # CONNECT, failing request,
+ # reCONNECT, request
+ assert self.proxy.tmaster.state.flow_count() == 2 # failing request, request
+ # (doesn't store (repeated) CONNECTs from chain[0]
+ # as it is a regular proxy)
+ assert req.content == "content"
+ assert req.status_code == 418
+
+ assert not self.proxy.tmaster.state._flow_list[0].response # killed
+ assert self.proxy.tmaster.state._flow_list[1].response
+
+ assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "authority"
+ assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "origin"
+
+ assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority"
+ assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "origin"
+ assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority"
+ assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "origin"
+
+ assert self.proxy.tmaster.state._flow_list[0].request.form_in == "origin"
+ assert self.proxy.tmaster.state._flow_list[1].request.form_in == "origin"
+
+ req = p.request("get:'/p/418:b\"content2\"'")
+
+ assert req.status_code == 502
+ assert self.chain[1].tmaster.state.flow_count() == 3 # + new request
+ assert self.chain[0].tmaster.state.flow_count() == 6 # + new request, repeated CONNECT from chain[1]
+ # (both terminated)
+ assert self.proxy.tmaster.state.flow_count() == 2 # nothing happened here