aboutsummaryrefslogtreecommitdiffstats
path: root/test/netlib/http
diff options
context:
space:
mode:
Diffstat (limited to 'test/netlib/http')
-rw-r--r--test/netlib/http/__init__.py0
-rw-r--r--test/netlib/http/http1/__init__.py0
-rw-r--r--test/netlib/http/http1/test_assemble.py101
-rw-r--r--test/netlib/http/http1/test_read.py371
-rw-r--r--test/netlib/http/http2/__init__.py0
-rw-r--r--test/netlib/http/http2/test_framereader.py1
-rw-r--r--test/netlib/http/test_authentication.py122
-rw-r--r--test/netlib/http/test_cookies.py365
-rw-r--r--test/netlib/http/test_encoding.py73
-rw-r--r--test/netlib/http/test_headers.py106
-rw-r--r--test/netlib/http/test_message.py271
-rw-r--r--test/netlib/http/test_multipart.py24
-rw-r--r--test/netlib/http/test_request.py271
-rw-r--r--test/netlib/http/test_response.py145
-rw-r--r--test/netlib/http/test_status_codes.py6
-rw-r--r--test/netlib/http/test_url.py102
-rw-r--r--test/netlib/http/test_user_agents.py6
17 files changed, 0 insertions, 1964 deletions
diff --git a/test/netlib/http/__init__.py b/test/netlib/http/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/netlib/http/__init__.py
+++ /dev/null
diff --git a/test/netlib/http/http1/__init__.py b/test/netlib/http/http1/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/netlib/http/http1/__init__.py
+++ /dev/null
diff --git a/test/netlib/http/http1/test_assemble.py b/test/netlib/http/http1/test_assemble.py
deleted file mode 100644
index dac5fdad..00000000
--- a/test/netlib/http/http1/test_assemble.py
+++ /dev/null
@@ -1,101 +0,0 @@
-from mitmproxy import exceptions
-from netlib.http import Headers
-from netlib.http.http1.assemble import (
- assemble_request, assemble_request_head, assemble_response,
- assemble_response_head, _assemble_request_line, _assemble_request_headers,
- _assemble_response_headers,
- assemble_body)
-from mitmproxy.test.tutils import treq, raises, tresp
-
-
-def test_assemble_request():
- assert assemble_request(treq()) == (
- b"GET /path HTTP/1.1\r\n"
- b"header: qvalue\r\n"
- b"content-length: 7\r\n"
- b"host: address:22\r\n"
- b"\r\n"
- b"content"
- )
-
- with raises(exceptions.HttpException):
- assemble_request(treq(content=None))
-
-
-def test_assemble_request_head():
- c = assemble_request_head(treq(content=b"foo"))
- assert b"GET" in c
- assert b"qvalue" in c
- assert b"content-length" in c
- assert b"foo" not in c
-
-
-def test_assemble_response():
- assert assemble_response(tresp()) == (
- b"HTTP/1.1 200 OK\r\n"
- b"header-response: svalue\r\n"
- b"content-length: 7\r\n"
- b"\r\n"
- b"message"
- )
-
- with raises(exceptions.HttpException):
- assemble_response(tresp(content=None))
-
-
-def test_assemble_response_head():
- c = assemble_response_head(tresp())
- assert b"200" in c
- assert b"svalue" in c
- assert b"message" not in c
-
-
-def test_assemble_body():
- c = list(assemble_body(Headers(), [b"body"]))
- assert c == [b"body"]
-
- c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a", b""]))
- assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
-
- c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a"]))
- assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"]
-
-
-def test_assemble_request_line():
- assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1"
-
- authority_request = treq(method=b"CONNECT", first_line_format="authority").data
- assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1"
-
- absolute_request = treq(first_line_format="absolute").data
- assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1"
-
- with raises(RuntimeError):
- _assemble_request_line(treq(first_line_format="invalid_form").data)
-
-
-def test_assemble_request_headers():
- # https://github.com/mitmproxy/mitmproxy/issues/186
- r = treq(content=b"")
- r.headers["Transfer-Encoding"] = "chunked"
- c = _assemble_request_headers(r.data)
- assert b"Transfer-Encoding" in c
-
-
-def test_assemble_request_headers_host_header():
- r = treq()
- r.headers = Headers()
- c = _assemble_request_headers(r.data)
- assert b"host" in c
-
- r.host = None
- c = _assemble_request_headers(r.data)
- assert b"host" not in c
-
-
-def test_assemble_response_headers():
- # https://github.com/mitmproxy/mitmproxy/issues/186
- r = tresp(content=b"")
- r.headers["Transfer-Encoding"] = "chunked"
- c = _assemble_response_headers(r)
- assert b"Transfer-Encoding" in c
diff --git a/test/netlib/http/http1/test_read.py b/test/netlib/http/http1/test_read.py
deleted file mode 100644
index eb96968c..00000000
--- a/test/netlib/http/http1/test_read.py
+++ /dev/null
@@ -1,371 +0,0 @@
-from io import BytesIO
-from mock import Mock
-import pytest
-
-from mitmproxy import exceptions
-from netlib.http import Headers
-from netlib.http.http1.read import (
- read_request, read_response, read_request_head,
- read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line,
- _read_request_line, _parse_authority_form, _read_response_line, _check_http_version,
- _read_headers, _read_chunked, get_header_tokens
-)
-from mitmproxy.test.tutils import treq, tresp, raises
-
-
-def test_get_header_tokens():
- headers = Headers()
- assert get_header_tokens(headers, "foo") == []
- headers["foo"] = "bar"
- assert get_header_tokens(headers, "foo") == ["bar"]
- headers["foo"] = "bar, voing"
- assert get_header_tokens(headers, "foo") == ["bar", "voing"]
- headers.set_all("foo", ["bar, voing", "oink"])
- assert get_header_tokens(headers, "foo") == ["bar", "voing", "oink"]
-
-
-@pytest.mark.parametrize("input", [
- b"GET / HTTP/1.1\r\n\r\nskip",
- b"GET / HTTP/1.1\r\n\r\nskip",
- b"GET / HTTP/1.1\r\n\r\nskip",
- b"GET / HTTP/1.1 \r\n\r\nskip",
-])
-def test_read_request(input):
- rfile = BytesIO(input)
- r = read_request(rfile)
- assert r.method == "GET"
- assert r.content == b""
- assert r.http_version == "HTTP/1.1"
- assert r.timestamp_end
- assert rfile.read() == b"skip"
-
-
-@pytest.mark.parametrize("input", [
- b"CONNECT :0 0",
-])
-def test_read_request_error(input):
- rfile = BytesIO(input)
- raises(exceptions.HttpException, read_request, rfile)
-
-
-def test_read_request_head():
- rfile = BytesIO(
- b"GET / HTTP/1.1\r\n"
- b"Content-Length: 4\r\n"
- b"\r\n"
- b"skip"
- )
- rfile.reset_timestamps = Mock()
- rfile.first_byte_timestamp = 42
- r = read_request_head(rfile)
- assert r.method == "GET"
- assert r.headers["Content-Length"] == "4"
- assert r.content is None
- assert rfile.reset_timestamps.called
- assert r.timestamp_start == 42
- assert rfile.read() == b"skip"
-
-
-@pytest.mark.parametrize("input", [
- b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
- b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
- b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
- b"HTTP/1.1 418 I'm a teapot \r\n\r\nbody",
-])
-def test_read_response(input):
- req = treq()
- rfile = BytesIO(input)
- r = read_response(rfile, req)
- assert r.http_version == "HTTP/1.1"
- assert r.status_code == 418
- assert r.reason == "I'm a teapot"
- assert r.content == b"body"
- assert r.timestamp_end
-
-
-def test_read_response_head():
- rfile = BytesIO(
- b"HTTP/1.1 418 I'm a teapot\r\n"
- b"Content-Length: 4\r\n"
- b"\r\n"
- b"skip"
- )
- rfile.reset_timestamps = Mock()
- rfile.first_byte_timestamp = 42
- r = read_response_head(rfile)
- assert r.status_code == 418
- assert r.headers["Content-Length"] == "4"
- assert r.content is None
- assert rfile.reset_timestamps.called
- assert r.timestamp_start == 42
- assert rfile.read() == b"skip"
-
-
-class TestReadBody:
- def test_chunked(self):
- rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar")
- body = b"".join(read_body(rfile, None))
- assert body == b"foo"
- assert rfile.read() == b"bar"
-
- def test_known_size(self):
- rfile = BytesIO(b"foobar")
- body = b"".join(read_body(rfile, 3))
- assert body == b"foo"
- assert rfile.read() == b"bar"
-
- def test_known_size_limit(self):
- rfile = BytesIO(b"foobar")
- with raises(exceptions.HttpException):
- b"".join(read_body(rfile, 3, 2))
-
- def test_known_size_too_short(self):
- rfile = BytesIO(b"foo")
- with raises(exceptions.HttpException):
- b"".join(read_body(rfile, 6))
-
- def test_unknown_size(self):
- rfile = BytesIO(b"foobar")
- body = b"".join(read_body(rfile, -1))
- assert body == b"foobar"
-
- def test_unknown_size_limit(self):
- rfile = BytesIO(b"foobar")
- with raises(exceptions.HttpException):
- b"".join(read_body(rfile, -1, 3))
-
- def test_max_chunk_size(self):
- rfile = BytesIO(b"123456")
- assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
- rfile = BytesIO(b"123456")
- assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
-
-
-def test_connection_close():
- headers = Headers()
- assert connection_close(b"HTTP/1.0", headers)
- assert not connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "keep-alive"
- assert not connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "close"
- assert connection_close(b"HTTP/1.1", headers)
-
- headers["connection"] = "foobar"
- assert connection_close(b"HTTP/1.0", headers)
- assert not connection_close(b"HTTP/1.1", headers)
-
-
-def test_expected_http_body_size():
- # Expect: 100-continue
- assert expected_http_body_size(
- treq(headers=Headers(expect="100-continue", content_length="42"))
- ) == 0
-
- # http://tools.ietf.org/html/rfc7230#section-3.3
- assert expected_http_body_size(
- treq(method=b"HEAD"),
- tresp(headers=Headers(content_length="42"))
- ) == 0
- assert expected_http_body_size(
- treq(method=b"CONNECT"),
- tresp()
- ) == 0
- for code in (100, 204, 304):
- assert expected_http_body_size(
- treq(),
- tresp(status_code=code)
- ) == 0
-
- # chunked
- assert expected_http_body_size(
- treq(headers=Headers(transfer_encoding="chunked")),
- ) is None
-
- # explicit length
- for val in (b"foo", b"-7"):
- with raises(exceptions.HttpSyntaxException):
- expected_http_body_size(
- treq(headers=Headers(content_length=val))
- )
- assert expected_http_body_size(
- treq(headers=Headers(content_length="42"))
- ) == 42
-
- # no length
- assert expected_http_body_size(
- treq(headers=Headers())
- ) == 0
- assert expected_http_body_size(
- treq(headers=Headers()), tresp(headers=Headers())
- ) == -1
-
-
-def test_get_first_line():
- rfile = BytesIO(b"foo\r\nbar")
- assert _get_first_line(rfile) == b"foo"
-
- rfile = BytesIO(b"\r\nfoo\r\nbar")
- assert _get_first_line(rfile) == b"foo"
-
- with raises(exceptions.HttpReadDisconnect):
- rfile = BytesIO(b"")
- _get_first_line(rfile)
-
- with raises(exceptions.HttpReadDisconnect):
- rfile = Mock()
- rfile.readline.side_effect = exceptions.TcpDisconnect
- _get_first_line(rfile)
-
-
-def test_read_request_line():
- def t(b):
- return _read_request_line(BytesIO(b))
-
- assert (t(b"GET / HTTP/1.1") ==
- ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1"))
- assert (t(b"OPTIONS * HTTP/1.1") ==
- ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1"))
- assert (t(b"CONNECT foo:42 HTTP/1.1") ==
- ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1"))
- assert (t(b"GET http://foo:42/bar HTTP/1.1") ==
- ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1"))
-
- with raises(exceptions.HttpSyntaxException):
- t(b"GET / WTF/1.1")
- with raises(exceptions.HttpSyntaxException):
- t(b"this is not http")
- with raises(exceptions.HttpReadDisconnect):
- t(b"")
-
-
-def test_parse_authority_form():
- assert _parse_authority_form(b"foo:42") == (b"foo", 42)
- with raises(exceptions.HttpSyntaxException):
- _parse_authority_form(b"foo")
- with raises(exceptions.HttpSyntaxException):
- _parse_authority_form(b"foo:bar")
- with raises(exceptions.HttpSyntaxException):
- _parse_authority_form(b"foo:99999999")
- with raises(exceptions.HttpSyntaxException):
- _parse_authority_form(b"f\x00oo:80")
-
-
-def test_read_response_line():
- def t(b):
- return _read_response_line(BytesIO(b))
-
- assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK")
- assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"")
-
- # https://github.com/mitmproxy/mitmproxy/issues/784
- assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9")
-
- with raises(exceptions.HttpSyntaxException):
- assert t(b"HTTP/1.1")
-
- with raises(exceptions.HttpSyntaxException):
- t(b"HTTP/1.1 OK OK")
- with raises(exceptions.HttpSyntaxException):
- t(b"WTF/1.1 200 OK")
- with raises(exceptions.HttpReadDisconnect):
- t(b"")
-
-
-def test_check_http_version():
- _check_http_version(b"HTTP/0.9")
- _check_http_version(b"HTTP/1.0")
- _check_http_version(b"HTTP/1.1")
- _check_http_version(b"HTTP/2.0")
- with raises(exceptions.HttpSyntaxException):
- _check_http_version(b"WTF/1.0")
- with raises(exceptions.HttpSyntaxException):
- _check_http_version(b"HTTP/1.10")
- with raises(exceptions.HttpSyntaxException):
- _check_http_version(b"HTTP/1.b")
-
-
-class TestReadHeaders:
- @staticmethod
- def _read(data):
- return _read_headers(BytesIO(data))
-
- def test_read_simple(self):
- data = (
- b"Header: one\r\n"
- b"Header2: two\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == ((b"Header", b"one"), (b"Header2", b"two"))
-
- def test_read_multi(self):
- data = (
- b"Header: one\r\n"
- b"Header: two\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == ((b"Header", b"one"), (b"Header", b"two"))
-
- def test_read_continued(self):
- data = (
- b"Header: one\r\n"
- b"\ttwo\r\n"
- b"Header2: three\r\n"
- b"\r\n"
- )
- headers = self._read(data)
- assert headers.fields == ((b"Header", b"one\r\n two"), (b"Header2", b"three"))
-
- def test_read_continued_err(self):
- data = b"\tfoo: bar\r\n"
- with raises(exceptions.HttpSyntaxException):
- self._read(data)
-
- def test_read_err(self):
- data = b"foo"
- with raises(exceptions.HttpSyntaxException):
- self._read(data)
-
- def test_read_empty_name(self):
- data = b":foo"
- with raises(exceptions.HttpSyntaxException):
- self._read(data)
-
- def test_read_empty_value(self):
- data = b"bar:"
- headers = self._read(data)
- assert headers.fields == ((b"bar", b""),)
-
-
-def test_read_chunked():
- req = treq(content=None)
- req.headers["Transfer-Encoding"] = "chunked"
-
- data = b"1\r\na\r\n0\r\n"
- with raises(exceptions.HttpSyntaxException):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"1\r\na\r\n0\r\n\r\n"
- assert b"".join(_read_chunked(BytesIO(data))) == b"a"
-
- data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n"
- assert b"".join(_read_chunked(BytesIO(data))) == b"ab"
-
- data = b"\r\n"
- with raises("closed prematurely"):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"1\r\nfoo"
- with raises("malformed chunked body"):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"foo\r\nfoo"
- with raises(exceptions.HttpSyntaxException):
- b"".join(_read_chunked(BytesIO(data)))
-
- data = b"5\r\naaaaa\r\n0\r\n\r\n"
- with raises("too large"):
- b"".join(_read_chunked(BytesIO(data), limit=2))
diff --git a/test/netlib/http/http2/__init__.py b/test/netlib/http/http2/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/netlib/http/http2/__init__.py
+++ /dev/null
diff --git a/test/netlib/http/http2/test_framereader.py b/test/netlib/http/http2/test_framereader.py
deleted file mode 100644
index 41b73189..00000000
--- a/test/netlib/http/http2/test_framereader.py
+++ /dev/null
@@ -1 +0,0 @@
-# foobar
diff --git a/test/netlib/http/test_authentication.py b/test/netlib/http/test_authentication.py
deleted file mode 100644
index 5e04bbc5..00000000
--- a/test/netlib/http/test_authentication.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import binascii
-
-from mitmproxy.test import tutils
-from netlib.http import authentication, Headers
-
-
-def test_parse_http_basic_auth():
- vals = ("basic", "foo", "bar")
- assert authentication.parse_http_basic_auth(
- authentication.assemble_http_basic_auth(*vals)
- ) == vals
- assert not authentication.parse_http_basic_auth("")
- assert not authentication.parse_http_basic_auth("foo bar")
- v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
- assert not authentication.parse_http_basic_auth(v)
-
-
-class TestPassManNonAnon:
-
- def test_simple(self):
- p = authentication.PassManNonAnon()
- assert not p.test("", "")
- assert p.test("user", "")
-
-
-class TestPassManHtpasswd:
-
- def test_file_errors(self):
- tutils.raises(
- "malformed htpasswd file",
- authentication.PassManHtpasswd,
- tutils.test_data.path("data/server.crt"))
-
- def test_simple(self):
- pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
-
- vals = ("basic", "test", "test")
- authentication.assemble_http_basic_auth(*vals)
- assert pm.test("test", "test")
- assert not pm.test("test", "foo")
- assert not pm.test("foo", "test")
- assert not pm.test("test", "")
- assert not pm.test("", "")
-
-
-class TestPassManSingleUser:
-
- def test_simple(self):
- pm = authentication.PassManSingleUser("test", "test")
- assert pm.test("test", "test")
- assert not pm.test("test", "foo")
- assert not pm.test("foo", "test")
-
-
-class TestNullProxyAuth:
-
- def test_simple(self):
- na = authentication.NullProxyAuth(authentication.PassManNonAnon())
- assert not na.auth_challenge_headers()
- assert na.authenticate("foo")
- na.clean({})
-
-
-class TestBasicProxyAuth:
-
- def test_simple(self):
- ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
- headers = Headers()
- assert ba.auth_challenge_headers()
- assert not ba.authenticate(headers)
-
- def test_authenticate_clean(self):
- ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
-
- headers = Headers()
- vals = ("basic", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert ba.authenticate(headers)
-
- ba.clean(headers)
- assert ba.AUTH_HEADER not in headers
-
- headers[ba.AUTH_HEADER] = ""
- assert not ba.authenticate(headers)
-
- headers[ba.AUTH_HEADER] = "foo"
- assert not ba.authenticate(headers)
-
- vals = ("foo", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert not ba.authenticate(headers)
-
- ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
- vals = ("basic", "foo", "bar")
- headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
- assert not ba.authenticate(headers)
-
-
-class Bunch:
- pass
-
-
-class TestAuthAction:
-
- def test_nonanonymous(self):
- m = Bunch()
- aa = authentication.NonanonymousAuthAction(None, "authenticator")
- aa(None, m, None, None)
- assert m.authenticator
-
- def test_singleuser(self):
- m = Bunch()
- aa = authentication.SingleuserAuthAction(None, "authenticator")
- aa(None, m, "foo:bar", None)
- assert m.authenticator
- tutils.raises("invalid", aa, None, m, "foo", None)
-
- def test_httppasswd(self):
- m = Bunch()
- aa = authentication.HtpasswdAuthAction(None, "authenticator")
- aa(None, m, tutils.test_data.path("data/htpasswd"), None)
- assert m.authenticator
diff --git a/test/netlib/http/test_cookies.py b/test/netlib/http/test_cookies.py
deleted file mode 100644
index ca10a69c..00000000
--- a/test/netlib/http/test_cookies.py
+++ /dev/null
@@ -1,365 +0,0 @@
-import time
-
-from netlib.http import cookies
-from mitmproxy.test.tutils import raises
-
-import mock
-
-cookie_pairs = [
- [
- "",
- []
- ],
- [
- "one=uno",
- [["one", "uno"]]
- ],
- [
- "one",
- [["one", None]]
- ],
- [
- "one=uno; two=due",
- [["one", "uno"], ["two", "due"]]
- ],
- [
- 'one="uno"; two="\due"',
- [["one", "uno"], ["two", "due"]]
- ],
- [
- 'one="un\\"o"',
- [["one", 'un"o']]
- ],
- [
- 'one="uno,due"',
- [["one", 'uno,due']]
- ],
- [
- "one=uno; two; three=tre",
- [["one", "uno"], ["two", None], ["three", "tre"]]
- ],
- [
- "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; "
- "_rcc2=53VdltWl+Ov6ordflA==;",
- [
- ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="],
- ["_rcc2", "53VdltWl+Ov6ordflA=="]
- ]
- ]
-]
-
-
-def test_read_key():
- tokens = [
- [("foo", 0), ("foo", 3)],
- [("foo", 1), ("oo", 3)],
- [(" foo", 0), (" foo", 4)],
- [(" foo", 1), ("foo", 4)],
- [(" foo;", 1), ("foo", 4)],
- [(" foo=", 1), ("foo", 4)],
- [(" foo=bar", 1), ("foo", 4)],
- ]
- for q, a in tokens:
- assert cookies._read_key(*q) == a
-
-
-def test_read_quoted_string():
- tokens = [
- [('"foo" x', 0), ("foo", 5)],
- [('"f\oo" x', 0), ("foo", 6)],
- [(r'"f\\o" x', 0), (r"f\o", 6)],
- [(r'"f\\" x', 0), (r"f" + '\\', 5)],
- [('"fo\\\"" x', 0), ("fo\"", 6)],
- [('"foo" x', 7), ("", 8)],
- ]
- for q, a in tokens:
- assert cookies._read_quoted_string(*q) == a
-
-
-def test_read_cookie_pairs():
- vals = [
- [
- "one",
- [["one", None]]
- ],
- [
- "one=two",
- [["one", "two"]]
- ],
- [
- "one=",
- [["one", ""]]
- ],
- [
- 'one="two"',
- [["one", "two"]]
- ],
- [
- 'one="two"; three=four',
- [["one", "two"], ["three", "four"]]
- ],
- [
- 'one="two"; three=four; five',
- [["one", "two"], ["three", "four"], ["five", None]]
- ],
- [
- 'one="\\"two"; three=four',
- [["one", '"two'], ["three", "four"]]
- ],
- ]
- for s, lst in vals:
- ret, off = cookies._read_cookie_pairs(s)
- assert ret == lst
-
-
-def test_pairs_roundtrips():
- for s, expected in cookie_pairs:
- ret, off = cookies._read_cookie_pairs(s)
- assert ret == expected
-
- s2 = cookies._format_pairs(expected)
- ret, off = cookies._read_cookie_pairs(s2)
- assert ret == expected
-
-
-def test_cookie_roundtrips():
- for s, expected in cookie_pairs:
- ret = cookies.parse_cookie_header(s)
- assert ret == expected
-
- s2 = cookies.format_cookie_header(expected)
- ret = cookies.parse_cookie_header(s2)
- assert ret == expected
-
-
-def test_parse_set_cookie_pairs():
- pairs = [
- [
- "one=uno",
- [[
- ["one", "uno"]
- ]]
- ],
- [
- "one=un\x20",
- [[
- ["one", "un\x20"]
- ]]
- ],
- [
- "one=uno; foo",
- [[
- ["one", "uno"],
- ["foo", None]
- ]]
- ],
- [
- "mun=1.390.f60; "
- "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; "
- "domain=b.aol.com",
- [[
- ["mun", "1.390.f60"],
- ["expires", "sun, 11-oct-2015 12:38:31 gmt"],
- ["path", "/"],
- ["domain", "b.aol.com"]
- ]]
- ],
- [
- r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; '
- 'domain=.rubiconproject.com; '
- 'expires=mon, 11-may-2015 21:54:57 gmt; '
- 'path=/',
- [[
- ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'],
- ['domain', '.rubiconproject.com'],
- ['expires', 'mon, 11-may-2015 21:54:57 gmt'],
- ['path', '/']
- ]]
- ],
- ]
- for s, expected in pairs:
- ret, off = cookies._read_set_cookie_pairs(s)
- assert ret == expected
-
- s2 = cookies._format_set_cookie_pairs(expected[0])
- ret2, off = cookies._read_set_cookie_pairs(s2)
- assert ret2 == expected
-
-
-def test_parse_set_cookie_header():
- def set_cookie_equal(obs, exp):
- assert obs[0] == exp[0]
- assert obs[1] == exp[1]
- assert obs[2].items(multi=True) == exp[2]
-
- vals = [
- [
- "", []
- ],
- [
- ";", []
- ],
- [
- "one=uno",
- [
- ("one", "uno", ())
- ]
- ],
- [
- "one=uno; foo=bar",
- [
- ("one", "uno", (("foo", "bar"),))
- ]
- ],
- [
- "one=uno; foo=bar; foo=baz",
- [
- ("one", "uno", (("foo", "bar"), ("foo", "baz")))
- ]
- ],
- # Comma Separated Variant of Set-Cookie Headers
- [
- "foo=bar, doo=dar",
- [
- ("foo", "bar", ()),
- ("doo", "dar", ()),
- ]
- ],
- [
- "foo=bar; path=/, doo=dar; roo=rar; zoo=zar",
- [
- ("foo", "bar", (("path", "/"),)),
- ("doo", "dar", (("roo", "rar"), ("zoo", "zar"))),
- ]
- ],
- [
- "foo=bar; expires=Mon, 24 Aug 2037",
- [
- ("foo", "bar", (("expires", "Mon, 24 Aug 2037"),)),
- ]
- ],
- [
- "foo=bar; expires=Mon, 24 Aug 2037 00:00:00 GMT, doo=dar",
- [
- ("foo", "bar", (("expires", "Mon, 24 Aug 2037 00:00:00 GMT"),)),
- ("doo", "dar", ()),
- ]
- ],
- ]
- for s, expected in vals:
- ret = cookies.parse_set_cookie_header(s)
- if expected:
- for i in range(len(expected)):
- set_cookie_equal(ret[i], expected[i])
-
- s2 = cookies.format_set_cookie_header(ret)
- ret2 = cookies.parse_set_cookie_header(s2)
- for i in range(len(expected)):
- set_cookie_equal(ret2[i], expected[i])
- else:
- assert not ret
-
-
-def test_refresh_cookie():
-
- # Invalid expires format, sent to us by Reddit.
- c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
- assert cookies.refresh_set_cookie_header(c, 60)
-
- c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
- assert "00:21:38" in cookies.refresh_set_cookie_header(c, 60)
-
- c = "foo,bar"
- with raises(ValueError):
- cookies.refresh_set_cookie_header(c, 60)
-
- # https://github.com/mitmproxy/mitmproxy/issues/773
- c = ">=A"
- assert cookies.refresh_set_cookie_header(c, 60)
-
- # https://github.com/mitmproxy/mitmproxy/issues/1118
- c = "foo:bar=bla"
- assert cookies.refresh_set_cookie_header(c, 0)
- c = "foo/bar=bla"
- assert cookies.refresh_set_cookie_header(c, 0)
-
-
-@mock.patch('time.time')
-def test_get_expiration_ts(*args):
- # Freeze time
- now_ts = 17
- time.time.return_value = now_ts
-
- CA = cookies.CookieAttrs
- F = cookies.get_expiration_ts
-
- assert F(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT")])) == 0
- assert F(CA([("Expires", "Mon, 24-Aug-2037 00:00:00 GMT")])) == 2134684800
-
- assert F(CA([("Max-Age", "0")])) == now_ts
- assert F(CA([("Max-Age", "31")])) == now_ts + 31
-
-
-def test_is_expired():
- CA = cookies.CookieAttrs
-
- # A cookie can be expired
- # by setting the expire time in the past
- assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT")]))
-
- # or by setting Max-Age to 0
- assert cookies.is_expired(CA([("Max-Age", "0")]))
-
- # or both
- assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT"), ("Max-Age", "0")]))
-
- assert not cookies.is_expired(CA([("Expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))
- assert not cookies.is_expired(CA([("Max-Age", "1")]))
- assert not cookies.is_expired(CA([("Expires", "Wed, 15-Jul-2037 00:00:00 GMT"), ("Max-Age", "1")]))
-
- assert not cookies.is_expired(CA([("Max-Age", "nan")]))
- assert not cookies.is_expired(CA([("Expires", "false")]))
-
-
-def test_group_cookies():
- CA = cookies.CookieAttrs
- groups = [
- [
- "one=uno; foo=bar; foo=baz",
- [
- ('one', 'uno', CA([])),
- ('foo', 'bar', CA([])),
- ('foo', 'baz', CA([]))
- ]
- ],
- [
- "one=uno; Path=/; foo=bar; Max-Age=0; foo=baz; expires=24-08-1993",
- [
- ('one', 'uno', CA([('Path', '/')])),
- ('foo', 'bar', CA([('Max-Age', '0')])),
- ('foo', 'baz', CA([('expires', '24-08-1993')]))
- ]
- ],
- [
- "one=uno;",
- [
- ('one', 'uno', CA([]))
- ]
- ],
- [
- "one=uno; Path=/; Max-Age=0; Expires=24-08-1993",
- [
- ('one', 'uno', CA([('Path', '/'), ('Max-Age', '0'), ('Expires', '24-08-1993')]))
- ]
- ],
- [
- "path=val; Path=/",
- [
- ('path', 'val', CA([('Path', '/')]))
- ]
- ]
- ]
-
- for c, expected in groups:
- observed = cookies.group_cookies(cookies.parse_cookie_header(c))
- assert observed == expected
diff --git a/test/netlib/http/test_encoding.py b/test/netlib/http/test_encoding.py
deleted file mode 100644
index 89600709..00000000
--- a/test/netlib/http/test_encoding.py
+++ /dev/null
@@ -1,73 +0,0 @@
-import mock
-import pytest
-
-from netlib.http import encoding
-from mitmproxy.test import tutils
-
-
-@pytest.mark.parametrize("encoder", [
- 'identity',
- 'none',
-])
-def test_identity(encoder):
- assert b"string" == encoding.decode(b"string", encoder)
- assert b"string" == encoding.encode(b"string", encoder)
- with tutils.raises(ValueError):
- encoding.encode(b"string", "nonexistent encoding")
-
-
-@pytest.mark.parametrize("encoder", [
- 'gzip',
- 'br',
- 'deflate',
-])
-def test_encoders(encoder):
- assert "" == encoding.decode("", encoder)
- assert b"" == encoding.decode(b"", encoder)
-
- assert "string" == encoding.decode(
- encoding.encode(
- "string",
- encoder
- ),
- encoder
- )
- assert b"string" == encoding.decode(
- encoding.encode(
- b"string",
- encoder
- ),
- encoder
- )
-
- with tutils.raises(ValueError):
- encoding.decode(b"foobar", encoder)
-
-
-def test_cache():
- decode_gzip = mock.MagicMock()
- decode_gzip.return_value = b"decoded"
- encode_gzip = mock.MagicMock()
- encode_gzip.return_value = b"encoded"
-
- with mock.patch.dict(encoding.custom_decode, gzip=decode_gzip):
- with mock.patch.dict(encoding.custom_encode, gzip=encode_gzip):
- assert encoding.decode(b"encoded", "gzip") == b"decoded"
- assert decode_gzip.call_count == 1
-
- # should be cached
- assert encoding.decode(b"encoded", "gzip") == b"decoded"
- assert decode_gzip.call_count == 1
-
- # the other way around as well
- assert encoding.encode(b"decoded", "gzip") == b"encoded"
- assert encode_gzip.call_count == 0
-
- # different encoding
- decode_gzip.return_value = b"bar"
- assert encoding.encode(b"decoded", "deflate") != b"decoded"
- assert encode_gzip.call_count == 0
-
- # This is not in the cache anymore
- assert encoding.encode(b"decoded", "gzip") == b"encoded"
- assert encode_gzip.call_count == 1
diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py
deleted file mode 100644
index cac77d57..00000000
--- a/test/netlib/http/test_headers.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import collections
-
-from netlib.http.headers import Headers, parse_content_type, assemble_content_type
-from mitmproxy.test.tutils import raises
-
-
-class TestHeaders:
- def _2host(self):
- return Headers(
- (
- (b"Host", b"example.com"),
- (b"host", b"example.org")
- )
- )
-
- def test_init(self):
- headers = Headers()
- assert len(headers) == 0
-
- headers = Headers([[b"Host", b"example.com"]])
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(Host="example.com")
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"]],
- Host="example.com"
- )
- assert len(headers) == 1
- assert headers["Host"] == "example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
- Host="example.com"
- )
- assert len(headers) == 2
- assert headers["Host"] == "example.com"
- assert headers["Accept"] == "text/plain"
-
- with raises(TypeError):
- Headers([[b"Host", u"not-bytes"]])
-
- def test_set(self):
- headers = Headers()
- headers[u"foo"] = u"1"
- headers[b"bar"] = b"2"
- headers["baz"] = b"3"
- with raises(TypeError):
- headers["foobar"] = 42
- assert len(headers) == 3
-
- def test_bytes(self):
- headers = Headers(Host="example.com")
- assert bytes(headers) == b"Host: example.com\r\n"
-
- headers = Headers([
- [b"Host", b"example.com"],
- [b"Accept", b"text/plain"]
- ])
- assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n"
-
- headers = Headers()
- assert bytes(headers) == b""
-
- def test_replace_simple(self):
- headers = Headers(Host="example.com", Accept="text/plain")
- replacements = headers.replace("Host: ", "X-Host: ")
- assert replacements == 1
- assert headers["X-Host"] == "example.com"
- assert "Host" not in headers
- assert headers["Accept"] == "text/plain"
-
- def test_replace_multi(self):
- headers = self._2host()
- headers.replace(r"Host: example\.com", r"Host: example.de")
- assert headers.get_all("Host") == ["example.de", "example.org"]
-
- def test_replace_remove_spacer(self):
- headers = Headers(Host="example.com")
- replacements = headers.replace(r"Host: ", "X-Host ")
- assert replacements == 0
- assert headers["Host"] == "example.com"
-
- def test_replace_with_count(self):
- headers = Headers(Host="foobarfoo.com", Accept="foo/bar")
- replacements = headers.replace("foo", "bar", count=1)
- assert replacements == 1
-
-
-def test_parse_content_type():
- p = parse_content_type
- assert p("text/html") == ("text", "html", {})
- assert p("text") is None
-
- v = p("text/html; charset=UTF-8")
- assert v == ('text', 'html', {'charset': 'UTF-8'})
-
-
-def test_assemble_content_type():
- p = assemble_content_type
- assert p("text", "html", {}) == "text/html"
- assert p("text", "html", {"charset": "utf8"}) == "text/html; charset=utf8"
- assert p("text", "html", collections.OrderedDict([("charset", "utf8"), ("foo", "bar")])) == "text/html; charset=utf8; foo=bar"
diff --git a/test/netlib/http/test_message.py b/test/netlib/http/test_message.py
deleted file mode 100644
index 2bc8824f..00000000
--- a/test/netlib/http/test_message.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from mitmproxy.test import tutils
-from netlib import http
-
-
-def _test_passthrough_attr(message, attr):
- assert getattr(message, attr) == getattr(message.data, attr)
- setattr(message, attr, b"foo")
- assert getattr(message.data, attr) == b"foo"
-
-
-def _test_decoded_attr(message, attr):
- assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
- # Set str, get raw bytes
- setattr(message, attr, "foo")
- assert getattr(message.data, attr) == b"foo"
- # Set raw bytes, get decoded
- setattr(message.data, attr, b"BAR") # use uppercase so that we can also cover request.method
- assert getattr(message, attr) == "BAR"
- # Set bytes, get raw bytes
- setattr(message, attr, b"baz")
- assert getattr(message.data, attr) == b"baz"
-
- # Set UTF8
- setattr(message, attr, "Non-Autorisé")
- assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
- # Don't fail on garbage
- setattr(message.data, attr, b"FOO\xBF\x00BAR")
- assert getattr(message, attr).startswith("FOO")
- assert getattr(message, attr).endswith("BAR")
- # foo.bar = foo.bar should not cause any side effects.
- d = getattr(message, attr)
- setattr(message, attr, d)
- assert getattr(message.data, attr) == b"FOO\xBF\x00BAR"
-
-
-class TestMessageData:
- def test_eq_ne(self):
- data = tutils.tresp(timestamp_start=42, timestamp_end=42).data
- same = tutils.tresp(timestamp_start=42, timestamp_end=42).data
- assert data == same
- assert not data != same
-
- other = tutils.tresp(content=b"foo").data
- assert not data == other
- assert data != other
-
- assert data != 0
-
-
-class TestMessage:
-
- def test_init(self):
- resp = tutils.tresp()
- assert resp.data
-
- def test_eq_ne(self):
- resp = tutils.tresp(timestamp_start=42, timestamp_end=42)
- same = tutils.tresp(timestamp_start=42, timestamp_end=42)
- assert resp == same
- assert not resp != same
-
- other = tutils.tresp(timestamp_start=0, timestamp_end=0)
- assert not resp == other
- assert resp != other
-
- assert resp != 0
-
- def test_serializable(self):
- resp = tutils.tresp()
- resp2 = http.Response.from_state(resp.get_state())
- assert resp == resp2
-
- def test_content_length_update(self):
- resp = tutils.tresp()
- resp.content = b"foo"
- assert resp.data.content == b"foo"
- assert resp.headers["content-length"] == "3"
- resp.content = b""
- assert resp.data.content == b""
- assert resp.headers["content-length"] == "0"
- resp.raw_content = b"bar"
- assert resp.data.content == b"bar"
- assert resp.headers["content-length"] == "0"
-
- def test_headers(self):
- _test_passthrough_attr(tutils.tresp(), "headers")
-
- def test_timestamp_start(self):
- _test_passthrough_attr(tutils.tresp(), "timestamp_start")
-
- def test_timestamp_end(self):
- _test_passthrough_attr(tutils.tresp(), "timestamp_end")
-
- def test_http_version(self):
- _test_decoded_attr(tutils.tresp(), "http_version")
-
- def test_replace(self):
- r = tutils.tresp()
- r.content = b"foofootoo"
- r.replace(b"foo", "gg")
- assert r.content == b"ggggtoo"
-
- r.content = b"foofootoo"
- r.replace(b"foo", "gg", count=1)
- assert r.content == b"ggfootoo"
-
-
-class TestMessageContentEncoding:
- def test_simple(self):
- r = tutils.tresp()
- assert r.raw_content == b"message"
- assert "content-encoding" not in r.headers
- r.encode("gzip")
-
- assert r.headers["content-encoding"]
- assert r.raw_content != b"message"
- assert r.content == b"message"
- assert r.raw_content != b"message"
-
- def test_modify(self):
- r = tutils.tresp()
- assert "content-encoding" not in r.headers
- r.encode("gzip")
-
- r.content = b"foo"
- assert r.raw_content != b"foo"
- r.decode()
- assert r.raw_content == b"foo"
-
- with tutils.raises(TypeError):
- r.content = u"foo"
-
- def test_unknown_ce(self):
- r = tutils.tresp()
- r.headers["content-encoding"] = "zopfli"
- r.raw_content = b"foo"
- with tutils.raises(ValueError):
- assert r.content
- assert r.headers["content-encoding"]
- assert r.get_content(strict=False) == b"foo"
-
- def test_cannot_decode(self):
- r = tutils.tresp()
- r.encode("gzip")
- r.raw_content = b"foo"
- with tutils.raises(ValueError):
- assert r.content
- assert r.headers["content-encoding"]
- assert r.get_content(strict=False) == b"foo"
-
- with tutils.raises(ValueError):
- r.decode()
- assert r.raw_content == b"foo"
- assert "content-encoding" in r.headers
-
- r.decode(strict=False)
- assert r.content == b"foo"
- assert "content-encoding" not in r.headers
-
- def test_none(self):
- r = tutils.tresp(content=None)
- assert r.content is None
- r.content = b"foo"
- assert r.content is not None
- r.content = None
- assert r.content is None
-
- def test_cannot_encode(self):
- r = tutils.tresp()
- r.encode("gzip")
- r.content = None
- assert r.headers["content-encoding"]
- assert r.raw_content is None
-
- r.headers["content-encoding"] = "zopfli"
- r.content = b"foo"
- assert "content-encoding" not in r.headers
- assert r.raw_content == b"foo"
-
- with tutils.raises(ValueError):
- r.encode("zopfli")
- assert r.raw_content == b"foo"
- assert "content-encoding" not in r.headers
-
-
-class TestMessageText:
- def test_simple(self):
- r = tutils.tresp(content=b'\xfc')
- assert r.raw_content == b"\xfc"
- assert r.content == b"\xfc"
- assert r.text == u"ü"
-
- r.encode("gzip")
- assert r.text == u"ü"
- r.decode()
- assert r.text == u"ü"
-
- r.headers["content-type"] = "text/html; charset=latin1"
- r.content = b"\xc3\xbc"
- assert r.text == u"ü"
- r.headers["content-type"] = "text/html; charset=utf8"
- assert r.text == u"ü"
-
- def test_guess_json(self):
- r = tutils.tresp(content=b'"\xc3\xbc"')
- r.headers["content-type"] = "application/json"
- assert r.text == u'"ü"'
-
- def test_none(self):
- r = tutils.tresp(content=None)
- assert r.text is None
- r.text = u"foo"
- assert r.text is not None
- r.text = None
- assert r.text is None
-
- def test_modify(self):
- r = tutils.tresp()
-
- r.text = u"ü"
- assert r.raw_content == b"\xfc"
-
- r.headers["content-type"] = "text/html; charset=utf8"
- r.text = u"ü"
- assert r.raw_content == b"\xc3\xbc"
- assert r.headers["content-length"] == "2"
-
- def test_unknown_ce(self):
- r = tutils.tresp()
- r.headers["content-type"] = "text/html; charset=wtf"
- r.raw_content = b"foo"
- with tutils.raises(ValueError):
- assert r.text == u"foo"
- assert r.get_text(strict=False) == u"foo"
-
- def test_cannot_decode(self):
- r = tutils.tresp()
- r.headers["content-type"] = "text/html; charset=utf8"
- r.raw_content = b"\xFF"
- with tutils.raises(ValueError):
- assert r.text
-
- assert r.get_text(strict=False) == '\udcff'
-
- def test_cannot_encode(self):
- r = tutils.tresp()
- r.content = None
- assert "content-type" not in r.headers
- assert r.raw_content is None
-
- r.headers["content-type"] = "text/html; charset=latin1; foo=bar"
- r.text = u"☃"
- assert r.headers["content-type"] == "text/html; charset=utf-8; foo=bar"
- assert r.raw_content == b'\xe2\x98\x83'
-
- r.headers["content-type"] = "gibberish"
- r.text = u"☃"
- assert r.headers["content-type"] == "text/plain; charset=utf-8"
- assert r.raw_content == b'\xe2\x98\x83'
-
- del r.headers["content-type"]
- r.text = u"☃"
- assert r.headers["content-type"] == "text/plain; charset=utf-8"
- assert r.raw_content == b'\xe2\x98\x83'
-
- r.headers["content-type"] = "text/html; charset=latin1"
- r.text = u'\udcff'
- assert r.headers["content-type"] == "text/html; charset=utf-8"
- assert r.raw_content == b"\xFF"
diff --git a/test/netlib/http/test_multipart.py b/test/netlib/http/test_multipart.py
deleted file mode 100644
index 1d7e0062..00000000
--- a/test/netlib/http/test_multipart.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from netlib.http import Headers
-from netlib.http import multipart
-
-
-def test_decode():
- boundary = 'somefancyboundary'
- headers = Headers(
- content_type='multipart/form-data; boundary=' + boundary
- )
- content = (
- "--{0}\n"
- "Content-Disposition: form-data; name=\"field1\"\n\n"
- "value1\n"
- "--{0}\n"
- "Content-Disposition: form-data; name=\"field2\"\n\n"
- "value2\n"
- "--{0}--".format(boundary).encode()
- )
-
- form = multipart.decode(headers, content)
-
- assert len(form) == 2
- assert form[0] == (b"field1", b"value1")
- assert form[1] == (b"field2", b"value2")
diff --git a/test/netlib/http/test_request.py b/test/netlib/http/test_request.py
deleted file mode 100644
index ecfc1ba6..00000000
--- a/test/netlib/http/test_request.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from netlib.http import Headers
-from mitmproxy.test.tutils import treq, raises
-from .test_message import _test_decoded_attr, _test_passthrough_attr
-
-
-class TestRequestData:
- def test_init(self):
- with raises(ValueError):
- treq(headers="foobar")
-
- assert isinstance(treq(headers=()).headers, Headers)
-
-
-class TestRequestCore:
- """
- Tests for addons and the attributes that are directly proxied from the data structure
- """
- def test_repr(self):
- request = treq()
- assert repr(request) == "Request(GET address:22/path)"
- request.host = None
- assert repr(request) == "Request(GET /path)"
-
- def replace(self):
- r = treq()
- r.path = b"foobarfoo"
- r.replace(b"foo", "bar")
- assert r.path == b"barbarbar"
-
- r.path = b"foobarfoo"
- r.replace(b"foo", "bar", count=1)
- assert r.path == b"barbarfoo"
-
- def test_first_line_format(self):
- _test_passthrough_attr(treq(), "first_line_format")
-
- def test_method(self):
- _test_decoded_attr(treq(), "method")
-
- def test_scheme(self):
- _test_decoded_attr(treq(), "scheme")
-
- def test_port(self):
- _test_passthrough_attr(treq(), "port")
-
- def test_path(self):
- req = treq()
- _test_decoded_attr(req, "path")
- # path can also be None.
- req.path = None
- assert req.path is None
- assert req.data.path is None
-
- def test_host(self):
- request = treq()
- assert request.host == request.data.host.decode("idna")
-
- # Test IDNA encoding
- # Set str, get raw bytes
- request.host = "ídna.example"
- assert request.data.host == b"xn--dna-qma.example"
- # Set raw bytes, get decoded
- request.data.host = b"xn--idn-gla.example"
- assert request.host == "idná.example"
- # Set bytes, get raw bytes
- request.host = b"xn--dn-qia9b.example"
- assert request.data.host == b"xn--dn-qia9b.example"
- # IDNA encoding is not bijective
- request.host = "fußball"
- assert request.host == "fussball"
-
- # Don't fail on garbage
- request.data.host = b"foo\xFF\x00bar"
- assert request.host.startswith("foo")
- assert request.host.endswith("bar")
- # foo.bar = foo.bar should not cause any side effects.
- d = request.host
- request.host = d
- assert request.data.host == b"foo\xFF\x00bar"
-
- def test_host_header_update(self):
- request = treq()
- assert "host" not in request.headers
- request.host = "example.com"
- assert "host" not in request.headers
-
- request.headers["Host"] = "foo"
- request.host = "example.org"
- assert request.headers["Host"] == "example.org"
-
-
-class TestRequestUtils:
- """
- Tests for additional convenience methods.
- """
- def test_url(self):
- request = treq()
- assert request.url == "http://address:22/path"
-
- request.url = "https://otheraddress:42/foo"
- assert request.scheme == "https"
- assert request.host == "otheraddress"
- assert request.port == 42
- assert request.path == "/foo"
-
- with raises(ValueError):
- request.url = "not-a-url"
-
- def test_url_options(self):
- request = treq(method=b"OPTIONS", path=b"*")
- assert request.url == "http://address:22"
-
- def test_url_authority(self):
- request = treq(first_line_format="authority")
- assert request.url == "address:22"
-
- def test_pretty_host(self):
- request = treq()
- # Without host header
- assert request.pretty_host == "address"
- assert request.host == "address"
- # Same port as self.port (22)
- request.headers["host"] = "other:22"
- assert request.pretty_host == "other"
- # Different ports
- request.headers["host"] = "other"
- assert request.pretty_host == "address"
- assert request.host == "address"
- # Empty host
- request.host = None
- assert request.pretty_host is None
- assert request.host is None
-
- # Invalid IDNA
- request.headers["host"] = ".disqus.com:22"
- assert request.pretty_host == ".disqus.com"
-
- def test_pretty_url(self):
- request = treq()
- # Without host header
- assert request.url == "http://address:22/path"
- assert request.pretty_url == "http://address:22/path"
- # Same port as self.port (22)
- request.headers["host"] = "other:22"
- assert request.pretty_url == "http://other:22/path"
- # Different ports
- request.headers["host"] = "other"
- assert request.pretty_url == "http://address:22/path"
-
- def test_pretty_url_options(self):
- request = treq(method=b"OPTIONS", path=b"*")
- assert request.pretty_url == "http://address:22"
-
- def test_pretty_url_authority(self):
- request = treq(first_line_format="authority")
- assert request.pretty_url == "address:22"
-
- def test_get_query(self):
- request = treq()
- assert not request.query
-
- request.url = "http://localhost:80/foo?bar=42"
- assert dict(request.query) == {"bar": "42"}
-
- def test_set_query(self):
- request = treq()
- assert not request.query
- request.query["foo"] = "bar"
- assert request.query["foo"] == "bar"
- assert request.path == "/path?foo=bar"
-
- def test_get_cookies_none(self):
- request = treq()
- request.headers = Headers()
- assert not request.cookies
-
- def test_get_cookies_single(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue")
- assert len(request.cookies) == 1
- assert request.cookies['cookiename'] == 'cookievalue'
-
- def test_get_cookies_double(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
- result = request.cookies
- assert len(result) == 2
- assert result['cookiename'] == 'cookievalue'
- assert result['othercookiename'] == 'othercookievalue'
-
- def test_get_cookies_withequalsign(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
- result = request.cookies
- assert len(result) == 2
- assert result['cookiename'] == 'coo=kievalue'
- assert result['othercookiename'] == 'othercookievalue'
-
- def test_set_cookies(self):
- request = treq()
- request.headers = Headers(cookie="cookiename=cookievalue")
- result = request.cookies
- result["cookiename"] = "foo"
- assert request.cookies["cookiename"] == "foo"
-
- def test_get_path_components(self):
- request = treq(path=b"/foo/bar")
- assert request.path_components == ("foo", "bar")
-
- def test_set_path_components(self):
- request = treq()
- request.path_components = ["foo", "baz"]
- assert request.path == "/foo/baz"
-
- request.path_components = []
- assert request.path == "/"
-
- request.path_components = ["foo", "baz"]
- request.query["hello"] = "hello"
- assert request.path_components == ("foo", "baz")
-
- request.path_components = ["abc"]
- assert request.path == "/abc?hello=hello"
-
- def test_anticache(self):
- request = treq()
- request.headers["If-Modified-Since"] = "foo"
- request.headers["If-None-Match"] = "bar"
- request.anticache()
- assert "If-Modified-Since" not in request.headers
- assert "If-None-Match" not in request.headers
-
- def test_anticomp(self):
- request = treq()
- request.headers["Accept-Encoding"] = "foobar"
- request.anticomp()
- assert request.headers["Accept-Encoding"] == "identity"
-
- def test_constrain_encoding(self):
- request = treq()
-
- h = request.headers.copy()
- request.constrain_encoding() # no-op if there is no accept_encoding header.
- assert request.headers == h
-
- request.headers["Accept-Encoding"] = "identity, gzip, foo"
- request.constrain_encoding()
- assert "foo" not in request.headers["Accept-Encoding"]
- assert "gzip" in request.headers["Accept-Encoding"]
-
- def test_get_urlencoded_form(self):
- request = treq(content=b"foobar=baz")
- assert not request.urlencoded_form
-
- request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- assert list(request.urlencoded_form.items()) == [(b"foobar", b"baz")]
-
- def test_set_urlencoded_form(self):
- request = treq()
- request.urlencoded_form = [(b'foo', b'bar'), (b'rab', b'oof')]
- assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
- assert request.content
-
- def test_get_multipart_form(self):
- request = treq(content=b"foobar")
- assert not request.multipart_form
-
- request.headers["Content-Type"] = "multipart/form-data"
- assert list(request.multipart_form.items()) == []
diff --git a/test/netlib/http/test_response.py b/test/netlib/http/test_response.py
deleted file mode 100644
index 4a6fac62..00000000
--- a/test/netlib/http/test_response.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import email
-
-import time
-
-from netlib.http import Headers
-from netlib.http import Response
-from netlib.http.cookies import CookieAttrs
-from mitmproxy.test.tutils import raises, tresp
-from .test_message import _test_passthrough_attr, _test_decoded_attr
-
-
-class TestResponseData:
- def test_init(self):
- with raises(ValueError):
- tresp(headers="foobar")
-
- assert isinstance(tresp(headers=()).headers, Headers)
-
-
-class TestResponseCore:
- """
- Tests for addons and the attributes that are directly proxied from the data structure
- """
- def test_repr(self):
- response = tresp()
- assert repr(response) == "Response(200 OK, unknown content type, 7b)"
- response.content = None
- assert repr(response) == "Response(200 OK, no content)"
-
- def test_make(self):
- r = Response.make()
- assert r.status_code == 200
- assert r.content == b""
-
- r = Response.make(418, "teatime")
- assert r.status_code == 418
- assert r.content == b"teatime"
- assert r.headers["content-length"] == "7"
-
- Response.make(content=b"foo")
- Response.make(content="foo")
- with raises(TypeError):
- Response.make(content=42)
-
- r = Response.make(headers=[(b"foo", b"bar")])
- assert r.headers["foo"] == "bar"
-
- r = Response.make(headers=({"foo": "baz"}))
- assert r.headers["foo"] == "baz"
-
- with raises(TypeError):
- Response.make(headers=42)
-
- def test_status_code(self):
- _test_passthrough_attr(tresp(), "status_code")
-
- def test_reason(self):
- _test_decoded_attr(tresp(), "reason")
-
-
-class TestResponseUtils:
- """
- Tests for additional convenience methods.
- """
- def test_get_cookies_none(self):
- resp = tresp()
- resp.headers = Headers()
- assert not resp.cookies
-
- def test_get_cookies_empty(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="")
- assert not resp.cookies
-
- def test_get_cookies_simple(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="cookiename=cookievalue")
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"] == ("cookievalue", CookieAttrs())
-
- def test_get_cookies_with_parameters(self):
- resp = tresp()
- cookie = "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"
- resp.headers = Headers(set_cookie=cookie)
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0] == "cookievalue"
- attrs = result["cookiename"][1]
- assert len(attrs) == 4
- assert attrs["domain"] == "example.com"
- assert attrs["expires"] == "Wed Oct 21 16:29:41 2015"
- assert attrs["path"] == "/"
- assert attrs["httponly"] is None
-
- def test_get_cookies_no_value(self):
- resp = tresp()
- resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
- result = resp.cookies
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0] == ""
- assert len(result["cookiename"][1]) == 2
-
- def test_get_cookies_twocookies(self):
- resp = tresp()
- resp.headers = Headers([
- [b"Set-Cookie", b"cookiename=cookievalue"],
- [b"Set-Cookie", b"othercookie=othervalue"]
- ])
- result = resp.cookies
- assert len(result) == 2
- assert "cookiename" in result
- assert result["cookiename"] == ("cookievalue", CookieAttrs())
- assert "othercookie" in result
- assert result["othercookie"] == ("othervalue", CookieAttrs())
-
- def test_set_cookies(self):
- resp = tresp()
- resp.cookies["foo"] = ("bar", {})
-
- assert len(resp.cookies) == 1
- assert resp.cookies["foo"] == ("bar", CookieAttrs())
-
- def test_refresh(self):
- r = tresp()
- n = time.time()
- r.headers["date"] = email.utils.formatdate(n)
- pre = r.headers["date"]
- r.refresh(n)
- assert pre == r.headers["date"]
- r.refresh(n + 60)
-
- d = email.utils.parsedate_tz(r.headers["date"])
- d = email.utils.mktime_tz(d)
- # Weird that this is not exact...
- assert abs(60 - (d - n)) <= 1
-
- cookie = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
- r.headers["set-cookie"] = cookie
- r.refresh()
- # Cookie refreshing is tested in test_cookies, we just make sure that it's triggered here.
- assert cookie != r.headers["set-cookie"]
diff --git a/test/netlib/http/test_status_codes.py b/test/netlib/http/test_status_codes.py
deleted file mode 100644
index 9fea6b70..00000000
--- a/test/netlib/http/test_status_codes.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib.http import status_codes
-
-
-def test_simple():
- assert status_codes.IM_A_TEAPOT == 418
- assert status_codes.RESPONSES[418] == "I'm a teapot"
diff --git a/test/netlib/http/test_url.py b/test/netlib/http/test_url.py
deleted file mode 100644
index 7cea6c58..00000000
--- a/test/netlib/http/test_url.py
+++ /dev/null
@@ -1,102 +0,0 @@
-from mitmproxy.test import tutils
-from netlib.http import url
-
-
-def test_parse():
- with tutils.raises(ValueError):
- url.parse("")
-
- s, h, po, pa = url.parse(b"http://foo.com:8888/test")
- assert s == b"http"
- assert h == b"foo.com"
- assert po == 8888
- assert pa == b"/test"
-
- s, h, po, pa = url.parse("http://foo/bar")
- assert s == b"http"
- assert h == b"foo"
- assert po == 80
- assert pa == b"/bar"
-
- s, h, po, pa = url.parse(b"http://user:pass@foo/bar")
- assert s == b"http"
- assert h == b"foo"
- assert po == 80
- assert pa == b"/bar"
-
- s, h, po, pa = url.parse(b"http://foo")
- assert pa == b"/"
-
- s, h, po, pa = url.parse(b"https://foo")
- assert po == 443
-
- with tutils.raises(ValueError):
- url.parse(b"https://foo:bar")
-
- # Invalid IDNA
- with tutils.raises(ValueError):
- url.parse("http://\xfafoo")
- # Invalid PATH
- with tutils.raises(ValueError):
- url.parse("http:/\xc6/localhost:56121")
- # Null byte in host
- with tutils.raises(ValueError):
- url.parse("http://foo\0")
- # Port out of range
- _, _, port, _ = url.parse("http://foo:999999")
- assert port == 80
- # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- with tutils.raises(ValueError):
- url.parse('http://lo[calhost')
-
-
-def test_unparse():
- assert url.unparse("http", "foo.com", 99, "") == "http://foo.com:99"
- assert url.unparse("http", "foo.com", 80, "/bar") == "http://foo.com/bar"
- assert url.unparse("https", "foo.com", 80, "") == "https://foo.com:80"
- assert url.unparse("https", "foo.com", 443, "") == "https://foo.com"
-
-
-surrogates = bytes(range(256)).decode("utf8", "surrogateescape")
-
-surrogates_quoted = (
- '%00%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F'
- '%10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F'
- '%20%21%22%23%24%25%26%27%28%29%2A%2B%2C-./'
- '0123456789%3A%3B%3C%3D%3E%3F'
- '%40ABCDEFGHIJKLMNO'
- 'PQRSTUVWXYZ%5B%5C%5D%5E_'
- '%60abcdefghijklmno'
- 'pqrstuvwxyz%7B%7C%7D%7E%7F'
- '%80%81%82%83%84%85%86%87%88%89%8A%8B%8C%8D%8E%8F'
- '%90%91%92%93%94%95%96%97%98%99%9A%9B%9C%9D%9E%9F'
- '%A0%A1%A2%A3%A4%A5%A6%A7%A8%A9%AA%AB%AC%AD%AE%AF'
- '%B0%B1%B2%B3%B4%B5%B6%B7%B8%B9%BA%BB%BC%BD%BE%BF'
- '%C0%C1%C2%C3%C4%C5%C6%C7%C8%C9%CA%CB%CC%CD%CE%CF'
- '%D0%D1%D2%D3%D4%D5%D6%D7%D8%D9%DA%DB%DC%DD%DE%DF'
- '%E0%E1%E2%E3%E4%E5%E6%E7%E8%E9%EA%EB%EC%ED%EE%EF'
- '%F0%F1%F2%F3%F4%F5%F6%F7%F8%F9%FA%FB%FC%FD%FE%FF'
-)
-
-
-def test_encode():
- assert url.encode([('foo', 'bar')])
- assert url.encode([('foo', surrogates)])
-
-
-def test_decode():
- s = "one=two&three=four"
- assert len(url.decode(s)) == 2
- assert url.decode(surrogates)
-
-
-def test_quote():
- assert url.quote("foo") == "foo"
- assert url.quote("foo bar") == "foo%20bar"
- assert url.quote(surrogates) == surrogates_quoted
-
-
-def test_unquote():
- assert url.unquote("foo") == "foo"
- assert url.unquote("foo%20bar") == "foo bar"
- assert url.unquote(surrogates_quoted) == surrogates
diff --git a/test/netlib/http/test_user_agents.py b/test/netlib/http/test_user_agents.py
deleted file mode 100644
index 0bf1bba7..00000000
--- a/test/netlib/http/test_user_agents.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib.http import user_agents
-
-
-def test_get_shortcut():
- assert user_agents.get_by_shortcut("c")[0] == "chrome"
- assert not user_agents.get_by_shortcut("_")