diff options
author | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2015-08-05 21:32:53 +0200 |
---|---|---|
committer | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2015-08-10 20:34:27 +0200 |
commit | 690b8b4f4e00d60b373b5a1481930f21bbc5054a (patch) | |
tree | dd0091035e4027e792174cb2dcf63eb37afa0a56 /test/http/http1/test_protocol.py | |
parent | c2832ef72bd4eed485a1c8d4bcb732da69896444 (diff) | |
download | mitmproxy-690b8b4f4e00d60b373b5a1481930f21bbc5054a.tar.gz mitmproxy-690b8b4f4e00d60b373b5a1481930f21bbc5054a.tar.bz2 mitmproxy-690b8b4f4e00d60b373b5a1481930f21bbc5054a.zip |
add move tests and code from mitmproxy
Diffstat (limited to 'test/http/http1/test_protocol.py')
-rw-r--r-- | test/http/http1/test_protocol.py | 265 |
1 files changed, 169 insertions, 96 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index e3c3ff43..ff70b87d 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -3,16 +3,40 @@ import textwrap import binascii from netlib import http, odict, tcp, tutils +from netlib.http import semantics from netlib.http.http1 import HTTP1Protocol from ... import tservers -def mock_protocol(data='', chunked=False): +class NoContentLengthHTTPHandler(tcp.BaseHandler): + def handle(self): + self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n") + self.wfile.flush() + + +def mock_protocol(data=''): rfile = cStringIO.StringIO(data) wfile = cStringIO.StringIO() return HTTP1Protocol(rfile=rfile, wfile=wfile) +def match_http_string(data): + return textwrap.dedent(data).strip().replace('\n', '\r\n') + + +def test_stripped_chunked_encoding_no_content(): + """ + https://github.com/mitmproxy/mitmproxy/issues/186 + """ + + r = tutils.treq(content="") + r.headers["Transfer-Encoding"] = ["chunked"] + assert "Content-Length" in mock_protocol()._assemble_request_headers(r) + + r = tutils.tresp(content="") + r.headers["Transfer-Encoding"] = ["chunked"] + assert "Content-Length" in mock_protocol()._assemble_response_headers(r) + def test_has_chunked_encoding(): h = odict.ODictCaseless() @@ -75,7 +99,6 @@ def test_connection_close(): assert HTTP1Protocol.connection_close((1, 1), h) - def test_read_http_body_request(): h = odict.ODictCaseless() data = "testing" @@ -85,7 +108,7 @@ def test_read_http_body_request(): def test_read_http_body_response(): h = odict.ODictCaseless() data = "testing" - assert mock_protocol(data, chunked=True).read_http_body(h, None, "GET", 200, False) == "testing" + assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing" def test_read_http_body(): @@ -129,13 +152,13 @@ def test_read_http_body(): # test no content length: limit > actual content h = odict.ODictCaseless() data = "testing" - assert len(mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False)) == 7 + assert len(mock_protocol(data).read_http_body(h, 100, "GET", 200, False)) == 7 # test no content length: limit < actual content data = "testing" tutils.raises( http.HttpError, - mock_protocol(data, chunked=True).read_http_body, + mock_protocol(data).read_http_body, h, 4, "GET", 200, False ) @@ -143,7 +166,7 @@ def test_read_http_body(): h = odict.ODictCaseless() h["transfer-encoding"] = ["chunked"] data = "5\r\naaaaa\r\n0\r\n\r\n" - assert mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False) == "aaaaa" + assert mock_protocol(data).read_http_body(h, 100, "GET", 200, False) == "aaaaa" def test_expected_http_body_size(): @@ -167,6 +190,13 @@ def test_expected_http_body_size(): assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0 +def test_get_request_line(): + data = "\nfoo" + p = mock_protocol(data) + assert p._get_request_line() == "foo" + assert not p._get_request_line() + + def test_parse_http_protocol(): assert HTTP1Protocol._parse_http_protocol("HTTP/1.1") == (1, 1) assert HTTP1Protocol._parse_http_protocol("HTTP/0.0") == (0, 0) @@ -269,96 +299,7 @@ class TestReadHeaders: assert self._read(data) is None -class NoContentLengthHTTPHandler(tcp.BaseHandler): - - def handle(self): - self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n") - self.wfile.flush() - - -class TestReadResponseNoContentLength(tservers.ServerTestBase): - handler = NoContentLengthHTTPHandler - - def test_no_content_length(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - resp = HTTP1Protocol(c).read_response("GET", None) - assert resp.body == "bar\r\n\r\n" - - -def test_read_response(): - def tst(data, method, body_size_limit, include_body=True): - data = textwrap.dedent(data) - return mock_protocol(data).read_response( - method, body_size_limit, include_body=include_body - ) - - tutils.raises("server disconnect", tst, "", "GET", None) - tutils.raises("invalid server response", tst, "foo", "GET", None) - data = """ - HTTP/1.1 200 OK - """ - assert tst(data, "GET", None) == http.Response( - (1, 1), 200, 'OK', odict.ODictCaseless(), '' - ) - data = """ - HTTP/1.1 200 - """ - assert tst(data, "GET", None) == http.Response( - (1, 1), 200, '', odict.ODictCaseless(), '' - ) - data = """ - HTTP/x 200 OK - """ - tutils.raises("invalid http version", tst, data, "GET", None) - data = """ - HTTP/1.1 xx OK - """ - tutils.raises("invalid server response", tst, data, "GET", None) - - data = """ - HTTP/1.1 100 CONTINUE - - HTTP/1.1 200 OK - """ - assert tst(data, "GET", None) == http.Response( - (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '' - ) - - data = """ - HTTP/1.1 200 OK - Content-Length: 3 - - foo - """ - assert tst(data, "GET", None).body == 'foo' - assert tst(data, "HEAD", None).body == '' - - data = """ - HTTP/1.1 200 OK - \tContent-Length: 3 - - foo - """ - tutils.raises("invalid headers", tst, data, "GET", None) - - data = """ - HTTP/1.1 200 OK - Content-Length: 3 - - foo - """ - assert tst(data, "GET", None, include_body=False).body is None - - -def test_get_request_line(): - data = "\nfoo" - p = mock_protocol(data) - assert p._get_request_line() == "foo" - assert not p._get_request_line() - - -class TestReadRequest(): +class TestReadRequest(object): def tst(self, data, **kwargs): return mock_protocol(data).read_request(**kwargs) @@ -385,6 +326,10 @@ class TestReadRequest(): "\r\n" ) + def test_empty(self): + v = self.tst("", allow_empty=True) + assert isinstance(v, semantics.EmptyRequest) + def test_asterisk_form_in(self): v = self.tst("OPTIONS * HTTP/1.1") assert v.form_in == "relative" @@ -427,3 +372,131 @@ class TestReadRequest(): assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" assert v.body == "foo" assert p.tcp_handler.rfile.read(3) == "bar" + + +class TestReadResponse(object): + def tst(self, data, method, body_size_limit, include_body=True): + data = textwrap.dedent(data) + return mock_protocol(data).read_response( + method, body_size_limit, include_body=include_body + ) + + def test_errors(self): + tutils.raises("server disconnect", self.tst, "", "GET", None) + tutils.raises("invalid server response", self.tst, "foo", "GET", None) + + def test_simple(self): + data = """ + HTTP/1.1 200 + """ + assert self.tst(data, "GET", None) == http.Response( + (1, 1), 200, '', odict.ODictCaseless(), '' + ) + + def test_simple_message(self): + data = """ + HTTP/1.1 200 OK + """ + assert self.tst(data, "GET", None) == http.Response( + (1, 1), 200, 'OK', odict.ODictCaseless(), '' + ) + + def test_invalid_http_version(self): + data = """ + HTTP/x 200 OK + """ + tutils.raises("invalid http version", self.tst, data, "GET", None) + + def test_invalid_status_code(self): + data = """ + HTTP/1.1 xx OK + """ + tutils.raises("invalid server response", self.tst, data, "GET", None) + + def test_valid_with_continue(self): + data = """ + HTTP/1.1 100 CONTINUE + + HTTP/1.1 200 OK + """ + assert self.tst(data, "GET", None) == http.Response( + (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '' + ) + + def test_simple_body(self): + data = """ + HTTP/1.1 200 OK + Content-Length: 3 + + foo + """ + assert self.tst(data, "GET", None).body == 'foo' + assert self.tst(data, "HEAD", None).body == '' + + def test_invalid_headers(self): + data = """ + HTTP/1.1 200 OK + \tContent-Length: 3 + + foo + """ + tutils.raises("invalid headers", self.tst, data, "GET", None) + + def test_without_body(self): + data = """ + HTTP/1.1 200 OK + Content-Length: 3 + + foo + """ + assert self.tst(data, "GET", None, include_body=False).body is None + + +class TestReadResponseNoContentLength(tservers.ServerTestBase): + handler = NoContentLengthHTTPHandler + + def test_no_content_length(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + resp = HTTP1Protocol(c).read_response("GET", None) + assert resp.body == "bar\r\n\r\n" + + +class TestAssembleRequest(object): + def test_simple(self): + req = tutils.treq() + b = HTTP1Protocol().assemble_request(req) + assert b == match_http_string(""" + GET /path HTTP/1.1 + header: qvalue + Host: address:22 + Content-Length: 7 + + content""") + + def test_body_missing(self): + req = tutils.treq(content=semantics.CONTENT_MISSING) + tutils.raises(http.HttpError, HTTP1Protocol().assemble_request, req) + + def test_not_a_request(self): + tutils.raises(AssertionError, HTTP1Protocol().assemble_request, 'foo') + + +class TestAssembleResponse(object): + def test_simple(self): + resp = tutils.tresp() + b = HTTP1Protocol().assemble_response(resp) + print(b) + assert b == match_http_string(""" + HTTP/1.1 200 OK + header_response: svalue + Content-Length: 7 + + message""") + + def test_body_missing(self): + resp = tutils.tresp(content=semantics.CONTENT_MISSING) + tutils.raises(http.HttpError, HTTP1Protocol().assemble_response, resp) + + def test_not_a_request(self): + tutils.raises(AssertionError, HTTP1Protocol().assemble_response, 'foo') |