aboutsummaryrefslogtreecommitdiffstats
path: root/netlib/test/http
diff options
context:
space:
mode:
Diffstat (limited to 'netlib/test/http')
-rw-r--r--netlib/test/http/__init__.py0
-rw-r--r--netlib/test/http/http1/__init__.py0
-rw-r--r--netlib/test/http/http1/test_assemble.py102
-rw-r--r--netlib/test/http/http1/test_read.py333
-rw-r--r--netlib/test/http/http2/__init__.py0
-rw-r--r--netlib/test/http/http2/test_connections.py540
-rw-r--r--netlib/test/http/test_authentication.py122
-rw-r--r--netlib/test/http/test_cookies.py218
-rw-r--r--netlib/test/http/test_headers.py152
-rw-r--r--netlib/test/http/test_message.py153
-rw-r--r--netlib/test/http/test_request.py238
-rw-r--r--netlib/test/http/test_response.py102
-rw-r--r--netlib/test/http/test_status_codes.py6
-rw-r--r--netlib/test/http/test_user_agents.py6
14 files changed, 0 insertions, 1972 deletions
diff --git a/netlib/test/http/__init__.py b/netlib/test/http/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/netlib/test/http/__init__.py
+++ /dev/null
diff --git a/netlib/test/http/http1/__init__.py b/netlib/test/http/http1/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/netlib/test/http/http1/__init__.py
+++ /dev/null
diff --git a/netlib/test/http/http1/test_assemble.py b/netlib/test/http/http1/test_assemble.py
deleted file mode 100644
index 31a62438..00000000
--- a/netlib/test/http/http1/test_assemble.py
+++ /dev/null
@@ -1,102 +0,0 @@
-from __future__ import absolute_import, print_function, division
-from netlib.exceptions import HttpException
-from netlib.http import CONTENT_MISSING, Headers
-from netlib.http.http1.assemble import (
- assemble_request, assemble_request_head, assemble_response,
- assemble_response_head, _assemble_request_line, _assemble_request_headers,
- _assemble_response_headers,
- assemble_body)
-from netlib.tutils import treq, raises, tresp
-
-
-def test_assemble_request():
- c = assemble_request(treq()) == (
- b"GET /path HTTP/1.1\r\n"
- b"header: qvalue\r\n"
- b"Host: address:22\r\n"
- b"Content-Length: 7\r\n"
- b"\r\n"
- b"content"
- )
-
- with raises(HttpException):
- assemble_request(treq(content=CONTENT_MISSING))
-
-
-def test_assemble_request_head():
- c = assemble_request_head(treq(content="foo"))
- assert b"GET" in c
- assert b"qvalue" in c
- assert b"content-length" in c
- assert b"foo" not in c
-
-
-def test_assemble_response():
- c = assemble_response(tresp()) == (
- b"HTTP/1.1 200 OK\r\n"
- b"header-response: svalue\r\n"
- b"Content-Length: 7\r\n"
- b"\r\n"
- b"message"
- )
-
- with raises(HttpException):
- assemble_response(tresp(content=CONTENT_MISSING))
-
-
-def test_assemble_response_head():
- c = assemble_response_head(tresp())
- assert b"200" in c
- assert b"svalue" in c
- assert b"message" not in c
-
-
-def test_assemble_body():
- c = list(assemble_body(Headers(), [b"body"]))
- assert c == [b"body"]
-
- c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a", b""]))
- assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
-
- c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a"]))
- assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
-
-
-def test_assemble_request_line():
- assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1"
-
- authority_request = treq(method=b"CONNECT", first_line_format="authority").data
- assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1"
-
- absolute_request = treq(first_line_format="absolute").data
- assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1"
-
- with raises(RuntimeError):
- _assemble_request_line(treq(first_line_format="invalid_form").data)
-
-
-def test_assemble_request_headers():
- # https://github.com/mitmproxy/mitmproxy/issues/186
- r = treq(content=b"")
- r.headers["Transfer-Encoding"] = "chunked"
- c = _assemble_request_headers(r.data)
- assert b"Transfer-Encoding" in c
-
-
-def test_assemble_request_headers_host_header():
- r = treq()
- r.headers = Headers()
- c = _assemble_request_headers(r.data)
- assert b"host" in c
-
- r.host = None
- c = _assemble_request_headers(r.data)
- assert b"host" not in c
-
-
-def test_assemble_response_headers():
- # https://github.com/mitmproxy/mitmproxy/issues/186
- r = tresp(content=b"")
- r.headers["Transfer-Encoding"] = "chunked"
- c = _assemble_response_headers(r)
- assert b"Transfer-Encoding" in c
diff --git a/netlib/test/http/http1/test_read.py b/netlib/test/http/http1/test_read.py
deleted file mode 100644
index 90234070..00000000
--- a/netlib/test/http/http1/test_read.py
+++ /dev/null
@@ -1,333 +0,0 @@
-from __future__ import absolute_import, print_function, division
-from io import BytesIO
-import textwrap
-from mock import Mock
-from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect
-from netlib.http import Headers
-from netlib.http.http1.read import (
- read_request, read_response, read_request_head,
- read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line,
- _read_request_line, _parse_authority_form, _read_response_line, _check_http_version,
- _read_headers, _read_chunked
-)
-from netlib.tutils import treq, tresp, raises
-
-
-def test_read_request():
- rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip")
- r = read_request(rfile)
- assert r.method == "GET"
- assert r.content == b""
- assert r.timestamp_end
- assert rfile.read() == b"skip"
-
-
-def test_read_request_head():
- rfile = BytesIO(
- b"GET / HTTP/1.1\r\n"
- b"Content-Length: 4\r\n"
- b"\r\n"
- b"skip"
- )
- rfile.reset_timestamps = Mock()
- rfile.first_byte_timestamp = 42
- r = read_request_head(rfile)
- assert r.method == "GET"
- assert r.headers["Content-Length"] == "4"
- assert r.content is None
- assert rfile.reset_timestamps.called
- assert r.timestamp_start == 42
- assert rfile.read() == b"skip"
-
-
-def test_read_response():
- req = treq()
- rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody")
- r = read_response(rfile, req)
- assert r.status_code == 418
- assert r.content == b"body"
- assert r.timestamp_end
-
-
-def test_read_response_head():
- rfile = BytesIO(
- b"HTTP/1.1 418 I'm a teapot\r\n"
- b"Content-Length: 4\r\n"
- b"\r\n"
- b"skip"
- )
- rfile.reset_timestamps = Mock()
- rfile.first_byte_timestamp = 42
- r = read_response_head(rfile)
- assert r.status_code == 418
- assert r.headers["Content-Length"] == "4"
- assert r.content is None
- assert rfile.reset_timestamps.called
- assert r.timestamp_start == 42
- assert rfile.read() == b"skip"
-
-
-class TestReadBody(object):
- def test_chunked(self):
- rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar")
- body = b"".join(read_body(rfile, None))
- assert body == b"foo"
- assert rfile.read() == b"bar"
-
- def test_known_size(self):
- rfile = BytesIO(b"foobar")
- body = b"".join(read_body(rfile, 3))
- assert body == b"foo"
- assert rfile.read() == b"bar"
-
- def test_known_size_limit(self):
- rfile = BytesIO(b"foobar")
- with raises(HttpException):
- b"".join(read_body(rfile, 3, 2))
-
- def test_known_size_too_short(self):
- rfile = BytesIO(b"foo")
- with raises(HttpException):
- b"".join(read_body(rfile, 6))
-
- def test_unknown_size(self):
- rfile = BytesIO(b"foobar")
- body = b"".join(read_body(rfile, -1))
- assert body == b"foobar"
-
- def test_unknown_size_limit(self):
- rfile = BytesIO(b"foobar")
- with raises(HttpException):
- b"".join(read_body(rfile, -1, 3))
-
- def test_max_chunk_size(self):
- rfile = BytesIO(b"123456")
- assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
- rfile = BytesIO(b"123456")
- assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
-
-def test_connection_close():
- headers = Headers()
- assert connection_close(b"HTTP/1.0", headers)
- assert not connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "keep-alive"
- assert not connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "close"
- assert connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "foobar"
- assert connection_close(b"HTTP/1.0", headers)
- assert not connection_close(b"HTTP/1.1", headers)
-
-def test_expected_http_body_size():
- # Expect: 100-continue
- assert expected_http_body_size(
- treq(headers=Headers(expect="100-continue", content_length="42"))
- ) == 0
-
- # http://tools.ietf.org/html/rfc7230#section-3.3
- assert expected_http_body_size(
- treq(method=b"HEAD"),
- tresp(headers=Headers(content_length="42"))
- ) == 0
- assert expected_http_body_size(
- treq(method=b"CONNECT"),
- tresp()
- ) == 0
- for code in (100, 204, 304):
- assert expected_http_body_size(
- treq(),
- tresp(status_code=code)
- ) == 0
-
- # chunked
- assert expected_http_body_size(
- treq(headers=Headers(transfer_encoding="chunked")),
- ) is None
-
- # explicit length
- for val in (b"foo", b"-7"):
- with raises(HttpSyntaxException):
- expected_http_body_size(
- treq(headers=Headers(content_length=val))
- )
- assert expected_http_body_size(
- treq(headers=Headers(content_length="42"))
- ) == 42
-
- # no length
- assert expected_http_body_size(
- treq(headers=Headers())
- ) == 0
- assert expected_http_body_size(
- treq(headers=Headers()), tresp(headers=Headers())
- ) == -1
-
-
-def test_get_first_line():
- rfile = BytesIO(b"foo\r\nbar")
- assert _get_first_line(rfile) == b"foo"
-
- rfile = BytesIO(b"\r\nfoo\r\nbar")
- assert _get_first_line(rfile) == b"foo"
-
- with raises(HttpReadDisconnect):
- rfile = BytesIO(b"")
- _get_first_line(rfile)
-
- with raises(HttpReadDisconnect):
- rfile = Mock()
- rfile.readline.side_effect = TcpDisconnect
- _get_first_line(rfile)
-
-
-def test_read_request_line():
- def t(b):
- return _read_request_line(BytesIO(b))
-
- assert (t(b"GET / HTTP/1.1") ==
- ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1"))
- assert (t(b"OPTIONS * HTTP/1.1") ==
- ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1"))
- assert (t(b"CONNECT foo:42 HTTP/1.1") ==
- ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1"))
- assert (t(b"GET http://foo:42/bar HTTP/1.1") ==
- ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1"))
-
- with raises(HttpSyntaxException):
- t(b"GET / WTF/1.1")
- with raises(HttpSyntaxException):
- t(b"this is not http")
- with raises(HttpReadDisconnect):
- t(b"")
-
-def test_parse_authority_form():
- assert _parse_authority_form(b"foo:42") == (b"foo", 42)
- with raises(HttpSyntaxException):
- _parse_authority_form(b"foo")
- with raises(HttpSyntaxException):
- _parse_authority_form(b"foo:bar")
- with raises(HttpSyntaxException):
- _parse_authority_form(b"foo:99999999")
- with raises(HttpSyntaxException):
- _parse_authority_form(b"f\x00oo:80")
-
-
-def test_read_response_line():
- def t(b):
- return _read_response_line(BytesIO(b))
-
- assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK")
- assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"")
-
- # https://github.com/mitmproxy/mitmproxy/issues/784
- assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9")
-
- with raises(HttpSyntaxException):
- assert t(b"HTTP/1.1")
-
- with raises(HttpSyntaxException):
- t(b"HTTP/1.1 OK OK")
- with raises(HttpSyntaxException):
- t(b"WTF/1.1 200 OK")
- with raises(HttpReadDisconnect):
- t(b"")
-
-
-def test_check_http_version():
- _check_http_version(b"HTTP/0.9")
- _check_http_version(b"HTTP/1.0")
- _check_http_version(b"HTTP/1.1")
- _check_http_version(b"HTTP/2.0")
- with raises(HttpSyntaxException):
- _check_http_version(b"WTF/1.0")
- with raises(HttpSyntaxException):
- _check_http_version(b"HTTP/1.10")
- with raises(HttpSyntaxException):
- _check_http_version(b"HTTP/1.b")
-
-
-class TestReadHeaders(object):
- @staticmethod
- def _read(data):
- return _read_headers(BytesIO(data))
-
- def test_read_simple(self):
- data = (
- b"Header: one\r\n"
- b"Header2: two\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == [[b"Header", b"one"], [b"Header2", b"two"]]
-
- def test_read_multi(self):
- data = (
- b"Header: one\r\n"
- b"Header: two\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == [[b"Header", b"one"], [b"Header", b"two"]]
-
- def test_read_continued(self):
- data = (
- b"Header: one\r\n"
- b"\ttwo\r\n"
- b"Header2: three\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == [[b"Header", b"one\r\n two"], [b"Header2", b"three"]]
-
- def test_read_continued_err(self):
- data = b"\tfoo: bar\r\n"
- with raises(HttpSyntaxException):
- self._read(data)
-
- def test_read_err(self):
- data = b"foo"
- with raises(HttpSyntaxException):
- self._read(data)
-
- def test_read_empty_name(self):
- data = b":foo"
- with raises(HttpSyntaxException):
- self._read(data)
-
- def test_read_empty_value(self):
- data = b"bar:"
- headers = self._read(data)
- assert headers.fields == [[b"bar", b""]]
-
-def test_read_chunked():
- req = treq(content=None)
- req.headers["Transfer-Encoding"] = "chunked"
-
- data = b"1\r\na\r\n0\r\n"
- with raises(HttpSyntaxException):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"1\r\na\r\n0\r\n\r\n"
- assert b"".join(_read_chunked(BytesIO(data))) == b"a"
-
- data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n"
- assert b"".join(_read_chunked(BytesIO(data))) == b"ab"
-
- data = b"\r\n"
- with raises("closed prematurely"):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"1\r\nfoo"
- with raises("malformed chunked body"):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"foo\r\nfoo"
- with raises(HttpSyntaxException):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"5\r\naaaaa\r\n0\r\n\r\n"
- with raises("too large"):
- b"".join(_read_chunked(BytesIO(data), limit=2))
diff --git a/netlib/test/http/http2/__init__.py b/netlib/test/http/http2/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/netlib/test/http/http2/__init__.py
+++ /dev/null
diff --git a/netlib/test/http/http2/test_connections.py b/netlib/test/http/http2/test_connections.py
deleted file mode 100644
index a115fc7c..00000000
--- a/netlib/test/http/http2/test_connections.py
+++ /dev/null
@@ -1,540 +0,0 @@
-import OpenSSL
-import mock
-import codecs
-
-from hyperframe.frame import *
-
-from netlib import tcp, http, utils, tservers
-from netlib.tutils import raises
-from netlib.exceptions import TcpDisconnect
-from netlib.http.http2.connections import HTTP2Protocol, TCPHandler
-
-
-class TestTCPHandlerWrapper:
- def test_wrapped(self):
- h = TCPHandler(rfile='foo', wfile='bar')
- p = HTTP2Protocol(h)
- assert p.tcp_handler.rfile == 'foo'
- assert p.tcp_handler.wfile == 'bar'
-
- def test_direct(self):
- p = HTTP2Protocol(rfile='foo', wfile='bar')
- assert isinstance(p.tcp_handler, TCPHandler)
- assert p.tcp_handler.rfile == 'foo'
- assert p.tcp_handler.wfile == 'bar'
-
-
-class EchoHandler(tcp.BaseHandler):
- sni = None
-
- def handle(self):
- while True:
- v = self.rfile.safe_read(1)
- self.wfile.write(v)
- self.wfile.flush()
-
-
-class TestProtocol:
- @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface")
- @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface")
- def test_perform_connection_preface(self, mock_client_method, mock_server_method):
- protocol = HTTP2Protocol(is_server=False)
- protocol.connection_preface_performed = True
-
- protocol.perform_connection_preface()
- assert not mock_client_method.called
- assert not mock_server_method.called
-
- protocol.perform_connection_preface(force=True)
- assert mock_client_method.called
- assert not mock_server_method.called
-
- @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface")
- @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface")
- def test_perform_connection_preface_server(self, mock_client_method, mock_server_method):
- protocol = HTTP2Protocol(is_server=True)
- protocol.connection_preface_performed = True
-
- protocol.perform_connection_preface()
- assert not mock_client_method.called
- assert not mock_server_method.called
-
- protocol.perform_connection_preface(force=True)
- assert not mock_client_method.called
- assert mock_server_method.called
-
-
-class TestCheckALPNMatch(tservers.ServerTestBase):
- handler = EchoHandler
- ssl = dict(
- alpn_select=b'h2',
- )
-
- if OpenSSL._util.lib.Cryptography_HAS_ALPN:
-
- def test_check_alpn(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b'h2'])
- protocol = HTTP2Protocol(c)
- assert protocol.check_alpn()
-
-
-class TestCheckALPNMismatch(tservers.ServerTestBase):
- handler = EchoHandler
- ssl = dict(
- alpn_select=None,
- )
-
- if OpenSSL._util.lib.Cryptography_HAS_ALPN:
-
- def test_check_alpn(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b'h2'])
- protocol = HTTP2Protocol(c)
- with raises(NotImplementedError):
- protocol.check_alpn()
-
-
-class TestPerformServerConnectionPreface(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
-
- def handle(self):
- # send magic
- self.wfile.write(codecs.decode('505249202a20485454502f322e300d0a0d0a534d0d0a0d0a', 'hex_codec'))
- self.wfile.flush()
-
- # send empty settings frame
- self.wfile.write(codecs.decode('000000040000000000', 'hex_codec'))
- self.wfile.flush()
-
- # check empty settings frame
- raw = utils.http2_read_raw_frame(self.rfile)
- assert raw == codecs.decode('00000c040000000000000200000000000300000001', 'hex_codec')
-
- # check settings acknowledgement
- raw = utils.http2_read_raw_frame(self.rfile)
- assert raw == codecs.decode('000000040100000000', 'hex_codec')
-
- # send settings acknowledgement
- self.wfile.write(codecs.decode('000000040100000000', 'hex_codec'))
- self.wfile.flush()
-
- def test_perform_server_connection_preface(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- protocol = HTTP2Protocol(c)
-
- assert not protocol.connection_preface_performed
- protocol.perform_server_connection_preface()
- assert protocol.connection_preface_performed
-
- with raises(TcpDisconnect):
- protocol.perform_server_connection_preface(force=True)
-
-
-class TestPerformClientConnectionPreface(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
-
- def handle(self):
- # check magic
- assert self.rfile.read(24) == HTTP2Protocol.CLIENT_CONNECTION_PREFACE
-
- # check empty settings frame
- assert self.rfile.read(9) ==\
- codecs.decode('000000040000000000', 'hex_codec')
-
- # send empty settings frame
- self.wfile.write(codecs.decode('000000040000000000', 'hex_codec'))
- self.wfile.flush()
-
- # check settings acknowledgement
- assert self.rfile.read(9) == \
- codecs.decode('000000040100000000', 'hex_codec')
-
- # send settings acknowledgement
- self.wfile.write(codecs.decode('000000040100000000', 'hex_codec'))
- self.wfile.flush()
-
- def test_perform_client_connection_preface(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- protocol = HTTP2Protocol(c)
-
- assert not protocol.connection_preface_performed
- protocol.perform_client_connection_preface()
- assert protocol.connection_preface_performed
-
-
-class TestClientStreamIds(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
- protocol = HTTP2Protocol(c)
-
- def test_client_stream_ids(self):
- assert self.protocol.current_stream_id is None
- assert self.protocol._next_stream_id() == 1
- assert self.protocol.current_stream_id == 1
- assert self.protocol._next_stream_id() == 3
- assert self.protocol.current_stream_id == 3
- assert self.protocol._next_stream_id() == 5
- assert self.protocol.current_stream_id == 5
-
-
-class TestServerStreamIds(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
- protocol = HTTP2Protocol(c, is_server=True)
-
- def test_server_stream_ids(self):
- assert self.protocol.current_stream_id is None
- assert self.protocol._next_stream_id() == 2
- assert self.protocol.current_stream_id == 2
- assert self.protocol._next_stream_id() == 4
- assert self.protocol.current_stream_id == 4
- assert self.protocol._next_stream_id() == 6
- assert self.protocol.current_stream_id == 6
-
-
-class TestApplySettings(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- # check settings acknowledgement
- assert self.rfile.read(9) == codecs.decode('000000040100000000', 'hex_codec')
- self.wfile.write("OK")
- self.wfile.flush()
- self.rfile.safe_read(9) # just to keep the connection alive a bit longer
-
- ssl = True
-
- def test_apply_settings(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
-
- protocol._apply_settings({
- SettingsFrame.ENABLE_PUSH: 'foo',
- SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar',
- SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef',
- })
-
- assert c.rfile.safe_read(2) == b"OK"
-
- assert protocol.http2_settings[
- SettingsFrame.ENABLE_PUSH] == 'foo'
- assert protocol.http2_settings[
- SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar'
- assert protocol.http2_settings[
- SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef'
-
-
-class TestCreateHeaders(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
-
- def test_create_headers(self):
- headers = http.Headers([
- (b':method', b'GET'),
- (b':path', b'index.html'),
- (b':scheme', b'https'),
- (b'foo', b'bar')])
-
- bytes = HTTP2Protocol(self.c)._create_headers(
- headers, 1, end_stream=True)
- assert b''.join(bytes) ==\
- codecs.decode('000014010500000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec')
-
- bytes = HTTP2Protocol(self.c)._create_headers(
- headers, 1, end_stream=False)
- assert b''.join(bytes) ==\
- codecs.decode('000014010400000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec')
-
- def test_create_headers_multiple_frames(self):
- headers = http.Headers([
- (b':method', b'GET'),
- (b':path', b'/'),
- (b':scheme', b'https'),
- (b'foo', b'bar'),
- (b'server', b'version')])
-
- protocol = HTTP2Protocol(self.c)
- protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 8
- bytes = protocol._create_headers(headers, 1, end_stream=True)
- assert len(bytes) == 3
- assert bytes[0] == codecs.decode('000008010100000001828487408294e783', 'hex_codec')
- assert bytes[1] == codecs.decode('0000080900000000018c767f7685ee5b10', 'hex_codec')
- assert bytes[2] == codecs.decode('00000209040000000163d5', 'hex_codec')
-
-
-class TestCreateBody(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
-
- def test_create_body_empty(self):
- protocol = HTTP2Protocol(self.c)
- bytes = protocol._create_body(b'', 1)
- assert b''.join(bytes) == b''
-
- def test_create_body_single_frame(self):
- protocol = HTTP2Protocol(self.c)
- bytes = protocol._create_body(b'foobar', 1)
- assert b''.join(bytes) == codecs.decode('000006000100000001666f6f626172', 'hex_codec')
-
- def test_create_body_multiple_frames(self):
- protocol = HTTP2Protocol(self.c)
- protocol.http2_settings[SettingsFrame.MAX_FRAME_SIZE] = 5
- bytes = protocol._create_body(b'foobarmehm42', 1)
- assert len(bytes) == 3
- assert bytes[0] == codecs.decode('000005000000000001666f6f6261', 'hex_codec')
- assert bytes[1] == codecs.decode('000005000000000001726d65686d', 'hex_codec')
- assert bytes[2] == codecs.decode('0000020001000000013432', 'hex_codec')
-
-
-class TestReadRequest(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
-
- def handle(self):
- self.wfile.write(
- codecs.decode('000003010400000001828487', 'hex_codec'))
- self.wfile.write(
- codecs.decode('000006000100000001666f6f626172', 'hex_codec'))
- self.wfile.flush()
- self.rfile.safe_read(9) # just to keep the connection alive a bit longer
-
- ssl = True
-
- def test_read_request(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
-
- req = protocol.read_request(NotImplemented)
-
- assert req.stream_id
- assert req.headers.fields == [[b':method', b'GET'], [b':path', b'/'], [b':scheme', b'https']]
- assert req.content == b'foobar'
-
-
-class TestReadRequestRelative(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- self.wfile.write(
- codecs.decode('00000c0105000000014287d5af7e4d5a777f4481f9', 'hex_codec'))
- self.wfile.flush()
-
- ssl = True
-
- def test_asterisk_form_in(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
-
- req = protocol.read_request(NotImplemented)
-
- assert req.form_in == "relative"
- assert req.method == "OPTIONS"
- assert req.path == "*"
-
-
-class TestReadRequestAbsolute(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- self.wfile.write(
- codecs.decode('00001901050000000182448d9d29aee30c0e492c2a1170426366871c92585422e085', 'hex_codec'))
- self.wfile.flush()
-
- ssl = True
-
- def test_absolute_form_in(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
-
- req = protocol.read_request(NotImplemented)
-
- assert req.form_in == "absolute"
- assert req.scheme == "http"
- assert req.host == "address"
- assert req.port == 22
-
-
-class TestReadRequestConnect(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- self.wfile.write(
- codecs.decode('00001b0105000000014287bdab4e9c17b7ff44871c92585422e08541871c92585422e085', 'hex_codec'))
- self.wfile.write(
- codecs.decode('00001d0105000000014287bdab4e9c17b7ff44882f91d35d055c87a741882f91d35d055c87a7', 'hex_codec'))
- self.wfile.flush()
-
- ssl = True
-
- def test_connect(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
-
- req = protocol.read_request(NotImplemented)
- assert req.form_in == "authority"
- assert req.method == "CONNECT"
- assert req.host == "address"
- assert req.port == 22
-
- req = protocol.read_request(NotImplemented)
- assert req.form_in == "authority"
- assert req.method == "CONNECT"
- assert req.host == "example.com"
- assert req.port == 443
-
-
-class TestReadResponse(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- self.wfile.write(
- codecs.decode('00000801040000002a88628594e78c767f', 'hex_codec'))
- self.wfile.write(
- codecs.decode('00000600010000002a666f6f626172', 'hex_codec'))
- self.wfile.flush()
- self.rfile.safe_read(9) # just to keep the connection alive a bit longer
-
- ssl = True
-
- def test_read_response(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
- protocol.connection_preface_performed = True
-
- resp = protocol.read_response(NotImplemented, stream_id=42)
-
- assert resp.http_version == "HTTP/2.0"
- assert resp.status_code == 200
- assert resp.msg == ''
- assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']]
- assert resp.content == b'foobar'
- assert resp.timestamp_end
-
-
-class TestReadEmptyResponse(tservers.ServerTestBase):
- class handler(tcp.BaseHandler):
- def handle(self):
- self.wfile.write(
- codecs.decode('00000801050000002a88628594e78c767f', 'hex_codec'))
- self.wfile.flush()
-
- ssl = True
-
- def test_read_empty_response(self):
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
- protocol.connection_preface_performed = True
-
- resp = protocol.read_response(NotImplemented, stream_id=42)
-
- assert resp.stream_id == 42
- assert resp.http_version == "HTTP/2.0"
- assert resp.status_code == 200
- assert resp.msg == ''
- assert resp.headers.fields == [[b':status', b'200'], [b'etag', b'foobar']]
- assert resp.content == b''
-
-
-class TestAssembleRequest(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
-
- def test_request_simple(self):
- bytes = HTTP2Protocol(self.c).assemble_request(http.Request(
- b'',
- b'GET',
- b'https',
- b'',
- b'',
- b'/',
- b"HTTP/2.0",
- None,
- None,
- ))
- assert len(bytes) == 1
- assert bytes[0] == codecs.decode('00000d0105000000018284874188089d5c0b8170dc07', 'hex_codec')
-
- def test_request_with_stream_id(self):
- req = http.Request(
- b'',
- b'GET',
- b'https',
- b'',
- b'',
- b'/',
- b"HTTP/2.0",
- None,
- None,
- )
- req.stream_id = 0x42
- bytes = HTTP2Protocol(self.c).assemble_request(req)
- assert len(bytes) == 1
- assert bytes[0] == codecs.decode('00000d0105000000428284874188089d5c0b8170dc07', 'hex_codec')
-
- def test_request_with_body(self):
- bytes = HTTP2Protocol(self.c).assemble_request(http.Request(
- b'',
- b'GET',
- b'https',
- b'',
- b'',
- b'/',
- b"HTTP/2.0",
- http.Headers([(b'foo', b'bar')]),
- b'foobar',
- ))
- assert len(bytes) == 2
- assert bytes[0] ==\
- codecs.decode('0000150104000000018284874188089d5c0b8170dc07408294e7838c767f', 'hex_codec')
- assert bytes[1] ==\
- codecs.decode('000006000100000001666f6f626172', 'hex_codec')
-
-
-class TestAssembleResponse(object):
- c = tcp.TCPClient(("127.0.0.1", 0))
-
- def test_simple(self):
- bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response(
- b"HTTP/2.0",
- 200,
- ))
- assert len(bytes) == 1
- assert bytes[0] ==\
- codecs.decode('00000101050000000288', 'hex_codec')
-
- def test_with_stream_id(self):
- resp = http.Response(
- b"HTTP/2.0",
- 200,
- )
- resp.stream_id = 0x42
- bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(resp)
- assert len(bytes) == 1
- assert bytes[0] ==\
- codecs.decode('00000101050000004288', 'hex_codec')
-
- def test_with_body(self):
- bytes = HTTP2Protocol(self.c, is_server=True).assemble_response(http.Response(
- b"HTTP/2.0",
- 200,
- b'',
- http.Headers(foo=b"bar"),
- b'foobar'
- ))
- assert len(bytes) == 2
- assert bytes[0] ==\
- codecs.decode('00000901040000000288408294e7838c767f', 'hex_codec')
- assert bytes[1] ==\
- codecs.decode('000006000100000002666f6f626172', 'hex_codec')
diff --git a/netlib/test/http/test_authentication.py b/netlib/test/http/test_authentication.py
deleted file mode 100644
index 1df7cd9c..00000000
--- a/netlib/test/http/test_authentication.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import binascii
-
-from netlib import tutils
-from netlib.http import authentication, Headers
-
-
-def test_parse_http_basic_auth():
- vals = ("basic", "foo", "bar")
- assert authentication.parse_http_basic_auth(
- authentication.assemble_http_basic_auth(*vals)
- ) == vals
- assert not authentication.parse_http_basic_auth("")
- assert not authentication.parse_http_basic_auth("foo bar")
- v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
- assert not authentication.parse_http_basic_auth(v)
-
-
-class TestPassManNonAnon:
-
- def test_simple(self):
- p = authentication.PassManNonAnon()
- assert not p.test("", "")
- assert p.test("user", "")
-
-
-class TestPassManHtpasswd:
-
- def test_file_errors(self):
- tutils.raises(
- "malformed htpasswd file",
- authentication.PassManHtpasswd,
- tutils.test_data.path("data/server.crt"))
-
- def test_simple(self):
- pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
-
- vals = ("basic", "test", "test")
- authentication.assemble_http_basic_auth(*vals)
- assert pm.test("test", "test")
- assert not pm.test("test", "foo")
- assert not pm.test("foo", "test")
- assert not pm.test("test", "")
- assert not pm.test("", "")
-
-
-class TestPassManSingleUser:
-
- def test_simple(self):
- pm = authentication.PassManSingleUser("test", "test")
- assert pm.test("test", "test")
- assert not pm.test("test", "foo")
- assert not pm.test("foo", "test")
-
-
-class TestNullProxyAuth:
-
- def test_simple(self):
- na = authentication.NullProxyAuth(authentication.PassManNonAnon())
- assert not na.auth_challenge_headers()
- assert na.authenticate("foo")
- na.clean({})
-
-
-class TestBasicProxyAuth:
-
- def test_simple(self):
- ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
- headers = Headers()
- assert ba.auth_challenge_headers()
- assert not ba.authenticate(headers)
-
- def test_authenticate_clean(self):
- ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
-
- headers = Headers()
- vals = ("basic", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert ba.authenticate(headers)
-
- ba.clean(headers)
- assert not ba.AUTH_HEADER in headers
-
- headers[ba.AUTH_HEADER] = ""
- assert not ba.authenticate(headers)
-
- headers[ba.AUTH_HEADER] = "foo"
- assert not ba.authenticate(headers)
-
- vals = ("foo", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert not ba.authenticate(headers)
-
- ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
- vals = ("basic", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert not ba.authenticate(headers)
-
-
-class Bunch:
- pass
-
-
-class TestAuthAction:
-
- def test_nonanonymous(self):
- m = Bunch()
- aa = authentication.NonanonymousAuthAction(None, "authenticator")
- aa(None, m, None, None)
- assert m.authenticator
-
- def test_singleuser(self):
- m = Bunch()
- aa = authentication.SingleuserAuthAction(None, "authenticator")
- aa(None, m, "foo:bar", None)
- assert m.authenticator
- tutils.raises("invalid", aa, None, m, "foo", None)
-
- def test_httppasswd(self):
- m = Bunch()
- aa = authentication.HtpasswdAuthAction(None, "authenticator")
- aa(None, m, tutils.test_data.path("data/htpasswd"), None)
- assert m.authenticator
diff --git a/netlib/test/http/test_cookies.py b/netlib/test/http/test_cookies.py
deleted file mode 100644
index 34bb64f2..00000000
--- a/netlib/test/http/test_cookies.py
+++ /dev/null
@@ -1,218 +0,0 @@
-from netlib.http import cookies
-
-
-def test_read_token():
- tokens = [
- [("foo", 0), ("foo", 3)],
- [("foo", 1), ("oo", 3)],
- [(" foo", 1), ("foo", 4)],
- [(" foo;", 1), ("foo", 4)],
- [(" foo=", 1), ("foo", 4)],
- [(" foo=bar", 1), ("foo", 4)],
- ]
- for q, a in tokens:
- assert cookies._read_token(*q) == a
-
-
-def test_read_quoted_string():
- tokens = [
- [('"foo" x', 0), ("foo", 5)],
- [('"f\oo" x', 0), ("foo", 6)],
- [(r'"f\\o" x', 0), (r"f\o", 6)],
- [(r'"f\\" x', 0), (r"f" + '\\', 5)],
- [('"fo\\\"" x', 0), ("fo\"", 6)],
- [('"foo" x', 7), ("", 8)],
- ]
- for q, a in tokens:
- assert cookies._read_quoted_string(*q) == a
-
-
-def test_read_pairs():
- vals = [
- [
- "one",
- [["one", None]]
- ],
- [
- "one=two",
- [["one", "two"]]
- ],
- [
- "one=",
- [["one", ""]]
- ],
- [
- 'one="two"',
- [["one", "two"]]
- ],
- [
- 'one="two"; three=four',
- [["one", "two"], ["three", "four"]]
- ],
- [
- 'one="two"; three=four; five',
- [["one", "two"], ["three", "four"], ["five", None]]
- ],
- [
- 'one="\\"two"; three=four',
- [["one", '"two'], ["three", "four"]]
- ],
- ]
- for s, lst in vals:
- ret, off = cookies._read_pairs(s)
- assert ret == lst
-
-
-def test_pairs_roundtrips():
- pairs = [
- [
- "",
- []
- ],
- [
- "one=uno",
- [["one", "uno"]]
- ],
- [
- "one",
- [["one", None]]
- ],
- [
- "one=uno; two=due",
- [["one", "uno"], ["two", "due"]]
- ],
- [
- 'one="uno"; two="\due"',
- [["one", "uno"], ["two", "due"]]
- ],
- [
- 'one="un\\"o"',
- [["one", 'un"o']]
- ],
- [
- 'one="uno,due"',
- [["one", 'uno,due']]
- ],
- [
- "one=uno; two; three=tre",
- [["one", "uno"], ["two", None], ["three", "tre"]]
- ],
- [
- "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; "
- "_rcc2=53VdltWl+Ov6ordflA==;",
- [
- ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="],
- ["_rcc2", "53VdltWl+Ov6ordflA=="]
- ]
- ]
- ]
- for s, lst in pairs:
- ret, off = cookies._read_pairs(s)
- assert ret == lst
- s2 = cookies._format_pairs(lst)
- ret, off = cookies._read_pairs(s2)
- assert ret == lst
-
-
-def test_cookie_roundtrips():
- pairs = [
- [
- "one=uno",
- [["one", "uno"]]
- ],
- [
- "one=uno; two=due",
- [["one", "uno"], ["two", "due"]]
- ],
- ]
- for s, lst in pairs:
- ret = cookies.parse_cookie_header(s)
- assert ret.lst == lst
- s2 = cookies.format_cookie_header(ret)
- ret = cookies.parse_cookie_header(s2)
- assert ret.lst == lst
-
-
-def test_parse_set_cookie_pairs():
- pairs = [
- [
- "one=uno",
- [
- ["one", "uno"]
- ]
- ],
- [
- "one=un\x20",
- [
- ["one", "un\x20"]
- ]
- ],
- [
- "one=uno; foo",
- [
- ["one", "uno"],
- ["foo", None]
- ]
- ],
- [
- "mun=1.390.f60; "
- "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; "
- "domain=b.aol.com",
- [
- ["mun", "1.390.f60"],
- ["expires", "sun, 11-oct-2015 12:38:31 gmt"],
- ["path", "/"],
- ["domain", "b.aol.com"]
- ]
- ],
- [
- r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; '
- 'domain=.rubiconproject.com; '
- 'expires=mon, 11-may-2015 21:54:57 gmt; '
- 'path=/',
- [
- ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'],
- ['domain', '.rubiconproject.com'],
- ['expires', 'mon, 11-may-2015 21:54:57 gmt'],
- ['path', '/']
- ]
- ],
- ]
- for s, lst in pairs:
- ret = cookies._parse_set_cookie_pairs(s)
- assert ret == lst
- s2 = cookies._format_set_cookie_pairs(ret)
- ret2 = cookies._parse_set_cookie_pairs(s2)
- assert ret2 == lst
-
-
-def test_parse_set_cookie_header():
- vals = [
- [
- "", None
- ],
- [
- ";", None
- ],
- [
- "one=uno",
- ("one", "uno", [])
- ],
- [
- "one=uno; foo=bar",
- ("one", "uno", [["foo", "bar"]])
- ]
- ]
- for s, expected in vals:
- ret = cookies.parse_set_cookie_header(s)
- if expected:
- assert ret[0] == expected[0]
- assert ret[1] == expected[1]
- assert ret[2].lst == expected[2]
- s2 = cookies.format_set_cookie_header(*ret)
- ret2 = cookies.parse_set_cookie_header(s2)
- assert ret2[0] == expected[0]
- assert ret2[1] == expected[1]
- assert ret2[2].lst == expected[2]
- else:
- assert ret is None
diff --git a/netlib/test/http/test_headers.py b/netlib/test/http/test_headers.py
deleted file mode 100644
index d50fee3e..00000000
--- a/netlib/test/http/test_headers.py
+++ /dev/null
@@ -1,152 +0,0 @@
-from netlib.http import Headers
-from netlib.tutils import raises
-
-
-class TestHeaders(object):
- def _2host(self):
- return Headers(
- [
- [b"Host", b"example.com"],
- [b"host", b"example.org"]
- ]
- )
-
- def test_init(self):
- headers = Headers()
- assert len(headers) == 0
-
- headers = Headers([[b"Host", b"example.com"]])
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(Host="example.com")
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"]],
- Host="example.com"
- )
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
- Host="example.com"
- )
- assert len(headers) == 2
- assert headers["Host"] == "example.com"
- assert headers["Accept"] == "text/plain"
-
- with raises(ValueError):
- Headers([[b"Host", u"not-bytes"]])
-
- def test_getitem(self):
- headers = Headers(Host="example.com")
- assert headers["Host"] == "example.com"
- assert headers["host"] == "example.com"
- with raises(KeyError):
- _ = headers["Accept"]
-
- headers = self._2host()
- assert headers["Host"] == "example.com, example.org"
-
- def test_str(self):
- headers = Headers(Host="example.com")
- assert bytes(headers) == b"Host: example.com\r\n"
-
- headers = Headers([
- [b"Host", b"example.com"],
- [b"Accept", b"text/plain"]
- ])
- assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n"
-
- headers = Headers()
- assert bytes(headers) == b""
-
- def test_setitem(self):
- headers = Headers()
- headers["Host"] = "example.com"
- assert "Host" in headers
- assert "host" in headers
- assert headers["Host"] == "example.com"
-
- headers["host"] = "example.org"
- assert "Host" in headers
- assert "host" in headers
- assert headers["Host"] == "example.org"
-
- headers["accept"] = "text/plain"
- assert len(headers) == 2
- assert "Accept" in headers
- assert "Host" in headers
-
- headers = self._2host()
- assert len(headers.fields) == 2
- headers["Host"] = "example.com"
- assert len(headers.fields) == 1
- assert "Host" in headers
-
- def test_delitem(self):
- headers = Headers(Host="example.com")
- assert len(headers) == 1
- del headers["host"]
- assert len(headers) == 0
- try:
- del headers["host"]
- except KeyError:
- assert True
- else:
- assert False
-
- headers = self._2host()
- del headers["Host"]
- assert len(headers) == 0
-
- def test_keys(self):
- headers = Headers(Host="example.com")
- assert list(headers.keys()) == ["Host"]
-
- headers = self._2host()
- assert list(headers.keys()) == ["Host"]
-
- def test_eq_ne(self):
- headers1 = Headers(Host="example.com")
- headers2 = Headers(host="example.com")
- assert not (headers1 == headers2)
- assert headers1 != headers2
-
- headers1 = Headers(Host="example.com")
- headers2 = Headers(Host="example.com")
- assert headers1 == headers2
- assert not (headers1 != headers2)
-
- assert headers1 != 42
-
- def test_get_all(self):
- headers = self._2host()
- assert headers.get_all("host") == ["example.com", "example.org"]
- assert headers.get_all("accept") == []
-
- def test_set_all(self):
- headers = Headers(Host="example.com")
- headers.set_all("Accept", ["text/plain"])
- assert len(headers) == 2
- assert "accept" in headers
-
- headers = self._2host()
- headers.set_all("Host", ["example.org"])
- assert headers["host"] == "example.org"
-
- headers.set_all("Host", ["example.org", "example.net"])
- assert headers["host"] == "example.org, example.net"
-
- def test_state(self):
- headers = self._2host()
- assert len(headers.get_state()) == 2
- assert headers == Headers.from_state(headers.get_state())
-
- headers2 = Headers()
- assert headers != headers2
- headers2.set_state(headers.get_state())
- assert headers == headers2
diff --git a/netlib/test/http/test_message.py b/netlib/test/http/test_message.py
deleted file mode 100644
index 4b1f4630..00000000
--- a/netlib/test/http/test_message.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import, print_function, division
-
-from netlib.http import decoded, Headers
-from netlib.tutils import tresp, raises
-
-
-def _test_passthrough_attr(message, attr):
- assert getattr(message, attr) == getattr(message.data, attr)
- setattr(message, attr, "foo")
- assert getattr(message.data, attr) == "foo"
-
-
-def _test_decoded_attr(message, attr):
- assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
- # Set str, get raw bytes
- setattr(message, attr, "foo")
- assert getattr(message.data, attr) == b"foo"
- # Set raw bytes, get decoded
- setattr(message.data, attr, b"BAR") # use uppercase so that we can also cover request.method
- assert getattr(message, attr) == "BAR"
- # Set bytes, get raw bytes
- setattr(message, attr, b"baz")
- assert getattr(message.data, attr) == b"baz"
-
- # Set UTF8
- setattr(message, attr, "Non-Autorisé")
- assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
- # Don't fail on garbage
- setattr(message.data, attr, b"FOO\xFF\x00BAR")
- assert getattr(message, attr).startswith("FOO")
- assert getattr(message, attr).endswith("BAR")
- # foo.bar = foo.bar should not cause any side effects.
- d = getattr(message, attr)
- setattr(message, attr, d)
- assert getattr(message.data, attr) == b"FOO\xFF\x00BAR"
-
-
-class TestMessageData(object):
- def test_eq_ne(self):
- data = tresp(timestamp_start=42, timestamp_end=42).data
- same = tresp(timestamp_start=42, timestamp_end=42).data
- assert data == same
- assert not data != same
-
- other = tresp(content=b"foo").data
- assert not data == other
- assert data != other
-
- assert data != 0
-
-
-class TestMessage(object):
-
- def test_init(self):
- resp = tresp()
- assert resp.data
-
- def test_eq_ne(self):
- resp = tresp(timestamp_start=42, timestamp_end=42)
- same = tresp(timestamp_start=42, timestamp_end=42)
- assert resp == same
- assert not resp != same
-
- other = tresp(timestamp_start=0, timestamp_end=0)
- assert not resp == other
- assert resp != other
-
- assert resp != 0
-
- def test_content_length_update(self):
- resp = tresp()
- resp.content = b"foo"
- assert resp.data.content == b"foo"
- assert resp.headers["content-length"] == "3"
- resp.content = b""
- assert resp.data.content == b""
- assert resp.headers["content-length"] == "0"
-
- def test_content_basic(self):
- _test_passthrough_attr(tresp(), "content")
-
- def test_headers(self):
- _test_passthrough_attr(tresp(), "headers")
-
- def test_timestamp_start(self):
- _test_passthrough_attr(tresp(), "timestamp_start")
-
- def test_timestamp_end(self):
- _test_passthrough_attr(tresp(), "timestamp_end")
-
- def teste_http_version(self):
- _test_decoded_attr(tresp(), "http_version")
-
-
-class TestDecodedDecorator(object):
-
- def test_simple(self):
- r = tresp()
- assert r.content == b"message"
- assert "content-encoding" not in r.headers
- assert r.encode("gzip")
-
- assert r.headers["content-encoding"]
- assert r.content != b"message"
- with decoded(r):
- assert "content-encoding" not in r.headers
- assert r.content == b"message"
- assert r.headers["content-encoding"]
- assert r.content != b"message"
-
- def test_modify(self):
- r = tresp()
- assert "content-encoding" not in r.headers
- assert r.encode("gzip")
-
- with decoded(r):
- r.content = b"foo"
-
- assert r.content != b"foo"
- r.decode()
- assert r.content == b"foo"
-
- def test_unknown_ce(self):
- r = tresp()
- r.headers["content-encoding"] = "zopfli"
- r.content = b"foo"
- with decoded(r):
- assert r.headers["content-encoding"]
- assert r.content == b"foo"
- assert r.headers["content-encoding"]
- assert r.content == b"foo"
-
- def test_cannot_decode(self):
- r = tresp()
- assert r.encode("gzip")
- r.content = b"foo"
- with decoded(r):
- assert r.headers["content-encoding"]
- assert r.content == b"foo"
- assert r.headers["content-encoding"]
- assert r.content != b"foo"
- r.decode()
- assert r.content == b"foo"
-
- def test_cannot_encode(self):
- r = tresp()
- assert r.encode("gzip")
- with decoded(r):
- r.content = None
-
- assert "content-encoding" not in r.headers
- assert r.content is None
diff --git a/netlib/test/http/test_request.py b/netlib/test/http/test_request.py
deleted file mode 100644
index 900b2cd1..00000000
--- a/netlib/test/http/test_request.py
+++ /dev/null
@@ -1,238 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import absolute_import, print_function, division
-
-import six
-
-from netlib import utils
-from netlib.http import Headers
-from netlib.odict import ODict
-from netlib.tutils import treq, raises
-from .test_message import _test_decoded_attr, _test_passthrough_attr
-
-
-class TestRequestData(object):
- def test_init(self):
- with raises(ValueError if six.PY2 else TypeError):
- treq(headers="foobar")
-
- assert isinstance(treq(headers=None).headers, Headers)
-
-
-class TestRequestCore(object):
- """
- Tests for builtins and the attributes that are directly proxied from the data structure
- """
- def test_repr(self):
- request = treq()
- assert repr(request) == "Request(GET address:22/path)"
- request.host = None
- assert repr(request) == "Request(GET /path)"
-
- def test_first_line_format(self):
- _test_passthrough_attr(treq(), "first_line_format")
-
- def test_method(self):
- _test_decoded_attr(treq(), "method")
-
- def test_scheme(self):
- _test_decoded_attr(treq(), "scheme")
-
- def test_port(self):
- _test_passthrough_attr(treq(), "port")
-
- def test_path(self):
- _test_decoded_attr(treq(), "path")
-
- def test_host(self):
- if six.PY2:
- from unittest import SkipTest
- raise SkipTest()
-
- request = treq()
- assert request.host == request.data.host.decode("idna")
-
- # Test IDNA encoding
- # Set str, get raw bytes
- request.host = "ídna.example"
- assert request.data.host == b"xn--dna-qma.example"
- # Set raw bytes, get decoded
- request.data.host = b"xn--idn-gla.example"
- assert request.host == "idná.example"
- # Set bytes, get raw bytes
- request.host = b"xn--dn-qia9b.example"
- assert request.data.host == b"xn--dn-qia9b.example"
- # IDNA encoding is not bijective
- request.host = "fußball"
- assert request.host == "fussball"
-
- # Don't fail on garbage
- request.data.host = b"foo\xFF\x00bar"
- assert request.host.startswith("foo")
- assert request.host.endswith("bar")
- # foo.bar = foo.bar should not cause any side effects.
- d = request.host
- request.host = d
- assert request.data.host == b"foo\xFF\x00bar"
-
- def test_host_header_update(self):
- request = treq()
- assert "host" not in request.headers
- request.host = "example.com"
- assert "host" not in request.headers
-
- request.headers["Host"] = "foo"
- request.host = "example.org"
- assert request.headers["Host"] == "example.org"
-
-
-class TestRequestUtils(object):
- """
- Tests for additional convenience methods.
- """
- def test_url(self):
- request = treq()
- assert request.url == "http://address:22/path"
-
- request.url = "https://otheraddress:42/foo"
- assert request.scheme == "https"
- assert request.host == "otheraddress"
- assert request.port == 42
- assert request.path == "/foo"
-
- with raises(ValueError):
- request.url = "not-a-url"
-
- def test_pretty_host(self):
- request = treq()
- assert request.pretty_host == "address"
- assert request.host == "address"
- request.headers["host"] = "other"
- assert request.pretty_host == "other"
- assert request.host == "address"
- request.host = None
- assert request.pretty_host is None
- assert request.host is None
-
- # Invalid IDNA
- request.headers["host"] = ".disqus.com"
- assert request.pretty_host == ".disqus.com"
-
- def test_pretty_url(self):
- request = treq()
- assert request.url == "http://address:22/path"
- assert request.pretty_url == "http://address:22/path"
- request.headers["host"] = "other"
- assert request.pretty_url == "http://other:22/path"
-
- def test_pretty_url_authority(self):
- request = treq(first_line_format="authority")
- assert request.pretty_url == "address:22"
-
- def test_get_query(self):
- request = treq()
- assert request.query is None
-
- request.url = "http://localhost:80/foo?bar=42"
- assert request.query.lst == [("bar", "42")]
-
- def test_set_query(self):
- request = treq()
- request.query = ODict([])
-
- def test_get_cookies_none(self):
- request = treq()
- request.headers = Headers()
- assert len(request.cookies) == 0
-
- def test_get_cookies_single(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue")
- result = request.cookies
- assert len(result) == 1
- assert result['cookiename'] == ['cookievalue']
-
- def test_get_cookies_double(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
- result = request.cookies
- assert len(result) == 2
- assert result['cookiename'] == ['cookievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_get_cookies_withequalsign(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
- result = request.cookies
- assert len(result) == 2
- assert result['cookiename'] == ['coo=kievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_set_cookies(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue")
- result = request.cookies
- result["cookiename"] = ["foo"]
- request.cookies = result
- assert request.cookies["cookiename"] == ["foo"]
-
- def test_get_path_components(self):
- request = treq(path=b"/foo/bar")
- assert request.path_components == ["foo", "bar"]
-
- def test_set_path_components(self):
- request = treq()
- request.path_components = ["foo", "baz"]
- assert request.path == "/foo/baz"
- request.path_components = []
- assert request.path == "/"
-
- def test_anticache(self):
- request = treq()
- request.headers["If-Modified-Since"] = "foo"
- request.headers["If-None-Match"] = "bar"
- request.anticache()
- assert "If-Modified-Since" not in request.headers
- assert "If-None-Match" not in request.headers
-
- def test_anticomp(self):
- request = treq()
- request.headers["Accept-Encoding"] = "foobar"
- request.anticomp()
- assert request.headers["Accept-Encoding"] == "identity"
-
- def test_constrain_encoding(self):
- request = treq()
-
- h = request.headers.copy()
- request.constrain_encoding() # no-op if there is no accept_encoding header.
- assert request.headers == h
-
- request.headers["Accept-Encoding"] = "identity, gzip, foo"
- request.constrain_encoding()
- assert "foo" not in request.headers["Accept-Encoding"]
- assert "gzip" in request.headers["Accept-Encoding"]
-
- def test_get_urlencoded_form(self):
- request = treq(content="foobar")
- assert request.urlencoded_form is None
-
- request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- assert request.urlencoded_form == ODict(utils.urldecode(request.content))
-
- def test_set_urlencoded_form(self):
- request = treq()
- request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')])
- assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
- assert request.content
-
- def test_get_multipart_form(self):
- request = treq(content="foobar")
- assert request.multipart_form is None
-
- request.headers["Content-Type"] = "multipart/form-data"
- assert request.multipart_form == ODict(
- utils.multipartdecode(
- request.headers,
- request.content
- )
- )
diff --git a/netlib/test/http/test_response.py b/netlib/test/http/test_response.py
deleted file mode 100644
index 14588000..00000000
--- a/netlib/test/http/test_response.py
+++ /dev/null
@@ -1,102 +0,0 @@
-from __future__ import absolute_import, print_function, division
-
-import six
-
-from netlib.http import Headers
-from netlib.odict import ODict, ODictCaseless
-from netlib.tutils import raises, tresp
-from .test_message import _test_passthrough_attr, _test_decoded_attr
-
-
-class TestResponseData(object):
- def test_init(self):
- with raises(ValueError if six.PY2 else TypeError):
- tresp(headers="foobar")
-
- assert isinstance(tresp(headers=None).headers, Headers)
-
-
-class TestResponseCore(object):
- """
- Tests for builtins and the attributes that are directly proxied from the data structure
- """
- def test_repr(self):
- response = tresp()
- assert repr(response) == "Response(200 OK, unknown content type, 7B)"
- response.content = None
- assert repr(response) == "Response(200 OK, no content)"
-
- def test_status_code(self):
- _test_passthrough_attr(tresp(), "status_code")
-
- def test_reason(self):
- _test_decoded_attr(tresp(), "reason")
-
-
-class TestResponseUtils(object):
- """
- Tests for additional convenience methods.
- """
- def test_get_cookies_none(self):
- resp = tresp()
- resp.headers = Headers()
- assert not resp.cookies
-
- def test_get_cookies_empty(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="")
- assert not resp.cookies
-
- def test_get_cookies_simple(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="cookiename=cookievalue")
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0] == ["cookievalue", ODict()]
-
- def test_get_cookies_with_parameters(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0][0] == "cookievalue"
- attrs = result["cookiename"][0][1]
- assert len(attrs) == 4
- assert attrs["domain"] == ["example.com"]
- assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
- assert attrs["path"] == ["/"]
- assert attrs["httponly"] == [None]
-
- def test_get_cookies_no_value(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0][0] == ""
- assert len(result["cookiename"][0][1]) == 2
-
- def test_get_cookies_twocookies(self):
- resp = tresp()
- resp.headers = Headers([
- [b"Set-Cookie", b"cookiename=cookievalue"],
- [b"Set-Cookie", b"othercookie=othervalue"]
- ])
- result = resp.cookies
- assert len(result) == 2
- assert "cookiename" in result
- assert result["cookiename"][0] == ["cookievalue", ODict()]
- assert "othercookie" in result
- assert result["othercookie"][0] == ["othervalue", ODict()]
-
- def test_set_cookies(self):
- resp = tresp()
- v = resp.cookies
- v.add("foo", ["bar", ODictCaseless()])
- resp.set_cookies(v)
-
- v = resp.cookies
- assert len(v) == 1
- assert v["foo"] == [["bar", ODictCaseless()]]
diff --git a/netlib/test/http/test_status_codes.py b/netlib/test/http/test_status_codes.py
deleted file mode 100644
index 9fea6b70..00000000
--- a/netlib/test/http/test_status_codes.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib.http import status_codes
-
-
-def test_simple():
- assert status_codes.IM_A_TEAPOT == 418
- assert status_codes.RESPONSES[418] == "I'm a teapot"
diff --git a/netlib/test/http/test_user_agents.py b/netlib/test/http/test_user_agents.py
deleted file mode 100644
index 0bf1bba7..00000000
--- a/netlib/test/http/test_user_agents.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib.http import user_agents
-
-
-def test_get_shortcut():
- assert user_agents.get_by_shortcut("c")[0] == "chrome"
- assert not user_agents.get_by_shortcut("_")