diff options
Diffstat (limited to 'netlib/test/http')
-rw-r--r-- | netlib/test/http/__init__.py | 0 | ||||
-rw-r--r-- | netlib/test/http/http1/__init__.py | 0 | ||||
-rw-r--r-- | netlib/test/http/http1/test_assemble.py | 102 | ||||
-rw-r--r-- | netlib/test/http/http1/test_read.py | 333 | ||||
-rw-r--r-- | netlib/test/http/http2/__init__.py | 0 | ||||
-rw-r--r-- | netlib/test/http/http2/test_connections.py | 540 | ||||
-rw-r--r-- | netlib/test/http/test_authentication.py | 122 | ||||
-rw-r--r-- | netlib/test/http/test_cookies.py | 218 | ||||
-rw-r--r-- | netlib/test/http/test_headers.py | 152 | ||||
-rw-r--r-- | netlib/test/http/test_message.py | 153 | ||||
-rw-r--r-- | netlib/test/http/test_request.py | 238 | ||||
-rw-r--r-- | netlib/test/http/test_response.py | 102 | ||||
-rw-r--r-- | netlib/test/http/test_status_codes.py | 6 | ||||
-rw-r--r-- | netlib/test/http/test_user_agents.py | 6 |
14 files changed, 0 insertions, 1972 deletions
diff --git a/netlib/test/http/__init__.py b/netlib/test/http/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/netlib/test/http/__init__.py +++ /dev/null diff --git a/netlib/test/http/http1/__init__.py b/netlib/test/http/http1/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/netlib/test/http/http1/__init__.py +++ /dev/null diff --git a/netlib/test/http/http1/test_assemble.py b/netlib/test/http/http1/test_assemble.py deleted file mode 100644 index 31a62438..00000000 --- a/netlib/test/http/http1/test_assemble.py +++ /dev/null @@ -1,102 +0,0 @@ -from __future__ import absolute_import, print_function, division -from netlib.exceptions import HttpException -from netlib.http import CONTENT_MISSING, Headers -from netlib.http.http1.assemble import ( - assemble_request, assemble_request_head, assemble_response, - assemble_response_head, _assemble_request_line, _assemble_request_headers, - _assemble_response_headers, - assemble_body) -from netlib.tutils import treq, raises, tresp - - -def test_assemble_request(): - c = assemble_request(treq()) == ( - b"GET /path HTTP/1.1\r\n" - b"header: qvalue\r\n" - b"Host: address:22\r\n" - b"Content-Length: 7\r\n" - b"\r\n" - b"content" - ) - - with raises(HttpException): - assemble_request(treq(content=CONTENT_MISSING)) - - -def test_assemble_request_head(): - c = assemble_request_head(treq(content="foo")) - assert b"GET" in c - assert b"qvalue" in c - assert b"content-length" in c - assert b"foo" not in c - - -def test_assemble_response(): - c = assemble_response(tresp()) == ( - b"HTTP/1.1 200 OK\r\n" - b"header-response: svalue\r\n" - b"Content-Length: 7\r\n" - b"\r\n" - b"message" - ) - - with raises(HttpException): - assemble_response(tresp(content=CONTENT_MISSING)) - - -def test_assemble_response_head(): - c = assemble_response_head(tresp()) - assert b"200" in c - assert b"svalue" in c - assert b"message" not in c - - -def test_assemble_body(): - c = list(assemble_body(Headers(), [b"body"])) - assert c == [b"body"] - - c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a", b""])) - assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"] - - c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a"])) - assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"] - - -def test_assemble_request_line(): - assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1" - - authority_request = treq(method=b"CONNECT", first_line_format="authority").data - assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1" - - absolute_request = treq(first_line_format="absolute").data - assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1" - - with raises(RuntimeError): - _assemble_request_line(treq(first_line_format="invalid_form").data) - - -def test_assemble_request_headers(): - # https://github.com/mitmproxy/mitmproxy/issues/186 - r = treq(content=b"") - r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_request_headers(r.data) - assert b"Transfer-Encoding" in c - - -def test_assemble_request_headers_host_header(): - r = treq() - r.headers = Headers() - c = _assemble_request_headers(r.data) - assert b"host" in c - - r.host = None - c = _assemble_request_headers(r.data) - assert b"host" not in c - - -def test_assemble_response_headers(): - # https://github.com/mitmproxy/mitmproxy/issues/186 - r = tresp(content=b"") - r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_response_headers(r) - assert b"Transfer-Encoding" in c diff --git a/netlib/test/http/http1/test_read.py b/netlib/test/http/http1/test_read.py deleted file mode 100644 index 90234070..00000000 --- a/netlib/test/http/http1/test_read.py +++ /dev/null @@ -1,333 +0,0 @@ -from __future__ import absolute_import, print_function, division -from io import BytesIO -import textwrap -from mock import Mock -from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect -from netlib.http import Headers -from netlib.http.http1.read import ( - read_request, read_response, read_request_head, - read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line, - _read_request_line, _parse_authority_form, _read_response_line, _check_http_version, - _read_headers, _read_chunked -) -from netlib.tutils import treq, tresp, raises - - -def test_read_request(): - rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip") - r = read_request(rfile) - assert r.method == "GET" - assert r.content == b"" - assert r.timestamp_end - assert rfile.read() == b"skip" - - -def test_read_request_head(): - rfile = BytesIO( - b"GET / HTTP/1.1\r\n" - b"Content-Length: 4\r\n" - b"\r\n" - b"skip" - ) - rfile.reset_timestamps = Mock() - rfile.first_byte_timestamp = 42 - r = read_request_head(rfile) - assert r.method == "GET" - assert r.headers["Content-Length"] == "4" - assert r.content is None - assert rfile.reset_timestamps.called - assert r.timestamp_start == 42 - assert rfile.read() == b"skip" - - -def test_read_response(): - req = treq() - rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody") - r = read_response(rfile, req) - assert r.status_code == 418 - assert r.content == b"body" - assert r.timestamp_end - - -def test_read_response_head(): - rfile = BytesIO( - b"HTTP/1.1 418 I'm a teapot\r\n" - b"Content-Length: 4\r\n" - b"\r\n" - b"skip" - ) - rfile.reset_timestamps = Mock() - rfile.first_byte_timestamp = 42 - r = read_response_head(rfile) - assert r.status_code == 418 - assert r.headers["Content-Length"] == "4" - assert r.content is None - assert rfile.reset_timestamps.called - assert r.timestamp_start == 42 - assert rfile.read() == b"skip" - - -class TestReadBody(object): - def test_chunked(self): - rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar") - body = b"".join(read_body(rfile, None)) - assert body == b"foo" - assert rfile.read() == b"bar" - - def test_known_size(self): - rfile = BytesIO(b"foobar") - body = b"".join(read_body(rfile, 3)) - assert body == b"foo" - assert rfile.read() == b"bar" - - def test_known_size_limit(self): - rfile = BytesIO(b"foobar") - with raises(HttpException): - b"".join(read_body(rfile, 3, 2)) - - def test_known_size_too_short(self): - rfile = BytesIO(b"foo") - with raises(HttpException): - b"".join(read_body(rfile, 6)) - - def test_unknown_size(self): - rfile = BytesIO(b"foobar") - body = b"".join(read_body(rfile, -1)) - assert body == b"foobar" - - def test_unknown_size_limit(self): - rfile = BytesIO(b"foobar") - with raises(HttpException): - b"".join(read_body(rfile, -1, 3)) - - def test_max_chunk_size(self): - rfile = BytesIO(b"123456") - assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"] - rfile = BytesIO(b"123456") - assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"] - -def test_connection_close(): - headers = Headers() - assert connection_close(b"HTTP/1.0", headers) - assert not connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "keep-alive" - assert not connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "close" - assert connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "foobar" - assert connection_close(b"HTTP/1.0", headers) - assert not connection_close(b"HTTP/1.1", headers) - -def test_expected_http_body_size(): - # Expect: 100-continue - assert expected_http_body_size( - treq(headers=Headers(expect="100-continue", content_length="42")) - ) == 0 - - # http://tools.ietf.org/html/rfc7230#section-3.3 - assert expected_http_body_size( - treq(method=b"HEAD"), - tresp(headers=Headers(content_length="42")) - ) == 0 - assert expected_http_body_size( - treq(method=b"CONNECT"), - tresp() - ) == 0 - for code in (100, 204, 304): - assert expected_http_body_size( - treq(), - tresp(status_code=code) - ) == 0 - - # chunked - assert expected_http_body_size( - treq(headers=Headers(transfer_encoding="chunked")), - ) is None - - # explicit length - for val in (b"foo", b"-7"): - with raises(HttpSyntaxException): - expected_http_body_size( - treq(headers=Headers(content_length=val)) - ) - assert expected_http_body_size( - treq(headers=Headers(content_length="42")) - ) == 42 - - # no length - assert expected_http_body_size( - treq(headers=Headers()) - ) == 0 - assert expected_http_body_size( - treq(headers=Headers()), tresp(headers=Headers()) - ) == -1 - - -def test_get_first_line(): - rfile = BytesIO(b"foo\r\nbar") - assert _get_first_line(rfile) == b"foo" - - rfile = BytesIO(b"\r\nfoo\r\nbar") - assert _get_first_line(rfile) == b"foo" - - with raises(HttpReadDisconnect): - rfile = BytesIO(b"") - _get_first_line(rfile) - - with raises(HttpReadDisconnect): - rfile = Mock() - rfile.readline.side_effect = TcpDisconnect - _get_first_line(rfile) - - -def test_read_request_line(): - def t(b): - return _read_request_line(BytesIO(b)) - - assert (t(b"GET / HTTP/1.1") == - ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1")) - assert (t(b"OPTIONS * HTTP/1.1") == - ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1")) - assert (t(b"CONNECT foo:42 HTTP/1.1") == - ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1")) - assert (t(b"GET http://foo:42/bar HTTP/1.1") == - ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1")) - - with raises(HttpSyntaxException): - t(b"GET / WTF/1.1") - with raises(HttpSyntaxException): - t(b"this is not http") - with raises(HttpReadDisconnect): - t(b"") - -def test_parse_authority_form(): - assert _parse_authority_form(b"foo:42") == (b"foo", 42) - with raises(HttpSyntaxException): - _parse_authority_form(b"foo") - with raises(HttpSyntaxException): - _parse_authority_form(b"foo:bar") - with raises(HttpSyntaxException): - _parse_authority_form(b"foo:99999999") - with raises(HttpSyntaxException): - _parse_authority_form(b"f\x00oo:80") - - -def test_read_response_line(): - def t(b): - return _read_response_line(BytesIO(b)) - - assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK") - assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"") - - # https://github.com/mitmproxy/mitmproxy/issues/784 - assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9") - - with raises(HttpSyntaxException): - assert t(b"HTTP/1.1") - - with raises(HttpSyntaxException): - t(b"HTTP/1.1 OK OK") - with raises(HttpSyntaxException): - t(b"WTF/1.1 200 OK") - with raises(HttpReadDisconnect): - t(b"") - - -def test_check_http_version(): - _check_http_version(b"HTTP/0.9") - _check_http_version(b"HTTP/1.0") - _check_http_version(b"HTTP/1.1") - _check_http_version(b"HTTP/2.0") - with raises(HttpSyntaxException): - _check_http_version(b"WTF/1.0") - with raises(HttpSyntaxException): - _check_http_version(b"HTTP/1.10") - with raises(HttpSyntaxException): - _check_http_version(b"HTTP/1.b") - - -class TestReadHeaders(object): - @staticmethod - def _read(data): - return _read_headers(BytesIO(data)) - - def test_read_simple(self): - data = ( - b"Header: one\r\n" - b"Header2: two\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == [[b"Header", b"one"], [b"Header2", b"two"]] - - def test_read_multi(self): - data = ( - b"Header: one\r\n" - b"Header: two\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == [[b"Header", b"one"], [b"Header", b"two"]] - - def test_read_continued(self): - data = ( - b"Header: one\r\n" - b"\ttwo\r\n" - b"Header2: three\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == [[b"Header", b"one\r\n two"], [b"Header2", b"three"]] - - def test_read_continued_err(self): - data = b"\tfoo: bar\r\n" - with raises(HttpSyntaxException): - self._read(data) - - def test_read_err(self): - data = b"foo" - with raises(HttpSyntaxException): - self._read(data) - - def test_read_empty_name(self): - data = b":foo" - with raises(HttpSyntaxException): - self._read(data) - - def test_read_empty_value(self): - data = b"bar:" - headers = self._read(data) - assert headers.fields == [[b"bar", b""]] - -def test_read_chunked(): - req = treq(content=None) - req.headers["Transfer-Encoding"] = "chunked" - - data = b"1\r\na\r\n0\r\n" - with raises(HttpSyntaxException): - b"".join(_read_chunked(BytesIO(data))) - - data = b"1\r\na\r\n0\r\n\r\n" - assert b"".join(_read_chunked(BytesIO(data))) == b"a" - - data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n" - assert b"".join(_read_chunked(BytesIO(data))) == b"ab" - - data = b"\r\n" - with raises("closed prematurely"): - b"".join(_read_chunked(BytesIO(data))) - - data = b"1\r\nfoo" - with raises("malformed chunked body"): - b"".join(_read_chunked(BytesIO(data))) - - data = b"foo\r\nfoo" - with raises(HttpSyntaxException): - b"".join(_read_chunked(BytesIO(data))) - - data = b"5\r\naaaaa\r\n0\r\n\r\n" - with raises("too large"): - b"".join(_read_chunked(BytesIO(data), limit=2)) diff --git a/netlib/test/http/http2/__init__.py b/netlib/test/http/http2/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/netlib/test/http/http2/__init__.py +++ /dev/null diff --git a/netlib/test/http/http2/test_connections.py b/netlib/test/http/http2/test_connections.py deleted file mode 100644 index a115fc7c..00000000 --- a/netlib/test/http/http2/test_connections.py +++ /dev/null @@ -1,540 +0,0 @@ -import OpenSSL -import mock -import codecs - -from hyperframe.frame import * - -from netlib import tcp, http, utils, tservers -from netlib.tutils import raises -from netlib.exceptions import TcpDisconnect -from netlib.http.http2.connections import HTTP2Protocol, TCPHandler - - -class TestTCPHandlerWrapper: - def test_wrapped(self): - h = TCPHandler(rfile='foo', wfile='bar') - p = HTTP2Protocol(h) - assert p.tcp_handler.rfile == 'foo' - assert p.tcp_handler.wfile == 'bar' - - def test_direct(self): - p = HTTP2Protocol(rfile='foo', wfile='bar') - assert isinstance(p.tcp_handler, TCPHandler) - assert p.tcp_handler.rfile == 'foo' - assert p.tcp_handler.wfile == 'bar' - - -class EchoHandler(tcp.BaseHandler): - sni = None - - def handle(self): - while True: - v = self.rfile.safe_read(1) - self.wfile.write(v) - self.wfile.flush() - - -class TestProtocol: - @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface") - @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface") - def test_perform_connection_preface(self, mock_client_method, mock_server_method): - protocol = HTTP2Protocol(is_server=False) - protocol.connection_preface_performed = True - - protocol.perform_connection_preface() - assert not mock_client_method.called - assert not mock_server_method.called - - protocol.perform_connection_preface(force=True) - assert mock_client_method.called - assert not mock_server_method.called - - @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface") - @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface") - def test_perform_connection_preface_server(self, mock_client_method, mock_server_method): - protocol = HTTP2Protocol(is_server=True) - protocol.connection_preface_performed = True - - protocol.perform_connection_preface() - assert not mock_client_method.called - assert not mock_server_method.called - - protocol.perform_connection_preface(force=True) - assert not mock_client_method.called - assert mock_server_method.called - - -class TestCheckALPNMatch(tservers.ServerTestBase): - handler = EchoHandler - ssl = dict( - alpn_select=b'h2', - ) - - if OpenSSL._util.lib.Cryptography_HAS_ALPN: - - def test_check_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl(alpn_protos=[b'h2']) - protocol = HTTP2Protocol(c) - assert protocol.check_alpn() - - -class TestCheckALPNMismatch(tservers.ServerTestBase): - handler = EchoHandler - ssl = dict( - alpn_select=None, - ) - - if OpenSSL._util.lib.Cryptography_HAS_ALPN: - - def test_check_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl(alpn_protos=[b'h2']) - protocol = HTTP2Protocol(c) - with raises(NotImplementedError): - protocol.check_alpn() - - -class TestPerformServerConnectionPreface(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - - def handle(self): - # send magic - self.wfile.write(codecs.decode('505249202a20485454502f322e300d0a0d0a534d0d0a0d0a', 'hex_codec')) - self.wfile.flush() - - # send empty settings frame - self.wfile.write(codecs.decode('000000040000000000', 'hex_codec')) - self.wfile.flush() - - # check empty settings frame - raw = utils.http2_read_raw_frame(self.rfile) - assert raw == codecs.decode('00000c040000000000000200000000000300000001', 'hex_codec') - - # check settings acknowledgement - raw = utils.http2_read_raw_frame(self.rfile) - assert raw == codecs.decode('000000040100000000', 'hex_codec') - - # send settings acknowledgement - self.wfile.write(codecs.decode('000000040100000000', 'hex_codec')) - self.wfile.flush() - - def test_perform_server_connection_preface(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - protocol = HTTP2Protocol(c) - - assert not protocol.connection_preface_performed - protocol.perform_server_connection_preface() - assert protocol.connection_preface_performed - - with raises(TcpDisconnect): - protocol.perform_server_connection_preface(force=True) - - -class TestPerformClientConnectionPreface(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - - def handle(self): - # check magic - assert self.rfile.read(24) == HTTP2Protocol.CLIENT_CONNECTION_PREFACE - - # check empty settings frame - assert self.rfile.read(9) ==\ - codecs.decode('000000040000000000', 'hex_codec') - - # send empty settings frame - self.wfile.write(codecs.decode('000000040000000000', 'hex_codec')) - self.wfile.flush() - - # check settings acknowledgement - assert self.rfile.read(9) == \ - codecs.decode('000000040100000000', 'hex_codec') - - # send settings acknowledgement - self.wfile.write(codecs.decode('000000040100000000', 'hex_codec')) - self.wfile.flush() - - def test_perform_client_connection_preface(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - protocol = HTTP2Protocol(c) - - assert not protocol.connection_preface_performed - protocol.perform_client_connection_preface() - assert protocol.connection_preface_performed - - -class TestClientStreamIds(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - protocol = HTTP2Protocol(c) - - def test_client_stream_ids(self): - assert self.protocol.current_stream_id is None - assert self.protocol._next_stream_id() == 1 - assert self.protocol.current_stream_id == 1 - assert self.protocol._next_stream_id() == 3 - assert self.protocol.current_stream_id == 3 - assert self.protocol._next_stream_id() == 5 - assert self.protocol.current_stream_id == 5 - - -class TestServerStreamIds(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - protocol = HTTP2Protocol(c, is_server=True) - - def test_server_stream_ids(self): - assert self.protocol.current_stream_id is None - assert self.protocol._next_stream_id() == 2 - assert self.protocol.current_stream_id == 2 - assert self.protocol._next_stream_id() == 4 - assert self.protocol.current_stream_id == 4 - assert self.protocol._next_stream_id() == 6 - assert self.protocol.current_stream_id == 6 - - -class TestApplySettings(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - # check settings acknowledgement - assert self.rfile.read(9) == codecs.decode('000000040100000000', 'hex_codec') - self.wfile.write("OK") - self.wfile.flush() - self.rfile.safe_read(9) # just to keep the connection alive a bit longer - - ssl = True - - def test_apply_settings(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c) - - protocol._apply_settings({ - SettingsFrame.ENABLE_PUSH: 'foo', - SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar', - SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef', - }) - - assert c.rfile.safe_read(2) == b"OK" - - assert protocol.http2_settings[ - SettingsFrame.ENABLE_PUSH] == 'foo' - assert protocol.http2_settings[ - SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar' - assert protocol.http2_settings[ - SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef' - - -class TestCreateHeaders(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - - def test_create_headers(self): - headers = http.Headers([ - (b':method', b'GET'), - (b':path', b'index.html'), - (b':scheme', b'https'), - (b'foo', b'bar')]) - - bytes = HTTP2Protocol(self.c)._create_headers( - headers, 1, end_stream=True) - assert b''.join(bytes) ==\ - codecs.decode('000014010500000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec') - - bytes = HTTP2Protocol(self.c)._create_headers( - headers, 1, end_stream=False) - assert b''.join(bytes) ==\ - codecs.decode('000014010400000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec') - - def test_create_headers_multiple_frames(self): - headers = http.Headers([ - (b':method', b'GET'), - (b':path', b'/'), - (b':scheme', b'https'), - (b'foo', b'bar'), - (b'server', b'version')]) - - protocol = HTTP2Protocol(self.c) - protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 8 - bytes = protocol._create_headers(headers, 1, end_stream=True) - assert len(bytes) == 3 - assert bytes[0] == codecs.decode('000008010100000001828487408294e783', 'hex_codec') - assert bytes[1] == codecs.decode('0000080900000000018c767f7685ee5b10', 'hex_codec') - assert bytes[2] == codecs.decode('00000209040000000163d5', 'hex_codec') - - -class TestCreateBody(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - - def test_create_body_empty(self): - protocol = HTTP2Protocol(self.c) - bytes = protocol._create_body(b'', 1) - assert b''.join(bytes) == b'' - - def test_create_body_single_frame(self): - protocol = HTTP2Protocol(self.c) - bytes = protocol._create_body(b'foobar', 1) - assert b''.join(bytes) == codecs.decode('000006000100000001666f6f626172', 'hex_codec') - - def test_create_body_multiple_frames(self): - protocol = HTTP2Protocol(self.c) - protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 5 - bytes = protocol._create_body(b'foobarmehm42', 1) - assert len(bytes) == 3 - assert bytes[0] == codecs.decode('000005000000000001666f6f6261', 'hex_codec') - assert bytes[1] == codecs.decode('000005000000000001726d65686d', 'hex_codec') - assert bytes[2] == codecs.decode('0000020001000000013432', 'hex_codec') - - -class TestReadRequest(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - - def handle(self): - self.wfile.write( - codecs.decode('000003010400000001828487', 'hex_codec')) - self.wfile.write( - codecs.decode('000006000100000001666f6f626172', 'hex_codec')) - self.wfile.flush() - self.rfile.safe_read(9) # just to keep the connection alive a bit longer - - ssl = True - - def test_read_request(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c, is_server=True) - protocol.connection_preface_performed = True - - req = protocol.read_request(NotImplemented) - - assert req.stream_id - assert req.headers.fields == [[b':method', b'GET'], [b':path', b'/'], [b':scheme', b'https']] - assert req.content == b'foobar' - - -class TestReadRequestRelative(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - self.wfile.write( - codecs.decode('00000c0105000000014287d5af7e4d5a777f4481f9', 'hex_codec')) - self.wfile.flush() - - ssl = True - - def test_asterisk_form_in(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c, is_server=True) - protocol.connection_preface_performed = True - - req = protocol.read_request(NotImplemented) - - assert req.form_in == "relative" - assert req.method == "OPTIONS" - assert req.path == "*" - - -class TestReadRequestAbsolute(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - self.wfile.write( - codecs.decode('00001901050000000182448d9d29aee30c0e492c2a1170426366871c92585422e085', 'hex_codec')) - self.wfile.flush() - - ssl = True - - def test_absolute_form_in(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c, is_server=True) - protocol.connection_preface_performed = True - - req = protocol.read_request(NotImplemented) - - assert req.form_in == "absolute" - assert req.scheme == "http" - assert req.host == "address" - assert req.port == 22 - - -class TestReadRequestConnect(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - self.wfile.write( - codecs.decode('00001b0105000000014287bdab4e9c17b7ff44871c92585422e08541871c92585422e085', 'hex_codec')) - self.wfile.write( - codecs.decode('00001d0105000000014287bdab4e9c17b7ff44882f91d35d055c87a741882f91d35d055c87a7', 'hex_codec')) - self.wfile.flush() - - ssl = True - - def test_connect(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c, is_server=True) - protocol.connection_preface_performed = True - - req = protocol.read_request(NotImplemented) - assert req.form_in == "authority" - assert req.method == "CONNECT" - assert req.host == "address" - assert req.port == 22 - - req = protocol.read_request(NotImplemented) - assert req.form_in == "authority" - assert req.method == "CONNECT" - assert req.host == "example.com" - assert req.port == 443 - - -class TestReadResponse(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - self.wfile.write( - codecs.decode('00000801040000002a88628594e78c767f', 'hex_codec')) - self.wfile.write( - codecs.decode('00000600010000002a666f6f626172', 'hex_codec')) - self.wfile.flush() - self.rfile.safe_read(9) # just to keep the connection alive a bit longer - - ssl = True - - def test_read_response(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c) - protocol.connection_preface_performed = True - - resp = protocol.read_response(NotImplemented, stream_id=42) - - assert resp.http_version == "HTTP/2.0" - assert resp.status_code == 200 - assert resp.msg == '' - assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']] - assert resp.content == b'foobar' - assert resp.timestamp_end - - -class TestReadEmptyResponse(tservers.ServerTestBase): - class handler(tcp.BaseHandler): - def handle(self): - self.wfile.write( - codecs.decode('00000801050000002a88628594e78c767f', 'hex_codec')) - self.wfile.flush() - - ssl = True - - def test_read_empty_response(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.convert_to_ssl() - protocol = HTTP2Protocol(c) - protocol.connection_preface_performed = True - - resp = protocol.read_response(NotImplemented, stream_id=42) - - assert resp.stream_id == 42 - assert resp.http_version == "HTTP/2.0" - assert resp.status_code == 200 - assert resp.msg == '' - assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']] - assert resp.content == b'' - - -class TestAssembleRequest(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - - def test_request_simple(self): - bytes = HTTP2Protocol(self.c).assemble_request(http.Request( - b'', - b'GET', - b'https', - b'', - b'', - b'/', - b"HTTP/2.0", - None, - None, - )) - assert len(bytes) == 1 - assert bytes[0] == codecs.decode('00000d0105000000018284874188089d5c0b8170dc07', 'hex_codec') - - def test_request_with_stream_id(self): - req = http.Request( - b'', - b'GET', - b'https', - b'', - b'', - b'/', - b"HTTP/2.0", - None, - None, - ) - req.stream_id = 0x42 - bytes = HTTP2Protocol(self.c).assemble_request(req) - assert len(bytes) == 1 - assert bytes[0] == codecs.decode('00000d0105000000428284874188089d5c0b8170dc07', 'hex_codec') - - def test_request_with_body(self): - bytes = HTTP2Protocol(self.c).assemble_request(http.Request( - b'', - b'GET', - b'https', - b'', - b'', - b'/', - b"HTTP/2.0", - http.Headers([(b'foo', b'bar')]), - b'foobar', - )) - assert len(bytes) == 2 - assert bytes[0] ==\ - codecs.decode('0000150104000000018284874188089d5c0b8170dc07408294e7838c767f', 'hex_codec') - assert bytes[1] ==\ - codecs.decode('000006000100000001666f6f626172', 'hex_codec') - - -class TestAssembleResponse(object): - c = tcp.TCPClient(("127.0.0.1", 0)) - - def test_simple(self): - bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response( - b"HTTP/2.0", - 200, - )) - assert len(bytes) == 1 - assert bytes[0] ==\ - codecs.decode('00000101050000000288', 'hex_codec') - - def test_with_stream_id(self): - resp = http.Response( - b"HTTP/2.0", - 200, - ) - resp.stream_id = 0x42 - bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(resp) - assert len(bytes) == 1 - assert bytes[0] ==\ - codecs.decode('00000101050000004288', 'hex_codec') - - def test_with_body(self): - bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response( - b"HTTP/2.0", - 200, - b'', - http.Headers(foo=b"bar"), - b'foobar' - )) - assert len(bytes) == 2 - assert bytes[0] ==\ - codecs.decode('00000901040000000288408294e7838c767f', 'hex_codec') - assert bytes[1] ==\ - codecs.decode('000006000100000002666f6f626172', 'hex_codec') diff --git a/netlib/test/http/test_authentication.py b/netlib/test/http/test_authentication.py deleted file mode 100644 index 1df7cd9c..00000000 --- a/netlib/test/http/test_authentication.py +++ /dev/null @@ -1,122 +0,0 @@ -import binascii - -from netlib import tutils -from netlib.http import authentication, Headers - - -def test_parse_http_basic_auth(): - vals = ("basic", "foo", "bar") - assert authentication.parse_http_basic_auth( - authentication.assemble_http_basic_auth(*vals) - ) == vals - assert not authentication.parse_http_basic_auth("") - assert not authentication.parse_http_basic_auth("foo bar") - v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") - assert not authentication.parse_http_basic_auth(v) - - -class TestPassManNonAnon: - - def test_simple(self): - p = authentication.PassManNonAnon() - assert not p.test("", "") - assert p.test("user", "") - - -class TestPassManHtpasswd: - - def test_file_errors(self): - tutils.raises( - "malformed htpasswd file", - authentication.PassManHtpasswd, - tutils.test_data.path("data/server.crt")) - - def test_simple(self): - pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) - - vals = ("basic", "test", "test") - authentication.assemble_http_basic_auth(*vals) - assert pm.test("test", "test") - assert not pm.test("test", "foo") - assert not pm.test("foo", "test") - assert not pm.test("test", "") - assert not pm.test("", "") - - -class TestPassManSingleUser: - - def test_simple(self): - pm = authentication.PassManSingleUser("test", "test") - assert pm.test("test", "test") - assert not pm.test("test", "foo") - assert not pm.test("foo", "test") - - -class TestNullProxyAuth: - - def test_simple(self): - na = authentication.NullProxyAuth(authentication.PassManNonAnon()) - assert not na.auth_challenge_headers() - assert na.authenticate("foo") - na.clean({}) - - -class TestBasicProxyAuth: - - def test_simple(self): - ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - headers = Headers() - assert ba.auth_challenge_headers() - assert not ba.authenticate(headers) - - def test_authenticate_clean(self): - ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - - headers = Headers() - vals = ("basic", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert ba.authenticate(headers) - - ba.clean(headers) - assert not ba.AUTH_HEADER in headers - - headers[ba.AUTH_HEADER] = "" - assert not ba.authenticate(headers) - - headers[ba.AUTH_HEADER] = "foo" - assert not ba.authenticate(headers) - - vals = ("foo", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert not ba.authenticate(headers) - - ba = authentication.BasicProxyAuth(authentication.PassMan(), "test") - vals = ("basic", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert not ba.authenticate(headers) - - -class Bunch: - pass - - -class TestAuthAction: - - def test_nonanonymous(self): - m = Bunch() - aa = authentication.NonanonymousAuthAction(None, "authenticator") - aa(None, m, None, None) - assert m.authenticator - - def test_singleuser(self): - m = Bunch() - aa = authentication.SingleuserAuthAction(None, "authenticator") - aa(None, m, "foo:bar", None) - assert m.authenticator - tutils.raises("invalid", aa, None, m, "foo", None) - - def test_httppasswd(self): - m = Bunch() - aa = authentication.HtpasswdAuthAction(None, "authenticator") - aa(None, m, tutils.test_data.path("data/htpasswd"), None) - assert m.authenticator diff --git a/netlib/test/http/test_cookies.py b/netlib/test/http/test_cookies.py deleted file mode 100644 index 34bb64f2..00000000 --- a/netlib/test/http/test_cookies.py +++ /dev/null @@ -1,218 +0,0 @@ -from netlib.http import cookies - - -def test_read_token(): - tokens = [ - [("foo", 0), ("foo", 3)], - [("foo", 1), ("oo", 3)], - [(" foo", 1), ("foo", 4)], - [(" foo;", 1), ("foo", 4)], - [(" foo=", 1), ("foo", 4)], - [(" foo=bar", 1), ("foo", 4)], - ] - for q, a in tokens: - assert cookies._read_token(*q) == a - - -def test_read_quoted_string(): - tokens = [ - [('"foo" x', 0), ("foo", 5)], - [('"f\oo" x', 0), ("foo", 6)], - [(r'"f\\o" x', 0), (r"f\o", 6)], - [(r'"f\\" x', 0), (r"f" + '\\', 5)], - [('"fo\\\"" x', 0), ("fo\"", 6)], - [('"foo" x', 7), ("", 8)], - ] - for q, a in tokens: - assert cookies._read_quoted_string(*q) == a - - -def test_read_pairs(): - vals = [ - [ - "one", - [["one", None]] - ], - [ - "one=two", - [["one", "two"]] - ], - [ - "one=", - [["one", ""]] - ], - [ - 'one="two"', - [["one", "two"]] - ], - [ - 'one="two"; three=four', - [["one", "two"], ["three", "four"]] - ], - [ - 'one="two"; three=four; five', - [["one", "two"], ["three", "four"], ["five", None]] - ], - [ - 'one="\\"two"; three=four', - [["one", '"two'], ["three", "four"]] - ], - ] - for s, lst in vals: - ret, off = cookies._read_pairs(s) - assert ret == lst - - -def test_pairs_roundtrips(): - pairs = [ - [ - "", - [] - ], - [ - "one=uno", - [["one", "uno"]] - ], - [ - "one", - [["one", None]] - ], - [ - "one=uno; two=due", - [["one", "uno"], ["two", "due"]] - ], - [ - 'one="uno"; two="\due"', - [["one", "uno"], ["two", "due"]] - ], - [ - 'one="un\\"o"', - [["one", 'un"o']] - ], - [ - 'one="uno,due"', - [["one", 'uno,due']] - ], - [ - "one=uno; two; three=tre", - [["one", "uno"], ["two", None], ["three", "tre"]] - ], - [ - "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; " - "_rcc2=53VdltWl+Ov6ordflA==;", - [ - ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="], - ["_rcc2", "53VdltWl+Ov6ordflA=="] - ] - ] - ] - for s, lst in pairs: - ret, off = cookies._read_pairs(s) - assert ret == lst - s2 = cookies._format_pairs(lst) - ret, off = cookies._read_pairs(s2) - assert ret == lst - - -def test_cookie_roundtrips(): - pairs = [ - [ - "one=uno", - [["one", "uno"]] - ], - [ - "one=uno; two=due", - [["one", "uno"], ["two", "due"]] - ], - ] - for s, lst in pairs: - ret = cookies.parse_cookie_header(s) - assert ret.lst == lst - s2 = cookies.format_cookie_header(ret) - ret = cookies.parse_cookie_header(s2) - assert ret.lst == lst - - -def test_parse_set_cookie_pairs(): - pairs = [ - [ - "one=uno", - [ - ["one", "uno"] - ] - ], - [ - "one=un\x20", - [ - ["one", "un\x20"] - ] - ], - [ - "one=uno; foo", - [ - ["one", "uno"], - ["foo", None] - ] - ], - [ - "mun=1.390.f60; " - "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; " - "domain=b.aol.com", - [ - ["mun", "1.390.f60"], - ["expires", "sun, 11-oct-2015 12:38:31 gmt"], - ["path", "/"], - ["domain", "b.aol.com"] - ] - ], - [ - r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; ' - 'domain=.rubiconproject.com; ' - 'expires=mon, 11-may-2015 21:54:57 gmt; ' - 'path=/', - [ - ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'], - ['domain', '.rubiconproject.com'], - ['expires', 'mon, 11-may-2015 21:54:57 gmt'], - ['path', '/'] - ] - ], - ] - for s, lst in pairs: - ret = cookies._parse_set_cookie_pairs(s) - assert ret == lst - s2 = cookies._format_set_cookie_pairs(ret) - ret2 = cookies._parse_set_cookie_pairs(s2) - assert ret2 == lst - - -def test_parse_set_cookie_header(): - vals = [ - [ - "", None - ], - [ - ";", None - ], - [ - "one=uno", - ("one", "uno", []) - ], - [ - "one=uno; foo=bar", - ("one", "uno", [["foo", "bar"]]) - ] - ] - for s, expected in vals: - ret = cookies.parse_set_cookie_header(s) - if expected: - assert ret[0] == expected[0] - assert ret[1] == expected[1] - assert ret[2].lst == expected[2] - s2 = cookies.format_set_cookie_header(*ret) - ret2 = cookies.parse_set_cookie_header(s2) - assert ret2[0] == expected[0] - assert ret2[1] == expected[1] - assert ret2[2].lst == expected[2] - else: - assert ret is None diff --git a/netlib/test/http/test_headers.py b/netlib/test/http/test_headers.py deleted file mode 100644 index d50fee3e..00000000 --- a/netlib/test/http/test_headers.py +++ /dev/null @@ -1,152 +0,0 @@ -from netlib.http import Headers -from netlib.tutils import raises - - -class TestHeaders(object): - def _2host(self): - return Headers( - [ - [b"Host", b"example.com"], - [b"host", b"example.org"] - ] - ) - - def test_init(self): - headers = Headers() - assert len(headers) == 0 - - headers = Headers([[b"Host", b"example.com"]]) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers(Host="example.com") - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers( - [[b"Host", b"invalid"]], - Host="example.com" - ) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers( - [[b"Host", b"invalid"], [b"Accept", b"text/plain"]], - Host="example.com" - ) - assert len(headers) == 2 - assert headers["Host"] == "example.com" - assert headers["Accept"] == "text/plain" - - with raises(ValueError): - Headers([[b"Host", u"not-bytes"]]) - - def test_getitem(self): - headers = Headers(Host="example.com") - assert headers["Host"] == "example.com" - assert headers["host"] == "example.com" - with raises(KeyError): - _ = headers["Accept"] - - headers = self._2host() - assert headers["Host"] == "example.com, example.org" - - def test_str(self): - headers = Headers(Host="example.com") - assert bytes(headers) == b"Host: example.com\r\n" - - headers = Headers([ - [b"Host", b"example.com"], - [b"Accept", b"text/plain"] - ]) - assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n" - - headers = Headers() - assert bytes(headers) == b"" - - def test_setitem(self): - headers = Headers() - headers["Host"] = "example.com" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == "example.com" - - headers["host"] = "example.org" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == "example.org" - - headers["accept"] = "text/plain" - assert len(headers) == 2 - assert "Accept" in headers - assert "Host" in headers - - headers = self._2host() - assert len(headers.fields) == 2 - headers["Host"] = "example.com" - assert len(headers.fields) == 1 - assert "Host" in headers - - def test_delitem(self): - headers = Headers(Host="example.com") - assert len(headers) == 1 - del headers["host"] - assert len(headers) == 0 - try: - del headers["host"] - except KeyError: - assert True - else: - assert False - - headers = self._2host() - del headers["Host"] - assert len(headers) == 0 - - def test_keys(self): - headers = Headers(Host="example.com") - assert list(headers.keys()) == ["Host"] - - headers = self._2host() - assert list(headers.keys()) == ["Host"] - - def test_eq_ne(self): - headers1 = Headers(Host="example.com") - headers2 = Headers(host="example.com") - assert not (headers1 == headers2) - assert headers1 != headers2 - - headers1 = Headers(Host="example.com") - headers2 = Headers(Host="example.com") - assert headers1 == headers2 - assert not (headers1 != headers2) - - assert headers1 != 42 - - def test_get_all(self): - headers = self._2host() - assert headers.get_all("host") == ["example.com", "example.org"] - assert headers.get_all("accept") == [] - - def test_set_all(self): - headers = Headers(Host="example.com") - headers.set_all("Accept", ["text/plain"]) - assert len(headers) == 2 - assert "accept" in headers - - headers = self._2host() - headers.set_all("Host", ["example.org"]) - assert headers["host"] == "example.org" - - headers.set_all("Host", ["example.org", "example.net"]) - assert headers["host"] == "example.org, example.net" - - def test_state(self): - headers = self._2host() - assert len(headers.get_state()) == 2 - assert headers == Headers.from_state(headers.get_state()) - - headers2 = Headers() - assert headers != headers2 - headers2.set_state(headers.get_state()) - assert headers == headers2 diff --git a/netlib/test/http/test_message.py b/netlib/test/http/test_message.py deleted file mode 100644 index 4b1f4630..00000000 --- a/netlib/test/http/test_message.py +++ /dev/null @@ -1,153 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -from netlib.http import decoded, Headers -from netlib.tutils import tresp, raises - - -def _test_passthrough_attr(message, attr): - assert getattr(message, attr) == getattr(message.data, attr) - setattr(message, attr, "foo") - assert getattr(message.data, attr) == "foo" - - -def _test_decoded_attr(message, attr): - assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") - # Set str, get raw bytes - setattr(message, attr, "foo") - assert getattr(message.data, attr) == b"foo" - # Set raw bytes, get decoded - setattr(message.data, attr, b"BAR") # use uppercase so that we can also cover request.method - assert getattr(message, attr) == "BAR" - # Set bytes, get raw bytes - setattr(message, attr, b"baz") - assert getattr(message.data, attr) == b"baz" - - # Set UTF8 - setattr(message, attr, "Non-Autorisé") - assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9" - # Don't fail on garbage - setattr(message.data, attr, b"FOO\xFF\x00BAR") - assert getattr(message, attr).startswith("FOO") - assert getattr(message, attr).endswith("BAR") - # foo.bar = foo.bar should not cause any side effects. - d = getattr(message, attr) - setattr(message, attr, d) - assert getattr(message.data, attr) == b"FOO\xFF\x00BAR" - - -class TestMessageData(object): - def test_eq_ne(self): - data = tresp(timestamp_start=42, timestamp_end=42).data - same = tresp(timestamp_start=42, timestamp_end=42).data - assert data == same - assert not data != same - - other = tresp(content=b"foo").data - assert not data == other - assert data != other - - assert data != 0 - - -class TestMessage(object): - - def test_init(self): - resp = tresp() - assert resp.data - - def test_eq_ne(self): - resp = tresp(timestamp_start=42, timestamp_end=42) - same = tresp(timestamp_start=42, timestamp_end=42) - assert resp == same - assert not resp != same - - other = tresp(timestamp_start=0, timestamp_end=0) - assert not resp == other - assert resp != other - - assert resp != 0 - - def test_content_length_update(self): - resp = tresp() - resp.content = b"foo" - assert resp.data.content == b"foo" - assert resp.headers["content-length"] == "3" - resp.content = b"" - assert resp.data.content == b"" - assert resp.headers["content-length"] == "0" - - def test_content_basic(self): - _test_passthrough_attr(tresp(), "content") - - def test_headers(self): - _test_passthrough_attr(tresp(), "headers") - - def test_timestamp_start(self): - _test_passthrough_attr(tresp(), "timestamp_start") - - def test_timestamp_end(self): - _test_passthrough_attr(tresp(), "timestamp_end") - - def teste_http_version(self): - _test_decoded_attr(tresp(), "http_version") - - -class TestDecodedDecorator(object): - - def test_simple(self): - r = tresp() - assert r.content == b"message" - assert "content-encoding" not in r.headers - assert r.encode("gzip") - - assert r.headers["content-encoding"] - assert r.content != b"message" - with decoded(r): - assert "content-encoding" not in r.headers - assert r.content == b"message" - assert r.headers["content-encoding"] - assert r.content != b"message" - - def test_modify(self): - r = tresp() - assert "content-encoding" not in r.headers - assert r.encode("gzip") - - with decoded(r): - r.content = b"foo" - - assert r.content != b"foo" - r.decode() - assert r.content == b"foo" - - def test_unknown_ce(self): - r = tresp() - r.headers["content-encoding"] = "zopfli" - r.content = b"foo" - with decoded(r): - assert r.headers["content-encoding"] - assert r.content == b"foo" - assert r.headers["content-encoding"] - assert r.content == b"foo" - - def test_cannot_decode(self): - r = tresp() - assert r.encode("gzip") - r.content = b"foo" - with decoded(r): - assert r.headers["content-encoding"] - assert r.content == b"foo" - assert r.headers["content-encoding"] - assert r.content != b"foo" - r.decode() - assert r.content == b"foo" - - def test_cannot_encode(self): - r = tresp() - assert r.encode("gzip") - with decoded(r): - r.content = None - - assert "content-encoding" not in r.headers - assert r.content is None diff --git a/netlib/test/http/test_request.py b/netlib/test/http/test_request.py deleted file mode 100644 index 900b2cd1..00000000 --- a/netlib/test/http/test_request.py +++ /dev/null @@ -1,238 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import six - -from netlib import utils -from netlib.http import Headers -from netlib.odict import ODict -from netlib.tutils import treq, raises -from .test_message import _test_decoded_attr, _test_passthrough_attr - - -class TestRequestData(object): - def test_init(self): - with raises(ValueError if six.PY2 else TypeError): - treq(headers="foobar") - - assert isinstance(treq(headers=None).headers, Headers) - - -class TestRequestCore(object): - """ - Tests for builtins and the attributes that are directly proxied from the data structure - """ - def test_repr(self): - request = treq() - assert repr(request) == "Request(GET address:22/path)" - request.host = None - assert repr(request) == "Request(GET /path)" - - def test_first_line_format(self): - _test_passthrough_attr(treq(), "first_line_format") - - def test_method(self): - _test_decoded_attr(treq(), "method") - - def test_scheme(self): - _test_decoded_attr(treq(), "scheme") - - def test_port(self): - _test_passthrough_attr(treq(), "port") - - def test_path(self): - _test_decoded_attr(treq(), "path") - - def test_host(self): - if six.PY2: - from unittest import SkipTest - raise SkipTest() - - request = treq() - assert request.host == request.data.host.decode("idna") - - # Test IDNA encoding - # Set str, get raw bytes - request.host = "ídna.example" - assert request.data.host == b"xn--dna-qma.example" - # Set raw bytes, get decoded - request.data.host = b"xn--idn-gla.example" - assert request.host == "idná.example" - # Set bytes, get raw bytes - request.host = b"xn--dn-qia9b.example" - assert request.data.host == b"xn--dn-qia9b.example" - # IDNA encoding is not bijective - request.host = "fußball" - assert request.host == "fussball" - - # Don't fail on garbage - request.data.host = b"foo\xFF\x00bar" - assert request.host.startswith("foo") - assert request.host.endswith("bar") - # foo.bar = foo.bar should not cause any side effects. - d = request.host - request.host = d - assert request.data.host == b"foo\xFF\x00bar" - - def test_host_header_update(self): - request = treq() - assert "host" not in request.headers - request.host = "example.com" - assert "host" not in request.headers - - request.headers["Host"] = "foo" - request.host = "example.org" - assert request.headers["Host"] == "example.org" - - -class TestRequestUtils(object): - """ - Tests for additional convenience methods. - """ - def test_url(self): - request = treq() - assert request.url == "http://address:22/path" - - request.url = "https://otheraddress:42/foo" - assert request.scheme == "https" - assert request.host == "otheraddress" - assert request.port == 42 - assert request.path == "/foo" - - with raises(ValueError): - request.url = "not-a-url" - - def test_pretty_host(self): - request = treq() - assert request.pretty_host == "address" - assert request.host == "address" - request.headers["host"] = "other" - assert request.pretty_host == "other" - assert request.host == "address" - request.host = None - assert request.pretty_host is None - assert request.host is None - - # Invalid IDNA - request.headers["host"] = ".disqus.com" - assert request.pretty_host == ".disqus.com" - - def test_pretty_url(self): - request = treq() - assert request.url == "http://address:22/path" - assert request.pretty_url == "http://address:22/path" - request.headers["host"] = "other" - assert request.pretty_url == "http://other:22/path" - - def test_pretty_url_authority(self): - request = treq(first_line_format="authority") - assert request.pretty_url == "address:22" - - def test_get_query(self): - request = treq() - assert request.query is None - - request.url = "http://localhost:80/foo?bar=42" - assert request.query.lst == [("bar", "42")] - - def test_set_query(self): - request = treq() - request.query = ODict([]) - - def test_get_cookies_none(self): - request = treq() - request.headers = Headers() - assert len(request.cookies) == 0 - - def test_get_cookies_single(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue") - result = request.cookies - assert len(result) == 1 - assert result['cookiename'] == ['cookievalue'] - - def test_get_cookies_double(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = request.cookies - assert len(result) == 2 - assert result['cookiename'] == ['cookievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_get_cookies_withequalsign(self): - request = treq() - request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = request.cookies - assert len(result) == 2 - assert result['cookiename'] == ['coo=kievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_set_cookies(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue") - result = request.cookies - result["cookiename"] = ["foo"] - request.cookies = result - assert request.cookies["cookiename"] == ["foo"] - - def test_get_path_components(self): - request = treq(path=b"/foo/bar") - assert request.path_components == ["foo", "bar"] - - def test_set_path_components(self): - request = treq() - request.path_components = ["foo", "baz"] - assert request.path == "/foo/baz" - request.path_components = [] - assert request.path == "/" - - def test_anticache(self): - request = treq() - request.headers["If-Modified-Since"] = "foo" - request.headers["If-None-Match"] = "bar" - request.anticache() - assert "If-Modified-Since" not in request.headers - assert "If-None-Match" not in request.headers - - def test_anticomp(self): - request = treq() - request.headers["Accept-Encoding"] = "foobar" - request.anticomp() - assert request.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - request = treq() - - h = request.headers.copy() - request.constrain_encoding() # no-op if there is no accept_encoding header. - assert request.headers == h - - request.headers["Accept-Encoding"] = "identity, gzip, foo" - request.constrain_encoding() - assert "foo" not in request.headers["Accept-Encoding"] - assert "gzip" in request.headers["Accept-Encoding"] - - def test_get_urlencoded_form(self): - request = treq(content="foobar") - assert request.urlencoded_form is None - - request.headers["Content-Type"] = "application/x-www-form-urlencoded" - assert request.urlencoded_form == ODict(utils.urldecode(request.content)) - - def test_set_urlencoded_form(self): - request = treq() - request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')]) - assert request.headers["Content-Type"] == "application/x-www-form-urlencoded" - assert request.content - - def test_get_multipart_form(self): - request = treq(content="foobar") - assert request.multipart_form is None - - request.headers["Content-Type"] = "multipart/form-data" - assert request.multipart_form == ODict( - utils.multipartdecode( - request.headers, - request.content - ) - ) diff --git a/netlib/test/http/test_response.py b/netlib/test/http/test_response.py deleted file mode 100644 index 14588000..00000000 --- a/netlib/test/http/test_response.py +++ /dev/null @@ -1,102 +0,0 @@ -from __future__ import absolute_import, print_function, division - -import six - -from netlib.http import Headers -from netlib.odict import ODict, ODictCaseless -from netlib.tutils import raises, tresp -from .test_message import _test_passthrough_attr, _test_decoded_attr - - -class TestResponseData(object): - def test_init(self): - with raises(ValueError if six.PY2 else TypeError): - tresp(headers="foobar") - - assert isinstance(tresp(headers=None).headers, Headers) - - -class TestResponseCore(object): - """ - Tests for builtins and the attributes that are directly proxied from the data structure - """ - def test_repr(self): - response = tresp() - assert repr(response) == "Response(200 OK, unknown content type, 7B)" - response.content = None - assert repr(response) == "Response(200 OK, no content)" - - def test_status_code(self): - _test_passthrough_attr(tresp(), "status_code") - - def test_reason(self): - _test_decoded_attr(tresp(), "reason") - - -class TestResponseUtils(object): - """ - Tests for additional convenience methods. - """ - def test_get_cookies_none(self): - resp = tresp() - resp.headers = Headers() - assert not resp.cookies - - def test_get_cookies_empty(self): - resp = tresp() - resp.headers = Headers(set_cookie="") - assert not resp.cookies - - def test_get_cookies_simple(self): - resp = tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue") - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - - def test_get_cookies_with_parameters(self): - resp = tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "cookievalue" - attrs = result["cookiename"][0][1] - assert len(attrs) == 4 - assert attrs["domain"] == ["example.com"] - assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] - assert attrs["path"] == ["/"] - assert attrs["httponly"] == [None] - - def test_get_cookies_no_value(self): - resp = tresp() - resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "" - assert len(result["cookiename"][0][1]) == 2 - - def test_get_cookies_twocookies(self): - resp = tresp() - resp.headers = Headers([ - [b"Set-Cookie", b"cookiename=cookievalue"], - [b"Set-Cookie", b"othercookie=othervalue"] - ]) - result = resp.cookies - assert len(result) == 2 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - assert "othercookie" in result - assert result["othercookie"][0] == ["othervalue", ODict()] - - def test_set_cookies(self): - resp = tresp() - v = resp.cookies - v.add("foo", ["bar", ODictCaseless()]) - resp.set_cookies(v) - - v = resp.cookies - assert len(v) == 1 - assert v["foo"] == [["bar", ODictCaseless()]] diff --git a/netlib/test/http/test_status_codes.py b/netlib/test/http/test_status_codes.py deleted file mode 100644 index 9fea6b70..00000000 --- a/netlib/test/http/test_status_codes.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib.http import status_codes - - -def test_simple(): - assert status_codes.IM_A_TEAPOT == 418 - assert status_codes.RESPONSES[418] == "I'm a teapot" diff --git a/netlib/test/http/test_user_agents.py b/netlib/test/http/test_user_agents.py deleted file mode 100644 index 0bf1bba7..00000000 --- a/netlib/test/http/test_user_agents.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib.http import user_agents - - -def test_get_shortcut(): - assert user_agents.get_by_shortcut("c")[0] == "chrome" - assert not user_agents.get_by_shortcut("_") |