diff options
author | Jim Shaver <dcypherd@gmail.com> | 2015-05-31 01:21:44 -0400 |
---|---|---|
committer | Jim Shaver <dcypherd@gmail.com> | 2015-05-31 01:21:44 -0400 |
commit | b51363b3ca43f6572acb673186e6ae78a1f48434 (patch) | |
tree | a7488b32871c142141a813dc6ff2ede172672c31 /test/test_protocol_http.py | |
parent | 4fe2c069cca07aadf983f54e18dac4de492d5d69 (diff) | |
parent | 06fba18106a8f759ec6f08453e86772a170c653b (diff) | |
download | mitmproxy-b51363b3ca43f6572acb673186e6ae78a1f48434.tar.gz mitmproxy-b51363b3ca43f6572acb673186e6ae78a1f48434.tar.bz2 mitmproxy-b51363b3ca43f6572acb673186e6ae78a1f48434.zip |
Merge remote-tracking branch 'upstream/master' into print-bracket-fix
Conflicts:
examples/har_extractor.py
examples/nonblocking.py
examples/read_dumpfile
libmproxy/web/app.py
Diffstat (limited to 'test/test_protocol_http.py')
-rw-r--r-- | test/test_protocol_http.py | 188 |
1 files changed, 179 insertions, 9 deletions
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py index 16870777..884a528e 100644 --- a/test/test_protocol_http.py +++ b/test/test_protocol_http.py @@ -1,6 +1,12 @@ -from libmproxy.protocol.http import * from cStringIO import StringIO -import tutils, tservers + +from mock import MagicMock + +from libmproxy.protocol.http import * +from netlib import odict + +import tutils +import tservers def test_HttpAuthenticationError(): @@ -54,6 +60,15 @@ class TestHTTPRequest: r.update_host_header() assert "Host" in r.headers + def test_expect_header(self): + s = StringIO( + "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + w = StringIO() + r = HTTPRequest.from_stream(s, wfile=w) + assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + assert r.content == "foo" + assert s.read(3) == "bar" + def test_authority_form_in(self): s = StringIO("CONNECT oops-no-port.com HTTP/1.1") tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) @@ -70,7 +85,8 @@ class TestHTTPRequest: tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s) s = StringIO("GET http://address:22/ HTTP/1.1") r = HTTPRequest.from_stream(s) - assert r.assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" + assert r.assemble( + ) == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n" def test_http_options_relative_form_in(self): """ @@ -91,10 +107,10 @@ class TestHTTPRequest: r.host = 'address' r.port = 80 r.scheme = "http" - assert r.assemble() == ("OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - "Host: address\r\n" - "Content-Length: 0\r\n\r\n") - + assert r.assemble() == ( + "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + "Host: address\r\n" + "Content-Length: 0\r\n\r\n") def test_assemble_unknown_form(self): r = tutils.treq() @@ -112,6 +128,87 @@ class TestHTTPRequest: r = tutils.treq() assert repr(r) + def test_pretty_host(self): + r = tutils.treq() + assert r.pretty_host(True) == "address" + assert r.pretty_host(False) == "address" + r.headers["host"] = ["other"] + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) == "address" + r.host = None + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) is None + del r.headers["host"] + assert r.pretty_host(True) is None + assert r.pretty_host(False) is None + + def test_get_form_for_urlencoded(self): + r = tutils.treq() + r.headers.add("content-type", "application/x-www-form-urlencoded") + r.get_form_urlencoded = MagicMock() + + r.get_form() + + assert r.get_form_urlencoded.called + + def test_get_form_for_multipart(self): + r = tutils.treq() + r.headers.add("content-type", "multipart/form-data") + r.get_form_multipart = MagicMock() + + r.get_form() + + assert r.get_form_multipart.called + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + r = tutils.treq() + r.headers = h + assert len(r.get_cookies()) == 0 + + def test_get_cookies_single(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=cookievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=coo=kievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + result["cookiename"] = ["foo"] + r.set_cookies(result) + assert r.get_cookies()["cookiename"] == ["foo"] + class TestHTTPResponse: def test_read_from_stringio(self): @@ -128,10 +225,14 @@ class TestHTTPResponse: assert HTTPResponse.from_stream(s, "GET").code == 204 s = StringIO(_s) - r = HTTPResponse.from_stream(s, "HEAD") # HEAD must not have content by spec. We should leave it on the pipe. + # HEAD must not have content by spec. We should leave it on the pipe. + r = HTTPResponse.from_stream(s, "HEAD") assert r.code == 200 assert r.content == "" - tutils.raises("Invalid server response: 'content", HTTPResponse.from_stream, s, "GET") + tutils.raises( + "Invalid server response: 'content", + HTTPResponse.from_stream, s, "GET" + ) def test_repr(self): r = tutils.tresp() @@ -140,6 +241,74 @@ class TestHTTPResponse: assert "foo" in repr(r) assert repr(tutils.tresp(content=CONTENT_MISSING)) + def test_get_cookies_none(self): + h = odict.ODictCaseless() + resp = tutils.tresp() + resp.headers = h + assert not resp.get_cookies() + + def test_get_cookies_simple(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + + def test_get_cookies_with_parameters(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/" + ] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", odict.ODict()] + + def test_set_cookies(self): + resp = tutils.tresp() + v = resp.get_cookies() + v.add("foo", ["bar", odict.ODictCaseless()]) + resp.set_cookies(v) + + v = resp.get_cookies() + assert len(v) == 1 + assert v["foo"] == [["bar", odict.ODictCaseless()]] + class TestHTTPFlow(object): def test_repr(self): @@ -149,6 +318,7 @@ class TestHTTPFlow(object): class TestInvalidRequests(tservers.HTTPProxTest): ssl = True + def test_double_connect(self): p = self.pathoc() r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port)) |