diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/http1/test_assemble.py | 29 | ||||
-rw-r--r-- | test/http/http1/test_read.py | 34 | ||||
-rw-r--r-- | test/http/http2/test_protocol.py | 12 | ||||
-rw-r--r-- | test/http/test_cookies.py | 1 | ||||
-rw-r--r-- | test/http/test_headers.py | 3 | ||||
-rw-r--r-- | test/http/test_message.py | 153 | ||||
-rw-r--r-- | test/http/test_models.py | 395 | ||||
-rw-r--r-- | test/http/test_request.py | 238 | ||||
-rw-r--r-- | test/http/test_response.py | 100 | ||||
-rw-r--r-- | test/http/test_status_codes.py | 6 | ||||
-rw-r--r-- | test/test_utils.py | 8 | ||||
-rw-r--r-- | test/websockets/test_websockets.py | 2 |
12 files changed, 556 insertions, 425 deletions
diff --git a/test/http/http1/test_assemble.py b/test/http/http1/test_assemble.py index 963e7549..ed94292d 100644 --- a/test/http/http1/test_assemble.py +++ b/test/http/http1/test_assemble.py @@ -20,7 +20,7 @@ def test_assemble_request(): ) with raises(HttpException): - assemble_request(treq(body=CONTENT_MISSING)) + assemble_request(treq(content=CONTENT_MISSING)) def test_assemble_request_head(): @@ -40,7 +40,7 @@ def test_assemble_response(): ) with raises(HttpException): - assemble_response(tresp(body=CONTENT_MISSING)) + assemble_response(tresp(content=CONTENT_MISSING)) def test_assemble_response_head(): @@ -62,31 +62,40 @@ def test_assemble_body(): def test_assemble_request_line(): - assert _assemble_request_line(treq()) == b"GET /path HTTP/1.1" + assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1" - authority_request = treq(method=b"CONNECT", form_in="authority") + authority_request = treq(method=b"CONNECT", first_line_format="authority").data assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1" - absolute_request = treq(form_in="absolute") + absolute_request = treq(first_line_format="absolute").data assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1" with raises(RuntimeError): - _assemble_request_line(treq(), "invalid_form") + _assemble_request_line(treq(first_line_format="invalid_form").data) def test_assemble_request_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 - r = treq(body=b"") + r = treq(content=b"") r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_request_headers(r) + c = _assemble_request_headers(r.data) assert b"Transfer-Encoding" in c - assert b"host" in _assemble_request_headers(treq(headers=Headers())) + +def test_assemble_request_headers_host_header(): + r = treq() + r.headers = Headers() + c = _assemble_request_headers(r.data) + assert b"host" in c + + r.host = None + c = _assemble_request_headers(r.data) + assert b"host" not in c def test_assemble_response_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 - r = tresp(body=b"") + r = tresp(content=b"") r.headers["Transfer-Encoding"] = "chunked" c = _assemble_response_headers(r) assert b"Transfer-Encoding" in c diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py index 98d31bc2..45f61b4f 100644 --- a/test/http/http1/test_read.py +++ b/test/http/http1/test_read.py @@ -2,7 +2,7 @@ from __future__ import absolute_import, print_function, division from io import BytesIO import textwrap from mock import Mock -from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect +from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect from netlib.http import Headers from netlib.http.http1.read import ( read_request, read_response, read_request_head, @@ -16,8 +16,8 @@ from netlib.tutils import treq, tresp, raises def test_read_request(): rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip") r = read_request(rfile) - assert r.method == b"GET" - assert r.body == b"" + assert r.method == "GET" + assert r.content == b"" assert r.timestamp_end assert rfile.read() == b"skip" @@ -32,9 +32,9 @@ def test_read_request_head(): rfile.reset_timestamps = Mock() rfile.first_byte_timestamp = 42 r = read_request_head(rfile) - assert r.method == b"GET" + assert r.method == "GET" assert r.headers["Content-Length"] == "4" - assert r.body is None + assert r.content is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 assert rfile.read() == b"skip" @@ -45,7 +45,7 @@ def test_read_response(): rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody") r = read_response(rfile, req) assert r.status_code == 418 - assert r.body == b"body" + assert r.content == b"body" assert r.timestamp_end @@ -61,7 +61,7 @@ def test_read_response_head(): r = read_response_head(rfile) assert r.status_code == 418 assert r.headers["Content-Length"] == "4" - assert r.body is None + assert r.content is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 assert rfile.read() == b"skip" @@ -100,6 +100,11 @@ class TestReadBody(object): with raises(HttpException): b"".join(read_body(rfile, -1, 3)) + def test_max_chunk_size(self): + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"] + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"] def test_connection_close(): headers = Headers() @@ -112,6 +117,9 @@ def test_connection_close(): headers["connection"] = "close" assert connection_close(b"HTTP/1.1", headers) + headers["connection"] = "foobar" + assert connection_close(b"HTTP/1.0", headers) + assert not connection_close(b"HTTP/1.1", headers) def test_expected_http_body_size(): # Expect: 100-continue @@ -169,6 +177,11 @@ def test_get_first_line(): rfile = BytesIO(b"") _get_first_line(rfile) + with raises(HttpReadDisconnect): + rfile = Mock() + rfile.readline.side_effect = TcpDisconnect + _get_first_line(rfile) + def test_read_request_line(): def t(b): @@ -187,7 +200,8 @@ def test_read_request_line(): t(b"GET / WTF/1.1") with raises(HttpSyntaxException): t(b"this is not http") - + with raises(HttpReadDisconnect): + t(b"") def test_parse_authority_form(): assert _parse_authority_form(b"foo:42") == (b"foo", 42) @@ -218,6 +232,8 @@ def test_read_response_line(): t(b"HTTP/1.1 OK OK") with raises(HttpSyntaxException): t(b"WTF/1.1 200 OK") + with raises(HttpReadDisconnect): + t(b"") def test_check_http_version(): @@ -283,7 +299,7 @@ class TestReadHeaders(object): def test_read_chunked(): - req = treq(body=None) + req = treq(content=None) req.headers["Transfer-Encoding"] = "chunked" data = b"1\r\na\r\n0\r\n" diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py index a55941e0..6bda96f5 100644 --- a/test/http/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -65,7 +65,7 @@ class TestProtocol: class TestCheckALPNMatch(tservers.ServerTestBase): handler = EchoHandler ssl = dict( - alpn_select=ALPN_PROTO_H2, + alpn_select=b'h2', ) if OpenSSL._util.lib.Cryptography_HAS_ALPN: @@ -73,7 +73,7 @@ class TestCheckALPNMatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[b'h2']) protocol = HTTP2Protocol(c) assert protocol.check_alpn() @@ -89,7 +89,7 @@ class TestCheckALPNMismatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[b'h2']) protocol = HTTP2Protocol(c) tutils.raises(NotImplementedError, protocol.check_alpn) @@ -311,7 +311,7 @@ class TestReadRequest(tservers.ServerTestBase): assert req.stream_id assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']] - assert req.body == b'foobar' + assert req.content == b'foobar' class TestReadRequestRelative(tservers.ServerTestBase): @@ -417,7 +417,7 @@ class TestReadResponse(tservers.ServerTestBase): assert resp.status_code == 200 assert resp.msg == "" assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] - assert resp.body == b'foobar' + assert resp.content == b'foobar' assert resp.timestamp_end @@ -444,7 +444,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase): assert resp.status_code == 200 assert resp.msg == "" assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] - assert resp.body == b'' + assert resp.content == b'' class TestAssembleRequest(object): diff --git a/test/http/test_cookies.py b/test/http/test_cookies.py index 413b6241..34bb64f2 100644 --- a/test/http/test_cookies.py +++ b/test/http/test_cookies.py @@ -21,6 +21,7 @@ def test_read_quoted_string(): [(r'"f\\o" x', 0), (r"f\o", 6)], [(r'"f\\" x', 0), (r"f" + '\\', 5)], [('"fo\\\"" x', 0), ("fo\"", 6)], + [('"foo" x', 7), ("", 8)], ] for q, a in tokens: assert cookies._read_quoted_string(*q) == a diff --git a/test/http/test_headers.py b/test/http/test_headers.py index f1af1feb..8bddc0b2 100644 --- a/test/http/test_headers.py +++ b/test/http/test_headers.py @@ -38,6 +38,9 @@ class TestHeaders(object): assert headers["Host"] == "example.com" assert headers["Accept"] == "text/plain" + with raises(ValueError): + Headers([[b"Host", u"not-bytes"]]) + def test_getitem(self): headers = Headers(Host="example.com") assert headers["Host"] == "example.com" diff --git a/test/http/test_message.py b/test/http/test_message.py new file mode 100644 index 00000000..2c37dc3e --- /dev/null +++ b/test/http/test_message.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +from netlib.http import decoded, Headers +from netlib.tutils import tresp, raises + + +def _test_passthrough_attr(message, attr): + assert getattr(message, attr) == getattr(message.data, attr) + setattr(message, attr, "foo") + assert getattr(message.data, attr) == "foo" + + +def _test_decoded_attr(message, attr): + assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") + # Set str, get raw bytes + setattr(message, attr, "foo") + assert getattr(message.data, attr) == b"foo" + # Set raw bytes, get decoded + setattr(message.data, attr, b"bar") + assert getattr(message, attr) == "bar" + # Set bytes, get raw bytes + setattr(message, attr, b"baz") + assert getattr(message.data, attr) == b"baz" + + # Set UTF8 + setattr(message, attr, "Non-Autorisé") + assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9" + # Don't fail on garbage + setattr(message.data, attr, b"foo\xFF\x00bar") + assert getattr(message, attr).startswith("foo") + assert getattr(message, attr).endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = getattr(message, attr) + setattr(message, attr, d) + assert getattr(message.data, attr) == b"foo\xFF\x00bar" + + +class TestMessageData(object): + def test_eq_ne(self): + data = tresp(timestamp_start=42, timestamp_end=42).data + same = tresp(timestamp_start=42, timestamp_end=42).data + assert data == same + assert not data != same + + other = tresp(content=b"foo").data + assert not data == other + assert data != other + + assert data != 0 + + +class TestMessage(object): + + def test_init(self): + resp = tresp() + assert resp.data + + def test_eq_ne(self): + resp = tresp(timestamp_start=42, timestamp_end=42) + same = tresp(timestamp_start=42, timestamp_end=42) + assert resp == same + assert not resp != same + + other = tresp(timestamp_start=0, timestamp_end=0) + assert not resp == other + assert resp != other + + assert resp != 0 + + def test_content_length_update(self): + resp = tresp() + resp.content = b"foo" + assert resp.data.content == b"foo" + assert resp.headers["content-length"] == "3" + resp.content = b"" + assert resp.data.content == b"" + assert resp.headers["content-length"] == "0" + + def test_content_basic(self): + _test_passthrough_attr(tresp(), "content") + + def test_headers(self): + _test_passthrough_attr(tresp(), "headers") + + def test_timestamp_start(self): + _test_passthrough_attr(tresp(), "timestamp_start") + + def test_timestamp_end(self): + _test_passthrough_attr(tresp(), "timestamp_end") + + def teste_http_version(self): + _test_decoded_attr(tresp(), "http_version") + + +class TestDecodedDecorator(object): + + def test_simple(self): + r = tresp() + assert r.content == b"message" + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + assert r.headers["content-encoding"] + assert r.content != b"message" + with decoded(r): + assert "content-encoding" not in r.headers + assert r.content == b"message" + assert r.headers["content-encoding"] + assert r.content != b"message" + + def test_modify(self): + r = tresp() + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + with decoded(r): + r.content = b"foo" + + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_unknown_ce(self): + r = tresp() + r.headers["content-encoding"] = "zopfli" + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content == b"foo" + + def test_cannot_decode(self): + r = tresp() + assert r.encode("gzip") + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_cannot_encode(self): + r = tresp() + assert r.encode("gzip") + with decoded(r): + r.content = None + + assert "content-encoding" not in r.headers + assert r.content is None diff --git a/test/http/test_models.py b/test/http/test_models.py deleted file mode 100644 index 10e0795a..00000000 --- a/test/http/test_models.py +++ /dev/null @@ -1,395 +0,0 @@ -import mock - -from netlib import tutils -from netlib import utils -from netlib.odict import ODict, ODictCaseless -from netlib.http import Request, Response, Headers, CONTENT_MISSING, HDR_FORM_URLENCODED, \ - HDR_FORM_MULTIPART - - -class TestRequest(object): - def test_repr(self): - r = tutils.treq() - assert repr(r) - - def test_headers(self): - tutils.raises(AssertionError, Request, - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - 'foobar', - ) - - req = Request( - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - ) - assert isinstance(req.headers, Headers) - - def test_equal(self): - a = tutils.treq(timestamp_start=42, timestamp_end=43) - b = tutils.treq(timestamp_start=42, timestamp_end=43) - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - - def test_anticache(self): - req = tutils.treq() - req.headers["If-Modified-Since"] = "foo" - req.headers["If-None-Match"] = "bar" - req.anticache() - assert "If-Modified-Since" not in req.headers - assert "If-None-Match" not in req.headers - - def test_anticomp(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "foobar" - req.anticomp() - assert req.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "identity, gzip, foo" - req.constrain_encoding() - assert "foo" not in req.headers["Accept-Encoding"] - - def test_update_host(self): - req = tutils.treq() - req.headers["Host"] = "" - req.host = "foobar" - req.update_host_header() - assert req.headers["Host"] == "foobar" - - def test_get_form(self): - req = tutils.treq() - assert req.get_form() == ODict() - - @mock.patch("netlib.http.Request.get_form_multipart") - @mock.patch("netlib.http.Request.get_form_urlencoded") - def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - assert req.get_form() == ODict() - - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = HDR_FORM_URLENCODED - req.get_form() - assert req.get_form_urlencoded.called - assert not req.get_form_multipart.called - - @mock.patch("netlib.http.Request.get_form_multipart") - @mock.patch("netlib.http.Request.get_form_urlencoded") - def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = HDR_FORM_MULTIPART - req.get_form() - assert not req.get_form_urlencoded.called - assert req.get_form_multipart.called - - def test_get_form_urlencoded(self): - req = tutils.treq(body="foobar") - assert req.get_form_urlencoded() == ODict() - - req.headers["Content-Type"] = HDR_FORM_URLENCODED - assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body)) - - def test_get_form_multipart(self): - req = tutils.treq(body="foobar") - assert req.get_form_multipart() == ODict() - - req.headers["Content-Type"] = HDR_FORM_MULTIPART - assert req.get_form_multipart() == ODict( - utils.multipartdecode( - req.headers, - req.body - ) - ) - - def test_set_form_urlencoded(self): - req = tutils.treq() - req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')])) - assert req.headers["Content-Type"] == HDR_FORM_URLENCODED - assert req.body - - def test_get_path_components(self): - req = tutils.treq() - assert req.get_path_components() - # TODO: add meaningful assertions - - def test_set_path_components(self): - req = tutils.treq() - req.set_path_components([b"foo", b"bar"]) - # TODO: add meaningful assertions - - def test_get_query(self): - req = tutils.treq() - assert req.get_query().lst == [] - - req.url = "http://localhost:80/foo?bar=42" - assert req.get_query().lst == [(b"bar", b"42")] - - def test_set_query(self): - req = tutils.treq() - req.set_query(ODict([])) - - def test_pretty_host(self): - r = tutils.treq() - assert r.pretty_host(True) == "address" - assert r.pretty_host(False) == "address" - r.headers["host"] = "other" - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) == "address" - r.host = None - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) is None - del r.headers["host"] - assert r.pretty_host(True) is None - assert r.pretty_host(False) is None - - # Invalid IDNA - r.headers["host"] = ".disqus.com" - assert r.pretty_host(True) == ".disqus.com" - - def test_pretty_url(self): - req = tutils.treq() - req.form_out = "authority" - assert req.pretty_url(True) == b"address:22" - assert req.pretty_url(False) == b"address:22" - - req.form_out = "relative" - assert req.pretty_url(True) == b"http://address:22/path" - assert req.pretty_url(False) == b"http://address:22/path" - - def test_get_cookies_none(self): - headers = Headers() - r = tutils.treq() - r.headers = headers - assert len(r.get_cookies()) == 0 - - def test_get_cookies_single(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - assert len(result) == 1 - assert result['cookiename'] == ['cookievalue'] - - def test_get_cookies_double(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['cookievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_get_cookies_withequalsign(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['coo=kievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_set_cookies(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - result["cookiename"] = ["foo"] - r.set_cookies(result) - assert r.get_cookies()["cookiename"] == ["foo"] - - def test_set_url(self): - r = tutils.treq(form_in="absolute") - r.url = b"https://otheraddress:42/ORLY" - assert r.scheme == b"https" - assert r.host == b"otheraddress" - assert r.port == 42 - assert r.path == b"/ORLY" - - try: - r.url = "//localhost:80/foo@bar" - assert False - except: - assert True - - # def test_asterisk_form_in(self): - # f = tutils.tflow(req=None) - # protocol = mock_protocol("OPTIONS * HTTP/1.1") - # f.request = HTTPRequest.from_protocol(protocol) - # - # assert f.request.form_in == "relative" - # f.request.host = f.server_conn.address.host - # f.request.port = f.server_conn.address.port - # f.request.scheme = "http" - # assert protocol.assemble(f.request) == ( - # "OPTIONS * HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_relative_form_in(self): - # protocol = mock_protocol("GET /foo\xff HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") - # r = HTTPRequest.from_protocol(protocol) - # assert r.headers["Upgrade"] == ["h2c"] - # - # def test_expect_header(self): - # protocol = mock_protocol( - # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - # assert r.content == "foo" - # assert protocol.tcp_handler.rfile.read(3) == "bar" - # - # def test_authority_form_in(self): - # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.scheme, r.host, r.port = "http", "address", 22 - # assert protocol.assemble(r) == ( - # "CONNECT address:22 HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # assert r.pretty_url(False) == "address:22" - # - # def test_absolute_form_in(self): - # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.assemble(r) == ( - # "GET http://address:22/ HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_relative_form_in(self): - # """ - # Exercises fix for Issue #392. - # """ - # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS /secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_absolute_form_in(self): - # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - -class TestResponse(object): - def test_headers(self): - tutils.raises(AssertionError, Response, - b"HTTP/1.1", - 200, - headers='foobar', - ) - - resp = Response( - b"HTTP/1.1", - 200, - ) - assert isinstance(resp.headers, Headers) - - def test_equal(self): - a = tutils.tresp(timestamp_start=42, timestamp_end=43) - b = tutils.tresp(timestamp_start=42, timestamp_end=43) - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - def test_repr(self): - r = tutils.tresp() - assert "unknown content type" in repr(r) - r.headers["content-type"] = "foo" - assert "foo" in repr(r) - assert repr(tutils.tresp(body=CONTENT_MISSING)) - - def test_get_cookies_none(self): - resp = tutils.tresp() - resp.headers = Headers() - assert not resp.get_cookies() - - def test_get_cookies_simple(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - - def test_get_cookies_with_parameters(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "cookievalue" - attrs = result["cookiename"][0][1] - assert len(attrs) == 4 - assert attrs["domain"] == ["example.com"] - assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] - assert attrs["path"] == ["/"] - assert attrs["httponly"] == [None] - - def test_get_cookies_no_value(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "" - assert len(result["cookiename"][0][1]) == 2 - - def test_get_cookies_twocookies(self): - resp = tutils.tresp() - resp.headers = Headers([ - [b"Set-Cookie", b"cookiename=cookievalue"], - [b"Set-Cookie", b"othercookie=othervalue"] - ]) - result = resp.get_cookies() - assert len(result) == 2 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - assert "othercookie" in result - assert result["othercookie"][0] == ["othervalue", ODict()] - - def test_set_cookies(self): - resp = tutils.tresp() - v = resp.get_cookies() - v.add("foo", ["bar", ODictCaseless()]) - resp.set_cookies(v) - - v = resp.get_cookies() - assert len(v) == 1 - assert v["foo"] == [["bar", ODictCaseless()]] diff --git a/test/http/test_request.py b/test/http/test_request.py new file mode 100644 index 00000000..8cf69ffe --- /dev/null +++ b/test/http/test_request.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import six + +from netlib import utils +from netlib.http import Headers +from netlib.odict import ODict +from netlib.tutils import treq, raises +from .test_message import _test_decoded_attr, _test_passthrough_attr + + +class TestRequestData(object): + def test_init(self): + with raises(AssertionError): + treq(headers="foobar") + + assert isinstance(treq(headers=None).headers, Headers) + + +class TestRequestCore(object): + """ + Tests for builtins and the attributes that are directly proxied from the data structure + """ + def test_repr(self): + request = treq() + assert repr(request) == "Request(GET address:22/path)" + request.host = None + assert repr(request) == "Request(GET /path)" + + def test_first_line_format(self): + _test_passthrough_attr(treq(), "first_line_format") + + def test_method(self): + _test_decoded_attr(treq(), "method") + + def test_scheme(self): + _test_decoded_attr(treq(), "scheme") + + def test_port(self): + _test_passthrough_attr(treq(), "port") + + def test_path(self): + _test_decoded_attr(treq(), "path") + + def test_host(self): + if six.PY2: + from unittest import SkipTest + raise SkipTest() + + request = treq() + assert request.host == request.data.host.decode("idna") + + # Test IDNA encoding + # Set str, get raw bytes + request.host = "ídna.example" + assert request.data.host == b"xn--dna-qma.example" + # Set raw bytes, get decoded + request.data.host = b"xn--idn-gla.example" + assert request.host == "idná.example" + # Set bytes, get raw bytes + request.host = b"xn--dn-qia9b.example" + assert request.data.host == b"xn--dn-qia9b.example" + # IDNA encoding is not bijective + request.host = "fußball" + assert request.host == "fussball" + + # Don't fail on garbage + request.data.host = b"foo\xFF\x00bar" + assert request.host.startswith("foo") + assert request.host.endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = request.host + request.host = d + assert request.data.host == b"foo\xFF\x00bar" + + def test_host_header_update(self): + request = treq() + assert "host" not in request.headers + request.host = "example.com" + assert "host" not in request.headers + + request.headers["Host"] = "foo" + request.host = "example.org" + assert request.headers["Host"] == "example.org" + + +class TestRequestUtils(object): + """ + Tests for additional convenience methods. + """ + def test_url(self): + request = treq() + assert request.url == "http://address:22/path" + + request.url = "https://otheraddress:42/foo" + assert request.scheme == "https" + assert request.host == "otheraddress" + assert request.port == 42 + assert request.path == "/foo" + + with raises(ValueError): + request.url = "not-a-url" + + def test_pretty_host(self): + request = treq() + assert request.pretty_host == "address" + assert request.host == "address" + request.headers["host"] = "other" + assert request.pretty_host == "other" + assert request.host == "address" + request.host = None + assert request.pretty_host is None + assert request.host is None + + # Invalid IDNA + request.headers["host"] = ".disqus.com" + assert request.pretty_host == ".disqus.com" + + def test_pretty_url(self): + request = treq() + assert request.url == "http://address:22/path" + assert request.pretty_url == "http://address:22/path" + request.headers["host"] = "other" + assert request.pretty_url == "http://other:22/path" + + def test_pretty_url_authority(self): + request = treq(first_line_format="authority") + assert request.pretty_url == "address:22" + + def test_get_query(self): + request = treq() + assert request.query is None + + request.url = "http://localhost:80/foo?bar=42" + assert request.query.lst == [("bar", "42")] + + def test_set_query(self): + request = treq() + request.query = ODict([]) + + def test_get_cookies_none(self): + request = treq() + request.headers = Headers() + assert len(request.cookies) == 0 + + def test_get_cookies_single(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + request = treq() + request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + result["cookiename"] = ["foo"] + request.cookies = result + assert request.cookies["cookiename"] == ["foo"] + + def test_get_path_components(self): + request = treq(path=b"/foo/bar") + assert request.path_components == ["foo", "bar"] + + def test_set_path_components(self): + request = treq() + request.path_components = ["foo", "baz"] + assert request.path == "/foo/baz" + request.path_components = [] + assert request.path == "/" + + def test_anticache(self): + request = treq() + request.headers["If-Modified-Since"] = "foo" + request.headers["If-None-Match"] = "bar" + request.anticache() + assert "If-Modified-Since" not in request.headers + assert "If-None-Match" not in request.headers + + def test_anticomp(self): + request = treq() + request.headers["Accept-Encoding"] = "foobar" + request.anticomp() + assert request.headers["Accept-Encoding"] == "identity" + + def test_constrain_encoding(self): + request = treq() + + h = request.headers.copy() + request.constrain_encoding() # no-op if there is no accept_encoding header. + assert request.headers == h + + request.headers["Accept-Encoding"] = "identity, gzip, foo" + request.constrain_encoding() + assert "foo" not in request.headers["Accept-Encoding"] + assert "gzip" in request.headers["Accept-Encoding"] + + def test_get_urlencoded_form(self): + request = treq(content="foobar") + assert request.urlencoded_form is None + + request.headers["Content-Type"] = "application/x-www-form-urlencoded" + assert request.urlencoded_form == ODict(utils.urldecode(request.content)) + + def test_set_urlencoded_form(self): + request = treq() + request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')]) + assert request.headers["Content-Type"] == "application/x-www-form-urlencoded" + assert request.content + + def test_get_multipart_form(self): + request = treq(content="foobar") + assert request.multipart_form is None + + request.headers["Content-Type"] = "multipart/form-data" + assert request.multipart_form == ODict( + utils.multipartdecode( + request.headers, + request.content + ) + ) diff --git a/test/http/test_response.py b/test/http/test_response.py new file mode 100644 index 00000000..a1f4abd7 --- /dev/null +++ b/test/http/test_response.py @@ -0,0 +1,100 @@ +from __future__ import absolute_import, print_function, division + +from netlib.http import Headers +from netlib.odict import ODict, ODictCaseless +from netlib.tutils import raises, tresp +from .test_message import _test_passthrough_attr, _test_decoded_attr + + +class TestResponseData(object): + def test_init(self): + with raises(AssertionError): + tresp(headers="foobar") + + assert isinstance(tresp(headers=None).headers, Headers) + + +class TestResponseCore(object): + """ + Tests for builtins and the attributes that are directly proxied from the data structure + """ + def test_repr(self): + response = tresp() + assert repr(response) == "Response(200 OK, unknown content type, 7B)" + response.content = None + assert repr(response) == "Response(200 OK, no content)" + + def test_status_code(self): + _test_passthrough_attr(tresp(), "status_code") + + def test_reason(self): + _test_decoded_attr(tresp(), "reason") + + +class TestResponseUtils(object): + """ + Tests for additional convenience methods. + """ + def test_get_cookies_none(self): + resp = tresp() + resp.headers = Headers() + assert not resp.cookies + + def test_get_cookies_empty(self): + resp = tresp() + resp.headers = Headers(set_cookie="") + assert not resp.cookies + + def test_get_cookies_simple(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + + def test_get_cookies_with_parameters(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + resp = tresp() + resp.headers = Headers([ + [b"Set-Cookie", b"cookiename=cookievalue"], + [b"Set-Cookie", b"othercookie=othervalue"] + ]) + result = resp.cookies + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", ODict()] + + def test_set_cookies(self): + resp = tresp() + v = resp.cookies + v.add("foo", ["bar", ODictCaseless()]) + resp.set_cookies(v) + + v = resp.cookies + assert len(v) == 1 + assert v["foo"] == [["bar", ODictCaseless()]] diff --git a/test/http/test_status_codes.py b/test/http/test_status_codes.py new file mode 100644 index 00000000..9fea6b70 --- /dev/null +++ b/test/http/test_status_codes.py @@ -0,0 +1,6 @@ +from netlib.http import status_codes + + +def test_simple(): + assert status_codes.IM_A_TEAPOT == 418 + assert status_codes.RESPONSES[418] == "I'm a teapot" diff --git a/test/test_utils.py b/test/test_utils.py index 17636cc4..b096e5bc 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -84,10 +84,10 @@ def test_parse_url(): def test_unparse_url(): - assert utils.unparse_url(b"http", b"foo.com", 99, b"") == b"http://foo.com:99" - assert utils.unparse_url(b"http", b"foo.com", 80, b"/bar") == b"http://foo.com/bar" - assert utils.unparse_url(b"https", b"foo.com", 80, b"") == b"https://foo.com:80" - assert utils.unparse_url(b"https", b"foo.com", 443, b"") == b"https://foo.com" + assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99" + assert utils.unparse_url("http", "foo.com", 80, "/bar") == "http://foo.com/bar" + assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80" + assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com" def test_urlencode(): diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index 4ae4cf45..9a1e5d3d 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -68,7 +68,7 @@ class WebSocketsClient(tcp.TCPClient): self.wfile.write(bytes(headers) + b"\r\n") self.wfile.flush() - resp = read_response(self.rfile, treq(method="GET")) + resp = read_response(self.rfile, treq(method=b"GET")) server_nonce = self.protocol.check_server_handshake(resp.headers) if not server_nonce == self.protocol.create_server_nonce(self.client_nonce): |