diff options
author | Jim Shaver <dcypherd@gmail.com> | 2015-06-23 21:48:05 -0500 |
---|---|---|
committer | Jim Shaver <dcypherd@gmail.com> | 2015-06-23 21:48:05 -0500 |
commit | 080e4534253338c94e6d8c86cb3679ff15410f85 (patch) | |
tree | 6322fb822332b4135f0ff14de8c2d7137016f734 /test/test_protocol_http.py | |
parent | db5c0b210b0133d7cd58124c727dbc24480e2568 (diff) | |
parent | 074d8d7c7463cdb1f0a90e165a4b3ada3554b4c2 (diff) | |
download | mitmproxy-080e4534253338c94e6d8c86cb3679ff15410f85.tar.gz mitmproxy-080e4534253338c94e6d8c86cb3679ff15410f85.tar.bz2 mitmproxy-080e4534253338c94e6d8c86cb3679ff15410f85.zip |
Merge branch 'master' into hardfailvenv
Conflicts:
dev
Diffstat (limited to 'test/test_protocol_http.py')
-rw-r--r-- | test/test_protocol_http.py | 192 |
1 files changed, 183 insertions, 9 deletions
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py index 16870777..d8489d4d 100644 --- a/test/test_protocol_http.py +++ b/test/test_protocol_http.py @@ -1,6 +1,12 @@ -from libmproxy.protocol.http import * from cStringIO import StringIO -import tutils, tservers + +from mock import MagicMock + +from libmproxy.protocol.http import * +from netlib import odict + +import tutils +import tservers def test_HttpAuthenticationError(): @@ -54,6 +60,15 @@ class TestHTTPRequest: r.update_host_header() assert "Host" in r.headers + def test_expect_header(self): + s = StringIO( + "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + w = StringIO() + r = HTTPRequest.from_stream(s, wfile=w) + assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + assert r.content == "foo" + assert s.read(3) == "bar" + def test_authority_form_in(self): s = StringIO("CONNECT oops-no-port.com HTTP/1.1") tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) @@ -70,7 +85,8 @@ class TestHTTPRequest: tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) s = StringIO("GET http://address:22/ HTTP/1.1") r = HTTPRequest.from_stream(s) - assert r.assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" + assert r.assemble( + ) == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" def test_http_options_relative_form_in(self): """ @@ -91,10 +107,10 @@ class TestHTTPRequest: r.host = 'address' r.port = 80 r.scheme = "http" - assert r.assemble() == ("OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - "Host: address\r\n" - "Content-Length: 0\r\n\r\n") - + assert r.assemble() == ( + "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + "Host: address\r\n" + "Content-Length: 0\r\n\r\n") def test_assemble_unknown_form(self): r = tutils.treq() @@ -112,6 +128,91 @@ class TestHTTPRequest: r = tutils.treq() assert repr(r) + def test_pretty_host(self): + r = tutils.treq() + assert r.pretty_host(True) == "address" + assert r.pretty_host(False) == "address" + r.headers["host"] = ["other"] + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) == "address" + r.host = None + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) is None + del r.headers["host"] + assert r.pretty_host(True) is None + assert r.pretty_host(False) is None + + # Invalid IDNA + r.headers["host"] = [".disqus.com"] + assert r.pretty_host(True) == ".disqus.com" + + def test_get_form_for_urlencoded(self): + r = tutils.treq() + r.headers.add("content-type", "application/x-www-form-urlencoded") + r.get_form_urlencoded = MagicMock() + + r.get_form() + + assert r.get_form_urlencoded.called + + def test_get_form_for_multipart(self): + r = tutils.treq() + r.headers.add("content-type", "multipart/form-data") + r.get_form_multipart = MagicMock() + + r.get_form() + + assert r.get_form_multipart.called + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + r = tutils.treq() + r.headers = h + assert len(r.get_cookies()) == 0 + + def test_get_cookies_single(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=cookievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=coo=kievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + result["cookiename"] = ["foo"] + r.set_cookies(result) + assert r.get_cookies()["cookiename"] == ["foo"] + class TestHTTPResponse: def test_read_from_stringio(self): @@ -128,10 +229,14 @@ class TestHTTPResponse: assert HTTPResponse.from_stream(s, "GET").code == 204 s = StringIO(_s) - r = HTTPResponse.from_stream(s, "HEAD") # HEAD must not have content by spec. We should leave it on the pipe. + # HEAD must not have content by spec. We should leave it on the pipe. + r = HTTPResponse.from_stream(s, "HEAD") assert r.code == 200 assert r.content == "" - tutils.raises("Invalid server response: 'content", HTTPResponse.from_stream, s, "GET") + tutils.raises( + "Invalid server response: 'content", + HTTPResponse.from_stream, s, "GET" + ) def test_repr(self): r = tutils.tresp() @@ -140,6 +245,74 @@ class TestHTTPResponse: assert "foo" in repr(r) assert repr(tutils.tresp(content=CONTENT_MISSING)) + def test_get_cookies_none(self): + h = odict.ODictCaseless() + resp = tutils.tresp() + resp.headers = h + assert not resp.get_cookies() + + def test_get_cookies_simple(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + + def test_get_cookies_with_parameters(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/" + ] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", odict.ODict()] + + def test_set_cookies(self): + resp = tutils.tresp() + v = resp.get_cookies() + v.add("foo", ["bar", odict.ODictCaseless()]) + resp.set_cookies(v) + + v = resp.get_cookies() + assert len(v) == 1 + assert v["foo"] == [["bar", odict.ODictCaseless()]] + class TestHTTPFlow(object): def test_repr(self): @@ -149,6 +322,7 @@ class TestHTTPFlow(object): class TestInvalidRequests(tservers.HTTPProxTest): ssl = True + def test_double_connect(self): p = self.pathoc() r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port)) |