aboutsummaryrefslogtreecommitdiffstats
path: root/test/netlib/http
diff options
context:
space:
mode:
Diffstat (limited to 'test/netlib/http')
-rw-r--r--test/netlib/http/__init__.py0
-rw-r--r--test/netlib/http/http1/__init__.py0
-rw-r--r--test/netlib/http/http1/test_assemble.py102
-rw-r--r--test/netlib/http/http1/test_read.py333
-rw-r--r--test/netlib/http/http2/__init__.py0
-rw-r--r--test/netlib/http/http2/test_connections.py540
-rw-r--r--test/netlib/http/test_authentication.py122
-rw-r--r--test/netlib/http/test_cookies.py218
-rw-r--r--test/netlib/http/test_headers.py152
-rw-r--r--test/netlib/http/test_message.py153
-rw-r--r--test/netlib/http/test_request.py238
-rw-r--r--test/netlib/http/test_response.py102
-rw-r--r--test/netlib/http/test_status_codes.py6
-rw-r--r--test/netlib/http/test_user_agents.py6
14 files changed, 1972 insertions, 0 deletions
diff --git a/test/netlib/http/__init__.py b/test/netlib/http/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/netlib/http/__init__.py
diff --git a/test/netlib/http/http1/__init__.py b/test/netlib/http/http1/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/netlib/http/http1/__init__.py
diff --git a/test/netlib/http/http1/test_assemble.py b/test/netlib/http/http1/test_assemble.py
new file mode 100644
index 00000000..31a62438
--- /dev/null
+++ b/test/netlib/http/http1/test_assemble.py
@@ -0,0 +1,102 @@
+from __future__ import absolute_import, print_function, division
+from netlib.exceptions import HttpException
+from netlib.http import CONTENT_MISSING, Headers
+from netlib.http.http1.assemble import (
+ assemble_request, assemble_request_head, assemble_response,
+ assemble_response_head, _assemble_request_line, _assemble_request_headers,
+ _assemble_response_headers,
+ assemble_body)
+from netlib.tutils import treq, raises, tresp
+
+
+def test_assemble_request():
+ c = assemble_request(treq()) == (
+ b"GET /path HTTP/1.1\r\n"
+ b"header: qvalue\r\n"
+ b"Host: address:22\r\n"
+ b"Content-Length: 7\r\n"
+ b"\r\n"
+ b"content"
+ )
+
+ with raises(HttpException):
+ assemble_request(treq(content=CONTENT_MISSING))
+
+
+def test_assemble_request_head():
+ c = assemble_request_head(treq(content="foo"))
+ assert b"GET" in c
+ assert b"qvalue" in c
+ assert b"content-length" in c
+ assert b"foo" not in c
+
+
+def test_assemble_response():
+ c = assemble_response(tresp()) == (
+ b"HTTP/1.1 200 OK\r\n"
+ b"header-response: svalue\r\n"
+ b"Content-Length: 7\r\n"
+ b"\r\n"
+ b"message"
+ )
+
+ with raises(HttpException):
+ assemble_response(tresp(content=CONTENT_MISSING))
+
+
+def test_assemble_response_head():
+ c = assemble_response_head(tresp())
+ assert b"200" in c
+ assert b"svalue" in c
+ assert b"message" not in c
+
+
+def test_assemble_body():
+ c = list(assemble_body(Headers(), [b"body"]))
+ assert c == [b"body"]
+
+ c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a", b""]))
+ assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
+
+ c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a"]))
+ assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
+
+
+def test_assemble_request_line():
+ assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1"
+
+ authority_request = treq(method=b"CONNECT", first_line_format="authority").data
+ assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1"
+
+ absolute_request = treq(first_line_format="absolute").data
+ assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1"
+
+ with raises(RuntimeError):
+ _assemble_request_line(treq(first_line_format="invalid_form").data)
+
+
+def test_assemble_request_headers():
+ # https://github.com/mitmproxy/mitmproxy/issues/186
+ r = treq(content=b"")
+ r.headers["Transfer-Encoding"] = "chunked"
+ c = _assemble_request_headers(r.data)
+ assert b"Transfer-Encoding" in c
+
+
+def test_assemble_request_headers_host_header():
+ r = treq()
+ r.headers = Headers()
+ c = _assemble_request_headers(r.data)
+ assert b"host" in c
+
+ r.host = None
+ c = _assemble_request_headers(r.data)
+ assert b"host" not in c
+
+
+def test_assemble_response_headers():
+ # https://github.com/mitmproxy/mitmproxy/issues/186
+ r = tresp(content=b"")
+ r.headers["Transfer-Encoding"] = "chunked"
+ c = _assemble_response_headers(r)
+ assert b"Transfer-Encoding" in c
diff --git a/test/netlib/http/http1/test_read.py b/test/netlib/http/http1/test_read.py
new file mode 100644
index 00000000..90234070
--- /dev/null
+++ b/test/netlib/http/http1/test_read.py
@@ -0,0 +1,333 @@
+from __future__ import absolute_import, print_function, division
+from io import BytesIO
+import textwrap
+from mock import Mock
+from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect
+from netlib.http import Headers
+from netlib.http.http1.read import (
+ read_request, read_response, read_request_head,
+ read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line,
+ _read_request_line, _parse_authority_form, _read_response_line, _check_http_version,
+ _read_headers, _read_chunked
+)
+from netlib.tutils import treq, tresp, raises
+
+
+def test_read_request():
+ rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip")
+ r = read_request(rfile)
+ assert r.method == "GET"
+ assert r.content == b""
+ assert r.timestamp_end
+ assert rfile.read() == b"skip"
+
+
+def test_read_request_head():
+ rfile = BytesIO(
+ b"GET / HTTP/1.1\r\n"
+ b"Content-Length: 4\r\n"
+ b"\r\n"
+ b"skip"
+ )
+ rfile.reset_timestamps = Mock()
+ rfile.first_byte_timestamp = 42
+ r = read_request_head(rfile)
+ assert r.method == "GET"
+ assert r.headers["Content-Length"] == "4"
+ assert r.content is None
+ assert rfile.reset_timestamps.called
+ assert r.timestamp_start == 42
+ assert rfile.read() == b"skip"
+
+
+def test_read_response():
+ req = treq()
+ rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody")
+ r = read_response(rfile, req)
+ assert r.status_code == 418
+ assert r.content == b"body"
+ assert r.timestamp_end
+
+
+def test_read_response_head():
+ rfile = BytesIO(
+ b"HTTP/1.1 418 I'm a teapot\r\n"
+ b"Content-Length: 4\r\n"
+ b"\r\n"
+ b"skip"
+ )
+ rfile.reset_timestamps = Mock()
+ rfile.first_byte_timestamp = 42
+ r = read_response_head(rfile)
+ assert r.status_code == 418
+ assert r.headers["Content-Length"] == "4"
+ assert r.content is None
+ assert rfile.reset_timestamps.called
+ assert r.timestamp_start == 42
+ assert rfile.read() == b"skip"
+
+
+class TestReadBody(object):
+ def test_chunked(self):
+ rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar")
+ body = b"".join(read_body(rfile, None))
+ assert body == b"foo"
+ assert rfile.read() == b"bar"
+
+ def test_known_size(self):
+ rfile = BytesIO(b"foobar")
+ body = b"".join(read_body(rfile, 3))
+ assert body == b"foo"
+ assert rfile.read() == b"bar"
+
+ def test_known_size_limit(self):
+ rfile = BytesIO(b"foobar")
+ with raises(HttpException):
+ b"".join(read_body(rfile, 3, 2))
+
+ def test_known_size_too_short(self):
+ rfile = BytesIO(b"foo")
+ with raises(HttpException):
+ b"".join(read_body(rfile, 6))
+
+ def test_unknown_size(self):
+ rfile = BytesIO(b"foobar")
+ body = b"".join(read_body(rfile, -1))
+ assert body == b"foobar"
+
+ def test_unknown_size_limit(self):
+ rfile = BytesIO(b"foobar")
+ with raises(HttpException):
+ b"".join(read_body(rfile, -1, 3))
+
+ def test_max_chunk_size(self):
+ rfile = BytesIO(b"123456")
+ assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
+ rfile = BytesIO(b"123456")
+ assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
+
+def test_connection_close():
+ headers = Headers()
+ assert connection_close(b"HTTP/1.0", headers)
+ assert not connection_close(b"HTTP/1.1", headers)
+
+ headers["connection"] = "keep-alive"
+ assert not connection_close(b"HTTP/1.1", headers)
+
+ headers["connection"] = "close"
+ assert connection_close(b"HTTP/1.1", headers)
+
+ headers["connection"] = "foobar"
+ assert connection_close(b"HTTP/1.0", headers)
+ assert not connection_close(b"HTTP/1.1", headers)
+
+def test_expected_http_body_size():
+ # Expect: 100-continue
+ assert expected_http_body_size(
+ treq(headers=Headers(expect="100-continue", content_length="42"))
+ ) == 0
+
+ # http://tools.ietf.org/html/rfc7230#section-3.3
+ assert expected_http_body_size(
+ treq(method=b"HEAD"),
+ tresp(headers=Headers(content_length="42"))
+ ) == 0
+ assert expected_http_body_size(
+ treq(method=b"CONNECT"),
+ tresp()
+ ) == 0
+ for code in (100, 204, 304):
+ assert expected_http_body_size(
+ treq(),
+ tresp(status_code=code)
+ ) == 0
+
+ # chunked
+ assert expected_http_body_size(
+ treq(headers=Headers(transfer_encoding="chunked")),
+ ) is None
+
+ # explicit length
+ for val in (b"foo", b"-7"):
+ with raises(HttpSyntaxException):
+ expected_http_body_size(
+ treq(headers=Headers(content_length=val))
+ )
+ assert expected_http_body_size(
+ treq(headers=Headers(content_length="42"))
+ ) == 42
+
+ # no length
+ assert expected_http_body_size(
+ treq(headers=Headers())
+ ) == 0
+ assert expected_http_body_size(
+ treq(headers=Headers()), tresp(headers=Headers())
+ ) == -1
+
+
+def test_get_first_line():
+ rfile = BytesIO(b"foo\r\nbar")
+ assert _get_first_line(rfile) == b"foo"
+
+ rfile = BytesIO(b"\r\nfoo\r\nbar")
+ assert _get_first_line(rfile) == b"foo"
+
+ with raises(HttpReadDisconnect):
+ rfile = BytesIO(b"")
+ _get_first_line(rfile)
+
+ with raises(HttpReadDisconnect):
+ rfile = Mock()
+ rfile.readline.side_effect = TcpDisconnect
+ _get_first_line(rfile)
+
+
+def test_read_request_line():
+ def t(b):
+ return _read_request_line(BytesIO(b))
+
+ assert (t(b"GET / HTTP/1.1") ==
+ ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1"))
+ assert (t(b"OPTIONS * HTTP/1.1") ==
+ ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1"))
+ assert (t(b"CONNECT foo:42 HTTP/1.1") ==
+ ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1"))
+ assert (t(b"GET http://foo:42/bar HTTP/1.1") ==
+ ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1"))
+
+ with raises(HttpSyntaxException):
+ t(b"GET / WTF/1.1")
+ with raises(HttpSyntaxException):
+ t(b"this is not http")
+ with raises(HttpReadDisconnect):
+ t(b"")
+
+def test_parse_authority_form():
+ assert _parse_authority_form(b"foo:42") == (b"foo", 42)
+ with raises(HttpSyntaxException):
+ _parse_authority_form(b"foo")
+ with raises(HttpSyntaxException):
+ _parse_authority_form(b"foo:bar")
+ with raises(HttpSyntaxException):
+ _parse_authority_form(b"foo:99999999")
+ with raises(HttpSyntaxException):
+ _parse_authority_form(b"f\x00oo:80")
+
+
+def test_read_response_line():
+ def t(b):
+ return _read_response_line(BytesIO(b))
+
+ assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK")
+ assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"")
+
+ # https://github.com/mitmproxy/mitmproxy/issues/784
+ assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9")
+
+ with raises(HttpSyntaxException):
+ assert t(b"HTTP/1.1")
+
+ with raises(HttpSyntaxException):
+ t(b"HTTP/1.1 OK OK")
+ with raises(HttpSyntaxException):
+ t(b"WTF/1.1 200 OK")
+ with raises(HttpReadDisconnect):
+ t(b"")
+
+
+def test_check_http_version():
+ _check_http_version(b"HTTP/0.9")
+ _check_http_version(b"HTTP/1.0")
+ _check_http_version(b"HTTP/1.1")
+ _check_http_version(b"HTTP/2.0")
+ with raises(HttpSyntaxException):
+ _check_http_version(b"WTF/1.0")
+ with raises(HttpSyntaxException):
+ _check_http_version(b"HTTP/1.10")
+ with raises(HttpSyntaxException):
+ _check_http_version(b"HTTP/1.b")
+
+
+class TestReadHeaders(object):
+ @staticmethod
+ def _read(data):
+ return _read_headers(BytesIO(data))
+
+ def test_read_simple(self):
+ data = (
+ b"Header: one\r\n"
+ b"Header2: two\r\n"
+ b"\r\n"
+ )
+ headers = self._read(data)
+ assert headers.fields == [[b"Header", b"one"], [b"Header2", b"two"]]
+
+ def test_read_multi(self):
+ data = (
+ b"Header: one\r\n"
+ b"Header: two\r\n"
+ b"\r\n"
+ )
+ headers = self._read(data)
+ assert headers.fields == [[b"Header", b"one"], [b"Header", b"two"]]
+
+ def test_read_continued(self):
+ data = (
+ b"Header: one\r\n"
+ b"\ttwo\r\n"
+ b"Header2: three\r\n"
+ b"\r\n"
+ )
+ headers = self._read(data)
+ assert headers.fields == [[b"Header", b"one\r\n two"], [b"Header2", b"three"]]
+
+ def test_read_continued_err(self):
+ data = b"\tfoo: bar\r\n"
+ with raises(HttpSyntaxException):
+ self._read(data)
+
+ def test_read_err(self):
+ data = b"foo"
+ with raises(HttpSyntaxException):
+ self._read(data)
+
+ def test_read_empty_name(self):
+ data = b":foo"
+ with raises(HttpSyntaxException):
+ self._read(data)
+
+ def test_read_empty_value(self):
+ data = b"bar:"
+ headers = self._read(data)
+ assert headers.fields == [[b"bar", b""]]
+
+def test_read_chunked():
+ req = treq(content=None)
+ req.headers["Transfer-Encoding"] = "chunked"
+
+ data = b"1\r\na\r\n0\r\n"
+ with raises(HttpSyntaxException):
+ b"".join(_read_chunked(BytesIO(data)))
+
+ data = b"1\r\na\r\n0\r\n\r\n"
+ assert b"".join(_read_chunked(BytesIO(data))) == b"a"
+
+ data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n"
+ assert b"".join(_read_chunked(BytesIO(data))) == b"ab"
+
+ data = b"\r\n"
+ with raises("closed prematurely"):
+ b"".join(_read_chunked(BytesIO(data)))
+
+ data = b"1\r\nfoo"
+ with raises("malformed chunked body"):
+ b"".join(_read_chunked(BytesIO(data)))
+
+ data = b"foo\r\nfoo"
+ with raises(HttpSyntaxException):
+ b"".join(_read_chunked(BytesIO(data)))
+
+ data = b"5\r\naaaaa\r\n0\r\n\r\n"
+ with raises("too large"):
+ b"".join(_read_chunked(BytesIO(data), limit=2))
diff --git a/test/netlib/http/http2/__init__.py b/test/netlib/http/http2/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/netlib/http/http2/__init__.py
diff --git a/test/netlib/http/http2/test_connections.py b/test/netlib/http/http2/test_connections.py
new file mode 100644
index 00000000..a115fc7c
--- /dev/null
+++ b/test/netlib/http/http2/test_connections.py
@@ -0,0 +1,540 @@
+import OpenSSL
+import mock
+import codecs
+
+from hyperframe.frame import *
+
+from netlib import tcp, http, utils, tservers
+from netlib.tutils import raises
+from netlib.exceptions import TcpDisconnect
+from netlib.http.http2.connections import HTTP2Protocol, TCPHandler
+
+
+class TestTCPHandlerWrapper:
+ def test_wrapped(self):
+ h = TCPHandler(rfile='foo', wfile='bar')
+ p = HTTP2Protocol(h)
+ assert p.tcp_handler.rfile == 'foo'
+ assert p.tcp_handler.wfile == 'bar'
+
+ def test_direct(self):
+ p = HTTP2Protocol(rfile='foo', wfile='bar')
+ assert isinstance(p.tcp_handler, TCPHandler)
+ assert p.tcp_handler.rfile == 'foo'
+ assert p.tcp_handler.wfile == 'bar'
+
+
+class EchoHandler(tcp.BaseHandler):
+ sni = None
+
+ def handle(self):
+ while True:
+ v = self.rfile.safe_read(1)
+ self.wfile.write(v)
+ self.wfile.flush()
+
+
+class TestProtocol:
+ @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface")
+ @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface")
+ def test_perform_connection_preface(self, mock_client_method, mock_server_method):
+ protocol = HTTP2Protocol(is_server=False)
+ protocol.connection_preface_performed = True
+
+ protocol.perform_connection_preface()
+ assert not mock_client_method.called
+ assert not mock_server_method.called
+
+ protocol.perform_connection_preface(force=True)
+ assert mock_client_method.called
+ assert not mock_server_method.called
+
+ @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface")
+ @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface")
+ def test_perform_connection_preface_server(self, mock_client_method, mock_server_method):
+ protocol = HTTP2Protocol(is_server=True)
+ protocol.connection_preface_performed = True
+
+ protocol.perform_connection_preface()
+ assert not mock_client_method.called
+ assert not mock_server_method.called
+
+ protocol.perform_connection_preface(force=True)
+ assert not mock_client_method.called
+ assert mock_server_method.called
+
+
+class TestCheckALPNMatch(tservers.ServerTestBase):
+ handler = EchoHandler
+ ssl = dict(
+ alpn_select=b'h2',
+ )
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_check_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl(alpn_protos=[b'h2'])
+ protocol = HTTP2Protocol(c)
+ assert protocol.check_alpn()
+
+
+class TestCheckALPNMismatch(tservers.ServerTestBase):
+ handler = EchoHandler
+ ssl = dict(
+ alpn_select=None,
+ )
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_check_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl(alpn_protos=[b'h2'])
+ protocol = HTTP2Protocol(c)
+ with raises(NotImplementedError):
+ protocol.check_alpn()
+
+
+class TestPerformServerConnectionPreface(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # send magic
+ self.wfile.write(codecs.decode('505249202a20485454502f322e300d0a0d0a534d0d0a0d0a', 'hex_codec'))
+ self.wfile.flush()
+
+ # send empty settings frame
+ self.wfile.write(codecs.decode('000000040000000000', 'hex_codec'))
+ self.wfile.flush()
+
+ # check empty settings frame
+ raw = utils.http2_read_raw_frame(self.rfile)
+ assert raw == codecs.decode('00000c040000000000000200000000000300000001', 'hex_codec')
+
+ # check settings acknowledgement
+ raw = utils.http2_read_raw_frame(self.rfile)
+ assert raw == codecs.decode('000000040100000000', 'hex_codec')
+
+ # send settings acknowledgement
+ self.wfile.write(codecs.decode('000000040100000000', 'hex_codec'))
+ self.wfile.flush()
+
+ def test_perform_server_connection_preface(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ protocol = HTTP2Protocol(c)
+
+ assert not protocol.connection_preface_performed
+ protocol.perform_server_connection_preface()
+ assert protocol.connection_preface_performed
+
+ with raises(TcpDisconnect):
+ protocol.perform_server_connection_preface(force=True)
+
+
+class TestPerformClientConnectionPreface(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # check magic
+ assert self.rfile.read(24) == HTTP2Protocol.CLIENT_CONNECTION_PREFACE
+
+ # check empty settings frame
+ assert self.rfile.read(9) ==\
+ codecs.decode('000000040000000000', 'hex_codec')
+
+ # send empty settings frame
+ self.wfile.write(codecs.decode('000000040000000000', 'hex_codec'))
+ self.wfile.flush()
+
+ # check settings acknowledgement
+ assert self.rfile.read(9) == \
+ codecs.decode('000000040100000000', 'hex_codec')
+
+ # send settings acknowledgement
+ self.wfile.write(codecs.decode('000000040100000000', 'hex_codec'))
+ self.wfile.flush()
+
+ def test_perform_client_connection_preface(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ protocol = HTTP2Protocol(c)
+
+ assert not protocol.connection_preface_performed
+ protocol.perform_client_connection_preface()
+ assert protocol.connection_preface_performed
+
+
+class TestClientStreamIds(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = HTTP2Protocol(c)
+
+ def test_client_stream_ids(self):
+ assert self.protocol.current_stream_id is None
+ assert self.protocol._next_stream_id() == 1
+ assert self.protocol.current_stream_id == 1
+ assert self.protocol._next_stream_id() == 3
+ assert self.protocol.current_stream_id == 3
+ assert self.protocol._next_stream_id() == 5
+ assert self.protocol.current_stream_id == 5
+
+
+class TestServerStreamIds(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = HTTP2Protocol(c, is_server=True)
+
+ def test_server_stream_ids(self):
+ assert self.protocol.current_stream_id is None
+ assert self.protocol._next_stream_id() == 2
+ assert self.protocol.current_stream_id == 2
+ assert self.protocol._next_stream_id() == 4
+ assert self.protocol.current_stream_id == 4
+ assert self.protocol._next_stream_id() == 6
+ assert self.protocol.current_stream_id == 6
+
+
+class TestApplySettings(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ # check settings acknowledgement
+ assert self.rfile.read(9) == codecs.decode('000000040100000000', 'hex_codec')
+ self.wfile.write("OK")
+ self.wfile.flush()
+ self.rfile.safe_read(9) # just to keep the connection alive a bit longer
+
+ ssl = True
+
+ def test_apply_settings(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
+
+ protocol._apply_settings({
+ SettingsFrame.ENABLE_PUSH: 'foo',
+ SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar',
+ SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef',
+ })
+
+ assert c.rfile.safe_read(2) == b"OK"
+
+ assert protocol.http2_settings[
+ SettingsFrame.ENABLE_PUSH] == 'foo'
+ assert protocol.http2_settings[
+ SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar'
+ assert protocol.http2_settings[
+ SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef'
+
+
+class TestCreateHeaders(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_headers(self):
+ headers = http.Headers([
+ (b':method', b'GET'),
+ (b':path', b'index.html'),
+ (b':scheme', b'https'),
+ (b'foo', b'bar')])
+
+ bytes = HTTP2Protocol(self.c)._create_headers(
+ headers, 1, end_stream=True)
+ assert b''.join(bytes) ==\
+ codecs.decode('000014010500000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec')
+
+ bytes = HTTP2Protocol(self.c)._create_headers(
+ headers, 1, end_stream=False)
+ assert b''.join(bytes) ==\
+ codecs.decode('000014010400000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec')
+
+ def test_create_headers_multiple_frames(self):
+ headers = http.Headers([
+ (b':method', b'GET'),
+ (b':path', b'/'),
+ (b':scheme', b'https'),
+ (b'foo', b'bar'),
+ (b'server', b'version')])
+
+ protocol = HTTP2Protocol(self.c)
+ protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 8
+ bytes = protocol._create_headers(headers, 1, end_stream=True)
+ assert len(bytes) == 3
+ assert bytes[0] == codecs.decode('000008010100000001828487408294e783', 'hex_codec')
+ assert bytes[1] == codecs.decode('0000080900000000018c767f7685ee5b10', 'hex_codec')
+ assert bytes[2] == codecs.decode('00000209040000000163d5', 'hex_codec')
+
+
+class TestCreateBody(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_body_empty(self):
+ protocol = HTTP2Protocol(self.c)
+ bytes = protocol._create_body(b'', 1)
+ assert b''.join(bytes) == b''
+
+ def test_create_body_single_frame(self):
+ protocol = HTTP2Protocol(self.c)
+ bytes = protocol._create_body(b'foobar', 1)
+ assert b''.join(bytes) == codecs.decode('000006000100000001666f6f626172', 'hex_codec')
+
+ def test_create_body_multiple_frames(self):
+ protocol = HTTP2Protocol(self.c)
+ protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 5
+ bytes = protocol._create_body(b'foobarmehm42', 1)
+ assert len(bytes) == 3
+ assert bytes[0] == codecs.decode('000005000000000001666f6f6261', 'hex_codec')
+ assert bytes[1] == codecs.decode('000005000000000001726d65686d', 'hex_codec')
+ assert bytes[2] == codecs.decode('0000020001000000013432', 'hex_codec')
+
+
+class TestReadRequest(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('000003010400000001828487', 'hex_codec'))
+ self.wfile.write(
+ codecs.decode('000006000100000001666f6f626172', 'hex_codec'))
+ self.wfile.flush()
+ self.rfile.safe_read(9) # just to keep the connection alive a bit longer
+
+ ssl = True
+
+ def test_read_request(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
+
+ req = protocol.read_request(NotImplemented)
+
+ assert req.stream_id
+ assert req.headers.fields == [[b':method', b'GET'], [b':path', b'/'], [b':scheme', b'https']]
+ assert req.content == b'foobar'
+
+
+class TestReadRequestRelative(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('00000c0105000000014287d5af7e4d5a777f4481f9', 'hex_codec'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_asterisk_form_in(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
+
+ req = protocol.read_request(NotImplemented)
+
+ assert req.form_in == "relative"
+ assert req.method == "OPTIONS"
+ assert req.path == "*"
+
+
+class TestReadRequestAbsolute(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('00001901050000000182448d9d29aee30c0e492c2a1170426366871c92585422e085', 'hex_codec'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_absolute_form_in(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
+
+ req = protocol.read_request(NotImplemented)
+
+ assert req.form_in == "absolute"
+ assert req.scheme == "http"
+ assert req.host == "address"
+ assert req.port == 22
+
+
+class TestReadRequestConnect(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('00001b0105000000014287bdab4e9c17b7ff44871c92585422e08541871c92585422e085', 'hex_codec'))
+ self.wfile.write(
+ codecs.decode('00001d0105000000014287bdab4e9c17b7ff44882f91d35d055c87a741882f91d35d055c87a7', 'hex_codec'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_connect(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
+
+ req = protocol.read_request(NotImplemented)
+ assert req.form_in == "authority"
+ assert req.method == "CONNECT"
+ assert req.host == "address"
+ assert req.port == 22
+
+ req = protocol.read_request(NotImplemented)
+ assert req.form_in == "authority"
+ assert req.method == "CONNECT"
+ assert req.host == "example.com"
+ assert req.port == 443
+
+
+class TestReadResponse(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('00000801040000002a88628594e78c767f', 'hex_codec'))
+ self.wfile.write(
+ codecs.decode('00000600010000002a666f6f626172', 'hex_codec'))
+ self.wfile.flush()
+ self.rfile.safe_read(9) # just to keep the connection alive a bit longer
+
+ ssl = True
+
+ def test_read_response(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
+ protocol.connection_preface_performed = True
+
+ resp = protocol.read_response(NotImplemented, stream_id=42)
+
+ assert resp.http_version == "HTTP/2.0"
+ assert resp.status_code == 200
+ assert resp.msg == ''
+ assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']]
+ assert resp.content == b'foobar'
+ assert resp.timestamp_end
+
+
+class TestReadEmptyResponse(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+ def handle(self):
+ self.wfile.write(
+ codecs.decode('00000801050000002a88628594e78c767f', 'hex_codec'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_empty_response(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
+ protocol.connection_preface_performed = True
+
+ resp = protocol.read_response(NotImplemented, stream_id=42)
+
+ assert resp.stream_id == 42
+ assert resp.http_version == "HTTP/2.0"
+ assert resp.status_code == 200
+ assert resp.msg == ''
+ assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']]
+ assert resp.content == b''
+
+
+class TestAssembleRequest(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_request_simple(self):
+ bytes = HTTP2Protocol(self.c).assemble_request(http.Request(
+ b'',
+ b'GET',
+ b'https',
+ b'',
+ b'',
+ b'/',
+ b"HTTP/2.0",
+ None,
+ None,
+ ))
+ assert len(bytes) == 1
+ assert bytes[0] == codecs.decode('00000d0105000000018284874188089d5c0b8170dc07', 'hex_codec')
+
+ def test_request_with_stream_id(self):
+ req = http.Request(
+ b'',
+ b'GET',
+ b'https',
+ b'',
+ b'',
+ b'/',
+ b"HTTP/2.0",
+ None,
+ None,
+ )
+ req.stream_id = 0x42
+ bytes = HTTP2Protocol(self.c).assemble_request(req)
+ assert len(bytes) == 1
+ assert bytes[0] == codecs.decode('00000d0105000000428284874188089d5c0b8170dc07', 'hex_codec')
+
+ def test_request_with_body(self):
+ bytes = HTTP2Protocol(self.c).assemble_request(http.Request(
+ b'',
+ b'GET',
+ b'https',
+ b'',
+ b'',
+ b'/',
+ b"HTTP/2.0",
+ http.Headers([(b'foo', b'bar')]),
+ b'foobar',
+ ))
+ assert len(bytes) == 2
+ assert bytes[0] ==\
+ codecs.decode('0000150104000000018284874188089d5c0b8170dc07408294e7838c767f', 'hex_codec')
+ assert bytes[1] ==\
+ codecs.decode('000006000100000001666f6f626172', 'hex_codec')
+
+
+class TestAssembleResponse(object):
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_simple(self):
+ bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response(
+ b"HTTP/2.0",
+ 200,
+ ))
+ assert len(bytes) == 1
+ assert bytes[0] ==\
+ codecs.decode('00000101050000000288', 'hex_codec')
+
+ def test_with_stream_id(self):
+ resp = http.Response(
+ b"HTTP/2.0",
+ 200,
+ )
+ resp.stream_id = 0x42
+ bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(resp)
+ assert len(bytes) == 1
+ assert bytes[0] ==\
+ codecs.decode('00000101050000004288', 'hex_codec')
+
+ def test_with_body(self):
+ bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response(
+ b"HTTP/2.0",
+ 200,
+ b'',
+ http.Headers(foo=b"bar"),
+ b'foobar'
+ ))
+ assert len(bytes) == 2
+ assert bytes[0] ==\
+ codecs.decode('00000901040000000288408294e7838c767f', 'hex_codec')
+ assert bytes[1] ==\
+ codecs.decode('000006000100000002666f6f626172', 'hex_codec')
diff --git a/test/netlib/http/test_authentication.py b/test/netlib/http/test_authentication.py
new file mode 100644
index 00000000..1df7cd9c
--- /dev/null
+++ b/test/netlib/http/test_authentication.py
@@ -0,0 +1,122 @@
+import binascii
+
+from netlib import tutils
+from netlib.http import authentication, Headers
+
+
+def test_parse_http_basic_auth():
+ vals = ("basic", "foo", "bar")
+ assert authentication.parse_http_basic_auth(
+ authentication.assemble_http_basic_auth(*vals)
+ ) == vals
+ assert not authentication.parse_http_basic_auth("")
+ assert not authentication.parse_http_basic_auth("foo bar")
+ v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
+ assert not authentication.parse_http_basic_auth(v)
+
+
+class TestPassManNonAnon:
+
+ def test_simple(self):
+ p = authentication.PassManNonAnon()
+ assert not p.test("", "")
+ assert p.test("user", "")
+
+
+class TestPassManHtpasswd:
+
+ def test_file_errors(self):
+ tutils.raises(
+ "malformed htpasswd file",
+ authentication.PassManHtpasswd,
+ tutils.test_data.path("data/server.crt"))
+
+ def test_simple(self):
+ pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
+
+ vals = ("basic", "test", "test")
+ authentication.assemble_http_basic_auth(*vals)
+ assert pm.test("test", "test")
+ assert not pm.test("test", "foo")
+ assert not pm.test("foo", "test")
+ assert not pm.test("test", "")
+ assert not pm.test("", "")
+
+
+class TestPassManSingleUser:
+
+ def test_simple(self):
+ pm = authentication.PassManSingleUser("test", "test")
+ assert pm.test("test", "test")
+ assert not pm.test("test", "foo")
+ assert not pm.test("foo", "test")
+
+
+class TestNullProxyAuth:
+
+ def test_simple(self):
+ na = authentication.NullProxyAuth(authentication.PassManNonAnon())
+ assert not na.auth_challenge_headers()
+ assert na.authenticate("foo")
+ na.clean({})
+
+
+class TestBasicProxyAuth:
+
+ def test_simple(self):
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
+ headers = Headers()
+ assert ba.auth_challenge_headers()
+ assert not ba.authenticate(headers)
+
+ def test_authenticate_clean(self):
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
+
+ headers = Headers()
+ vals = ("basic", "foo", "bar")
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert ba.authenticate(headers)
+
+ ba.clean(headers)
+ assert not ba.AUTH_HEADER in headers
+
+ headers[ba.AUTH_HEADER] = ""
+ assert not ba.authenticate(headers)
+
+ headers[ba.AUTH_HEADER] = "foo"
+ assert not ba.authenticate(headers)
+
+ vals = ("foo", "foo", "bar")
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert not ba.authenticate(headers)
+
+ ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
+ vals = ("basic", "foo", "bar")
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert not ba.authenticate(headers)
+
+
+class Bunch:
+ pass
+
+
+class TestAuthAction:
+
+ def test_nonanonymous(self):
+ m = Bunch()
+ aa = authentication.NonanonymousAuthAction(None, "authenticator")
+ aa(None, m, None, None)
+ assert m.authenticator
+
+ def test_singleuser(self):
+ m = Bunch()
+ aa = authentication.SingleuserAuthAction(None, "authenticator")
+ aa(None, m, "foo:bar", None)
+ assert m.authenticator
+ tutils.raises("invalid", aa, None, m, "foo", None)
+
+ def test_httppasswd(self):
+ m = Bunch()
+ aa = authentication.HtpasswdAuthAction(None, "authenticator")
+ aa(None, m, tutils.test_data.path("data/htpasswd"), None)
+ assert m.authenticator
diff --git a/test/netlib/http/test_cookies.py b/test/netlib/http/test_cookies.py
new file mode 100644
index 00000000..34bb64f2
--- /dev/null
+++ b/test/netlib/http/test_cookies.py
@@ -0,0 +1,218 @@
+from netlib.http import cookies
+
+
+def test_read_token():
+ tokens = [
+ [("foo", 0), ("foo", 3)],
+ [("foo", 1), ("oo", 3)],
+ [(" foo", 1), ("foo", 4)],
+ [(" foo;", 1), ("foo", 4)],
+ [(" foo=", 1), ("foo", 4)],
+ [(" foo=bar", 1), ("foo", 4)],
+ ]
+ for q, a in tokens:
+ assert cookies._read_token(*q) == a
+
+
+def test_read_quoted_string():
+ tokens = [
+ [('"foo" x', 0), ("foo", 5)],
+ [('"f\oo" x', 0), ("foo", 6)],
+ [(r'"f\\o" x', 0), (r"f\o", 6)],
+ [(r'"f\\" x', 0), (r"f" + '\\', 5)],
+ [('"fo\\\"" x', 0), ("fo\"", 6)],
+ [('"foo" x', 7), ("", 8)],
+ ]
+ for q, a in tokens:
+ assert cookies._read_quoted_string(*q) == a
+
+
+def test_read_pairs():
+ vals = [
+ [
+ "one",
+ [["one", None]]
+ ],
+ [
+ "one=two",
+ [["one", "two"]]
+ ],
+ [
+ "one=",
+ [["one", ""]]
+ ],
+ [
+ 'one="two"',
+ [["one", "two"]]
+ ],
+ [
+ 'one="two"; three=four',
+ [["one", "two"], ["three", "four"]]
+ ],
+ [
+ 'one="two"; three=four; five',
+ [["one", "two"], ["three", "four"], ["five", None]]
+ ],
+ [
+ 'one="\\"two"; three=four',
+ [["one", '"two'], ["three", "four"]]
+ ],
+ ]
+ for s, lst in vals:
+ ret, off = cookies._read_pairs(s)
+ assert ret == lst
+
+
+def test_pairs_roundtrips():
+ pairs = [
+ [
+ "",
+ []
+ ],
+ [
+ "one=uno",
+ [["one", "uno"]]
+ ],
+ [
+ "one",
+ [["one", None]]
+ ],
+ [
+ "one=uno; two=due",
+ [["one", "uno"], ["two", "due"]]
+ ],
+ [
+ 'one="uno"; two="\due"',
+ [["one", "uno"], ["two", "due"]]
+ ],
+ [
+ 'one="un\\"o"',
+ [["one", 'un"o']]
+ ],
+ [
+ 'one="uno,due"',
+ [["one", 'uno,due']]
+ ],
+ [
+ "one=uno; two; three=tre",
+ [["one", "uno"], ["two", None], ["three", "tre"]]
+ ],
+ [
+ "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; "
+ "_rcc2=53VdltWl+Ov6ordflA==;",
+ [
+ ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="],
+ ["_rcc2", "53VdltWl+Ov6ordflA=="]
+ ]
+ ]
+ ]
+ for s, lst in pairs:
+ ret, off = cookies._read_pairs(s)
+ assert ret == lst
+ s2 = cookies._format_pairs(lst)
+ ret, off = cookies._read_pairs(s2)
+ assert ret == lst
+
+
+def test_cookie_roundtrips():
+ pairs = [
+ [
+ "one=uno",
+ [["one", "uno"]]
+ ],
+ [
+ "one=uno; two=due",
+ [["one", "uno"], ["two", "due"]]
+ ],
+ ]
+ for s, lst in pairs:
+ ret = cookies.parse_cookie_header(s)
+ assert ret.lst == lst
+ s2 = cookies.format_cookie_header(ret)
+ ret = cookies.parse_cookie_header(s2)
+ assert ret.lst == lst
+
+
+def test_parse_set_cookie_pairs():
+ pairs = [
+ [
+ "one=uno",
+ [
+ ["one", "uno"]
+ ]
+ ],
+ [
+ "one=un\x20",
+ [
+ ["one", "un\x20"]
+ ]
+ ],
+ [
+ "one=uno; foo",
+ [
+ ["one", "uno"],
+ ["foo", None]
+ ]
+ ],
+ [
+ "mun=1.390.f60; "
+ "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; "
+ "domain=b.aol.com",
+ [
+ ["mun", "1.390.f60"],
+ ["expires", "sun, 11-oct-2015 12:38:31 gmt"],
+ ["path", "/"],
+ ["domain", "b.aol.com"]
+ ]
+ ],
+ [
+ r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; '
+ 'domain=.rubiconproject.com; '
+ 'expires=mon, 11-may-2015 21:54:57 gmt; '
+ 'path=/',
+ [
+ ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'],
+ ['domain', '.rubiconproject.com'],
+ ['expires', 'mon, 11-may-2015 21:54:57 gmt'],
+ ['path', '/']
+ ]
+ ],
+ ]
+ for s, lst in pairs:
+ ret = cookies._parse_set_cookie_pairs(s)
+ assert ret == lst
+ s2 = cookies._format_set_cookie_pairs(ret)
+ ret2 = cookies._parse_set_cookie_pairs(s2)
+ assert ret2 == lst
+
+
+def test_parse_set_cookie_header():
+ vals = [
+ [
+ "", None
+ ],
+ [
+ ";", None
+ ],
+ [
+ "one=uno",
+ ("one", "uno", [])
+ ],
+ [
+ "one=uno; foo=bar",
+ ("one", "uno", [["foo", "bar"]])
+ ]
+ ]
+ for s, expected in vals:
+ ret = cookies.parse_set_cookie_header(s)
+ if expected:
+ assert ret[0] == expected[0]
+ assert ret[1] == expected[1]
+ assert ret[2].lst == expected[2]
+ s2 = cookies.format_set_cookie_header(*ret)
+ ret2 = cookies.parse_set_cookie_header(s2)
+ assert ret2[0] == expected[0]
+ assert ret2[1] == expected[1]
+ assert ret2[2].lst == expected[2]
+ else:
+ assert ret is None
diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py
new file mode 100644
index 00000000..d50fee3e
--- /dev/null
+++ b/test/netlib/http/test_headers.py
@@ -0,0 +1,152 @@
+from netlib.http import Headers
+from netlib.tutils import raises
+
+
+class TestHeaders(object):
+ def _2host(self):
+ return Headers(
+ [
+ [b"Host", b"example.com"],
+ [b"host", b"example.org"]
+ ]
+ )
+
+ def test_init(self):
+ headers = Headers()
+ assert len(headers) == 0
+
+ headers = Headers([[b"Host", b"example.com"]])
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(Host="example.com")
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(
+ [[b"Host", b"invalid"]],
+ Host="example.com"
+ )
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(
+ [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
+ Host="example.com"
+ )
+ assert len(headers) == 2
+ assert headers["Host"] == "example.com"
+ assert headers["Accept"] == "text/plain"
+
+ with raises(ValueError):
+ Headers([[b"Host", u"not-bytes"]])
+
+ def test_getitem(self):
+ headers = Headers(Host="example.com")
+ assert headers["Host"] == "example.com"
+ assert headers["host"] == "example.com"
+ with raises(KeyError):
+ _ = headers["Accept"]
+
+ headers = self._2host()
+ assert headers["Host"] == "example.com, example.org"
+
+ def test_str(self):
+ headers = Headers(Host="example.com")
+ assert bytes(headers) == b"Host: example.com\r\n"
+
+ headers = Headers([
+ [b"Host", b"example.com"],
+ [b"Accept", b"text/plain"]
+ ])
+ assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n"
+
+ headers = Headers()
+ assert bytes(headers) == b""
+
+ def test_setitem(self):
+ headers = Headers()
+ headers["Host"] = "example.com"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.com"
+
+ headers["host"] = "example.org"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.org"
+
+ headers["accept"] = "text/plain"
+ assert len(headers) == 2
+ assert "Accept" in headers
+ assert "Host" in headers
+
+ headers = self._2host()
+ assert len(headers.fields) == 2
+ headers["Host"] = "example.com"
+ assert len(headers.fields) == 1
+ assert "Host" in headers
+
+ def test_delitem(self):
+ headers = Headers(Host="example.com")
+ assert len(headers) == 1
+ del headers["host"]
+ assert len(headers) == 0
+ try:
+ del headers["host"]
+ except KeyError:
+ assert True
+ else:
+ assert False
+
+ headers = self._2host()
+ del headers["Host"]
+ assert len(headers) == 0
+
+ def test_keys(self):
+ headers = Headers(Host="example.com")
+ assert list(headers.keys()) == ["Host"]
+
+ headers = self._2host()
+ assert list(headers.keys()) == ["Host"]
+
+ def test_eq_ne(self):
+ headers1 = Headers(Host="example.com")
+ headers2 = Headers(host="example.com")
+ assert not (headers1 == headers2)
+ assert headers1 != headers2
+
+ headers1 = Headers(Host="example.com")
+ headers2 = Headers(Host="example.com")
+ assert headers1 == headers2
+ assert not (headers1 != headers2)
+
+ assert headers1 != 42
+
+ def test_get_all(self):
+ headers = self._2host()
+ assert headers.get_all("host") == ["example.com", "example.org"]
+ assert headers.get_all("accept") == []
+
+ def test_set_all(self):
+ headers = Headers(Host="example.com")
+ headers.set_all("Accept", ["text/plain"])
+ assert len(headers) == 2
+ assert "accept" in headers
+
+ headers = self._2host()
+ headers.set_all("Host", ["example.org"])
+ assert headers["host"] == "example.org"
+
+ headers.set_all("Host", ["example.org", "example.net"])
+ assert headers["host"] == "example.org, example.net"
+
+ def test_state(self):
+ headers = self._2host()
+ assert len(headers.get_state()) == 2
+ assert headers == Headers.from_state(headers.get_state())
+
+ headers2 = Headers()
+ assert headers != headers2
+ headers2.set_state(headers.get_state())
+ assert headers == headers2
diff --git a/test/netlib/http/test_message.py b/test/netlib/http/test_message.py
new file mode 100644
index 00000000..4b1f4630
--- /dev/null
+++ b/test/netlib/http/test_message.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+from netlib.http import decoded, Headers
+from netlib.tutils import tresp, raises
+
+
+def _test_passthrough_attr(message, attr):
+ assert getattr(message, attr) == getattr(message.data, attr)
+ setattr(message, attr, "foo")
+ assert getattr(message.data, attr) == "foo"
+
+
+def _test_decoded_attr(message, attr):
+ assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
+ # Set str, get raw bytes
+ setattr(message, attr, "foo")
+ assert getattr(message.data, attr) == b"foo"
+ # Set raw bytes, get decoded
+ setattr(message.data, attr, b"BAR") # use uppercase so that we can also cover request.method
+ assert getattr(message, attr) == "BAR"
+ # Set bytes, get raw bytes
+ setattr(message, attr, b"baz")
+ assert getattr(message.data, attr) == b"baz"
+
+ # Set UTF8
+ setattr(message, attr, "Non-Autorisé")
+ assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
+ # Don't fail on garbage
+ setattr(message.data, attr, b"FOO\xFF\x00BAR")
+ assert getattr(message, attr).startswith("FOO")
+ assert getattr(message, attr).endswith("BAR")
+ # foo.bar = foo.bar should not cause any side effects.
+ d = getattr(message, attr)
+ setattr(message, attr, d)
+ assert getattr(message.data, attr) == b"FOO\xFF\x00BAR"
+
+
+class TestMessageData(object):
+ def test_eq_ne(self):
+ data = tresp(timestamp_start=42, timestamp_end=42).data
+ same = tresp(timestamp_start=42, timestamp_end=42).data
+ assert data == same
+ assert not data != same
+
+ other = tresp(content=b"foo").data
+ assert not data == other
+ assert data != other
+
+ assert data != 0
+
+
+class TestMessage(object):
+
+ def test_init(self):
+ resp = tresp()
+ assert resp.data
+
+ def test_eq_ne(self):
+ resp = tresp(timestamp_start=42, timestamp_end=42)
+ same = tresp(timestamp_start=42, timestamp_end=42)
+ assert resp == same
+ assert not resp != same
+
+ other = tresp(timestamp_start=0, timestamp_end=0)
+ assert not resp == other
+ assert resp != other
+
+ assert resp != 0
+
+ def test_content_length_update(self):
+ resp = tresp()
+ resp.content = b"foo"
+ assert resp.data.content == b"foo"
+ assert resp.headers["content-length"] == "3"
+ resp.content = b""
+ assert resp.data.content == b""
+ assert resp.headers["content-length"] == "0"
+
+ def test_content_basic(self):
+ _test_passthrough_attr(tresp(), "content")
+
+ def test_headers(self):
+ _test_passthrough_attr(tresp(), "headers")
+
+ def test_timestamp_start(self):
+ _test_passthrough_attr(tresp(), "timestamp_start")
+
+ def test_timestamp_end(self):
+ _test_passthrough_attr(tresp(), "timestamp_end")
+
+ def teste_http_version(self):
+ _test_decoded_attr(tresp(), "http_version")
+
+
+class TestDecodedDecorator(object):
+
+ def test_simple(self):
+ r = tresp()
+ assert r.content == b"message"
+ assert "content-encoding" not in r.headers
+ assert r.encode("gzip")
+
+ assert r.headers["content-encoding"]
+ assert r.content != b"message"
+ with decoded(r):
+ assert "content-encoding" not in r.headers
+ assert r.content == b"message"
+ assert r.headers["content-encoding"]
+ assert r.content != b"message"
+
+ def test_modify(self):
+ r = tresp()
+ assert "content-encoding" not in r.headers
+ assert r.encode("gzip")
+
+ with decoded(r):
+ r.content = b"foo"
+
+ assert r.content != b"foo"
+ r.decode()
+ assert r.content == b"foo"
+
+ def test_unknown_ce(self):
+ r = tresp()
+ r.headers["content-encoding"] = "zopfli"
+ r.content = b"foo"
+ with decoded(r):
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+
+ def test_cannot_decode(self):
+ r = tresp()
+ assert r.encode("gzip")
+ r.content = b"foo"
+ with decoded(r):
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+ assert r.headers["content-encoding"]
+ assert r.content != b"foo"
+ r.decode()
+ assert r.content == b"foo"
+
+ def test_cannot_encode(self):
+ r = tresp()
+ assert r.encode("gzip")
+ with decoded(r):
+ r.content = None
+
+ assert "content-encoding" not in r.headers
+ assert r.content is None
diff --git a/test/netlib/http/test_request.py b/test/netlib/http/test_request.py
new file mode 100644
index 00000000..900b2cd1
--- /dev/null
+++ b/test/netlib/http/test_request.py
@@ -0,0 +1,238 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+import six
+
+from netlib import utils
+from netlib.http import Headers
+from netlib.odict import ODict
+from netlib.tutils import treq, raises
+from .test_message import _test_decoded_attr, _test_passthrough_attr
+
+
+class TestRequestData(object):
+ def test_init(self):
+ with raises(ValueError if six.PY2 else TypeError):
+ treq(headers="foobar")
+
+ assert isinstance(treq(headers=None).headers, Headers)
+
+
+class TestRequestCore(object):
+ """
+ Tests for builtins and the attributes that are directly proxied from the data structure
+ """
+ def test_repr(self):
+ request = treq()
+ assert repr(request) == "Request(GET address:22/path)"
+ request.host = None
+ assert repr(request) == "Request(GET /path)"
+
+ def test_first_line_format(self):
+ _test_passthrough_attr(treq(), "first_line_format")
+
+ def test_method(self):
+ _test_decoded_attr(treq(), "method")
+
+ def test_scheme(self):
+ _test_decoded_attr(treq(), "scheme")
+
+ def test_port(self):
+ _test_passthrough_attr(treq(), "port")
+
+ def test_path(self):
+ _test_decoded_attr(treq(), "path")
+
+ def test_host(self):
+ if six.PY2:
+ from unittest import SkipTest
+ raise SkipTest()
+
+ request = treq()
+ assert request.host == request.data.host.decode("idna")
+
+ # Test IDNA encoding
+ # Set str, get raw bytes
+ request.host = "ídna.example"
+ assert request.data.host == b"xn--dna-qma.example"
+ # Set raw bytes, get decoded
+ request.data.host = b"xn--idn-gla.example"
+ assert request.host == "idná.example"
+ # Set bytes, get raw bytes
+ request.host = b"xn--dn-qia9b.example"
+ assert request.data.host == b"xn--dn-qia9b.example"
+ # IDNA encoding is not bijective
+ request.host = "fußball"
+ assert request.host == "fussball"
+
+ # Don't fail on garbage
+ request.data.host = b"foo\xFF\x00bar"
+ assert request.host.startswith("foo")
+ assert request.host.endswith("bar")
+ # foo.bar = foo.bar should not cause any side effects.
+ d = request.host
+ request.host = d
+ assert request.data.host == b"foo\xFF\x00bar"
+
+ def test_host_header_update(self):
+ request = treq()
+ assert "host" not in request.headers
+ request.host = "example.com"
+ assert "host" not in request.headers
+
+ request.headers["Host"] = "foo"
+ request.host = "example.org"
+ assert request.headers["Host"] == "example.org"
+
+
+class TestRequestUtils(object):
+ """
+ Tests for additional convenience methods.
+ """
+ def test_url(self):
+ request = treq()
+ assert request.url == "http://address:22/path"
+
+ request.url = "https://otheraddress:42/foo"
+ assert request.scheme == "https"
+ assert request.host == "otheraddress"
+ assert request.port == 42
+ assert request.path == "/foo"
+
+ with raises(ValueError):
+ request.url = "not-a-url"
+
+ def test_pretty_host(self):
+ request = treq()
+ assert request.pretty_host == "address"
+ assert request.host == "address"
+ request.headers["host"] = "other"
+ assert request.pretty_host == "other"
+ assert request.host == "address"
+ request.host = None
+ assert request.pretty_host is None
+ assert request.host is None
+
+ # Invalid IDNA
+ request.headers["host"] = ".disqus.com"
+ assert request.pretty_host == ".disqus.com"
+
+ def test_pretty_url(self):
+ request = treq()
+ assert request.url == "http://address:22/path"
+ assert request.pretty_url == "http://address:22/path"
+ request.headers["host"] = "other"
+ assert request.pretty_url == "http://other:22/path"
+
+ def test_pretty_url_authority(self):
+ request = treq(first_line_format="authority")
+ assert request.pretty_url == "address:22"
+
+ def test_get_query(self):
+ request = treq()
+ assert request.query is None
+
+ request.url = "http://localhost:80/foo?bar=42"
+ assert request.query.lst == [("bar", "42")]
+
+ def test_set_query(self):
+ request = treq()
+ request.query = ODict([])
+
+ def test_get_cookies_none(self):
+ request = treq()
+ request.headers = Headers()
+ assert len(request.cookies) == 0
+
+ def test_get_cookies_single(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue")
+ result = request.cookies
+ assert len(result) == 1
+ assert result['cookiename'] == ['cookievalue']
+
+ def test_get_cookies_double(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
+ result = request.cookies
+ assert len(result) == 2
+ assert result['cookiename'] == ['cookievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_get_cookies_withequalsign(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
+ result = request.cookies
+ assert len(result) == 2
+ assert result['cookiename'] == ['coo=kievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_set_cookies(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue")
+ result = request.cookies
+ result["cookiename"] = ["foo"]
+ request.cookies = result
+ assert request.cookies["cookiename"] == ["foo"]
+
+ def test_get_path_components(self):
+ request = treq(path=b"/foo/bar")
+ assert request.path_components == ["foo", "bar"]
+
+ def test_set_path_components(self):
+ request = treq()
+ request.path_components = ["foo", "baz"]
+ assert request.path == "/foo/baz"
+ request.path_components = []
+ assert request.path == "/"
+
+ def test_anticache(self):
+ request = treq()
+ request.headers["If-Modified-Since"] = "foo"
+ request.headers["If-None-Match"] = "bar"
+ request.anticache()
+ assert "If-Modified-Since" not in request.headers
+ assert "If-None-Match" not in request.headers
+
+ def test_anticomp(self):
+ request = treq()
+ request.headers["Accept-Encoding"] = "foobar"
+ request.anticomp()
+ assert request.headers["Accept-Encoding"] == "identity"
+
+ def test_constrain_encoding(self):
+ request = treq()
+
+ h = request.headers.copy()
+ request.constrain_encoding() # no-op if there is no accept_encoding header.
+ assert request.headers == h
+
+ request.headers["Accept-Encoding"] = "identity, gzip, foo"
+ request.constrain_encoding()
+ assert "foo" not in request.headers["Accept-Encoding"]
+ assert "gzip" in request.headers["Accept-Encoding"]
+
+ def test_get_urlencoded_form(self):
+ request = treq(content="foobar")
+ assert request.urlencoded_form is None
+
+ request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ assert request.urlencoded_form == ODict(utils.urldecode(request.content))
+
+ def test_set_urlencoded_form(self):
+ request = treq()
+ request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')])
+ assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
+ assert request.content
+
+ def test_get_multipart_form(self):
+ request = treq(content="foobar")
+ assert request.multipart_form is None
+
+ request.headers["Content-Type"] = "multipart/form-data"
+ assert request.multipart_form == ODict(
+ utils.multipartdecode(
+ request.headers,
+ request.content
+ )
+ )
diff --git a/test/netlib/http/test_response.py b/test/netlib/http/test_response.py
new file mode 100644
index 00000000..14588000
--- /dev/null
+++ b/test/netlib/http/test_response.py
@@ -0,0 +1,102 @@
+from __future__ import absolute_import, print_function, division
+
+import six
+
+from netlib.http import Headers
+from netlib.odict import ODict, ODictCaseless
+from netlib.tutils import raises, tresp
+from .test_message import _test_passthrough_attr, _test_decoded_attr
+
+
+class TestResponseData(object):
+ def test_init(self):
+ with raises(ValueError if six.PY2 else TypeError):
+ tresp(headers="foobar")
+
+ assert isinstance(tresp(headers=None).headers, Headers)
+
+
+class TestResponseCore(object):
+ """
+ Tests for builtins and the attributes that are directly proxied from the data structure
+ """
+ def test_repr(self):
+ response = tresp()
+ assert repr(response) == "Response(200 OK, unknown content type, 7B)"
+ response.content = None
+ assert repr(response) == "Response(200 OK, no content)"
+
+ def test_status_code(self):
+ _test_passthrough_attr(tresp(), "status_code")
+
+ def test_reason(self):
+ _test_decoded_attr(tresp(), "reason")
+
+
+class TestResponseUtils(object):
+ """
+ Tests for additional convenience methods.
+ """
+ def test_get_cookies_none(self):
+ resp = tresp()
+ resp.headers = Headers()
+ assert not resp.cookies
+
+ def test_get_cookies_empty(self):
+ resp = tresp()
+ resp.headers = Headers(set_cookie="")
+ assert not resp.cookies
+
+ def test_get_cookies_simple(self):
+ resp = tresp()
+ resp.headers = Headers(set_cookie="cookiename=cookievalue")
+ result = resp.cookies
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", ODict()]
+
+ def test_get_cookies_with_parameters(self):
+ resp = tresp()
+ resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
+ result = resp.cookies
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == "cookievalue"
+ attrs = result["cookiename"][0][1]
+ assert len(attrs) == 4
+ assert attrs["domain"] == ["example.com"]
+ assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
+ assert attrs["path"] == ["/"]
+ assert attrs["httponly"] == [None]
+
+ def test_get_cookies_no_value(self):
+ resp = tresp()
+ resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
+ result = resp.cookies
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == ""
+ assert len(result["cookiename"][0][1]) == 2
+
+ def test_get_cookies_twocookies(self):
+ resp = tresp()
+ resp.headers = Headers([
+ [b"Set-Cookie", b"cookiename=cookievalue"],
+ [b"Set-Cookie", b"othercookie=othervalue"]
+ ])
+ result = resp.cookies
+ assert len(result) == 2
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", ODict()]
+ assert "othercookie" in result
+ assert result["othercookie"][0] == ["othervalue", ODict()]
+
+ def test_set_cookies(self):
+ resp = tresp()
+ v = resp.cookies
+ v.add("foo", ["bar", ODictCaseless()])
+ resp.set_cookies(v)
+
+ v = resp.cookies
+ assert len(v) == 1
+ assert v["foo"] == [["bar", ODictCaseless()]]
diff --git a/test/netlib/http/test_status_codes.py b/test/netlib/http/test_status_codes.py
new file mode 100644
index 00000000..9fea6b70
--- /dev/null
+++ b/test/netlib/http/test_status_codes.py
@@ -0,0 +1,6 @@
+from netlib.http import status_codes
+
+
+def test_simple():
+ assert status_codes.IM_A_TEAPOT == 418
+ assert status_codes.RESPONSES[418] == "I'm a teapot"
diff --git a/test/netlib/http/test_user_agents.py b/test/netlib/http/test_user_agents.py
new file mode 100644
index 00000000..0bf1bba7
--- /dev/null
+++ b/test/netlib/http/test_user_agents.py
@@ -0,0 +1,6 @@
+from netlib.http import user_agents
+
+
+def test_get_shortcut():
+ assert user_agents.get_by_shortcut("c")[0] == "chrome"
+ assert not user_agents.get_by_shortcut("_")