diff options
Diffstat (limited to 'test/netlib/http')
-rw-r--r-- | test/netlib/http/__init__.py | 0 | ||||
-rw-r--r-- | test/netlib/http/http1/__init__.py | 0 | ||||
-rw-r--r-- | test/netlib/http/http1/test_assemble.py | 101 | ||||
-rw-r--r-- | test/netlib/http/http1/test_read.py | 371 | ||||
-rw-r--r-- | test/netlib/http/http2/__init__.py | 0 | ||||
-rw-r--r-- | test/netlib/http/http2/test_framereader.py | 1 | ||||
-rw-r--r-- | test/netlib/http/test_authentication.py | 122 | ||||
-rw-r--r-- | test/netlib/http/test_cookies.py | 365 | ||||
-rw-r--r-- | test/netlib/http/test_encoding.py | 73 | ||||
-rw-r--r-- | test/netlib/http/test_headers.py | 106 | ||||
-rw-r--r-- | test/netlib/http/test_message.py | 271 | ||||
-rw-r--r-- | test/netlib/http/test_multipart.py | 24 | ||||
-rw-r--r-- | test/netlib/http/test_request.py | 271 | ||||
-rw-r--r-- | test/netlib/http/test_response.py | 145 | ||||
-rw-r--r-- | test/netlib/http/test_status_codes.py | 6 | ||||
-rw-r--r-- | test/netlib/http/test_url.py | 102 | ||||
-rw-r--r-- | test/netlib/http/test_user_agents.py | 6 |
17 files changed, 0 insertions, 1964 deletions
diff --git a/test/netlib/http/__init__.py b/test/netlib/http/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/test/netlib/http/__init__.py +++ /dev/null diff --git a/test/netlib/http/http1/__init__.py b/test/netlib/http/http1/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/test/netlib/http/http1/__init__.py +++ /dev/null diff --git a/test/netlib/http/http1/test_assemble.py b/test/netlib/http/http1/test_assemble.py deleted file mode 100644 index dac5fdad..00000000 --- a/test/netlib/http/http1/test_assemble.py +++ /dev/null @@ -1,101 +0,0 @@ -from mitmproxy import exceptions -from netlib.http import Headers -from netlib.http.http1.assemble import ( - assemble_request, assemble_request_head, assemble_response, - assemble_response_head, _assemble_request_line, _assemble_request_headers, - _assemble_response_headers, - assemble_body) -from mitmproxy.test.tutils import treq, raises, tresp - - -def test_assemble_request(): - assert assemble_request(treq()) == ( - b"GET /path HTTP/1.1\r\n" - b"header: qvalue\r\n" - b"content-length: 7\r\n" - b"host: address:22\r\n" - b"\r\n" - b"content" - ) - - with raises(exceptions.HttpException): - assemble_request(treq(content=None)) - - -def test_assemble_request_head(): - c = assemble_request_head(treq(content=b"foo")) - assert b"GET" in c - assert b"qvalue" in c - assert b"content-length" in c - assert b"foo" not in c - - -def test_assemble_response(): - assert assemble_response(tresp()) == ( - b"HTTP/1.1 200 OK\r\n" - b"header-response: svalue\r\n" - b"content-length: 7\r\n" - b"\r\n" - b"message" - ) - - with raises(exceptions.HttpException): - assemble_response(tresp(content=None)) - - -def test_assemble_response_head(): - c = assemble_response_head(tresp()) - assert b"200" in c - assert b"svalue" in c - assert b"message" not in c - - -def test_assemble_body(): - c = list(assemble_body(Headers(), [b"body"])) - assert c == [b"body"] - - c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a", b""])) - assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"] - - c = list(assemble_body(Headers(transfer_encoding="chunked"), [b"123456789a"])) - assert c == [b"a\r\n123456789a\r\n", b"0\r\n\r\n"] - - -def test_assemble_request_line(): - assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1" - - authority_request = treq(method=b"CONNECT", first_line_format="authority").data - assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1" - - absolute_request = treq(first_line_format="absolute").data - assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1" - - with raises(RuntimeError): - _assemble_request_line(treq(first_line_format="invalid_form").data) - - -def test_assemble_request_headers(): - # https://github.com/mitmproxy/mitmproxy/issues/186 - r = treq(content=b"") - r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_request_headers(r.data) - assert b"Transfer-Encoding" in c - - -def test_assemble_request_headers_host_header(): - r = treq() - r.headers = Headers() - c = _assemble_request_headers(r.data) - assert b"host" in c - - r.host = None - c = _assemble_request_headers(r.data) - assert b"host" not in c - - -def test_assemble_response_headers(): - # https://github.com/mitmproxy/mitmproxy/issues/186 - r = tresp(content=b"") - r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_response_headers(r) - assert b"Transfer-Encoding" in c diff --git a/test/netlib/http/http1/test_read.py b/test/netlib/http/http1/test_read.py deleted file mode 100644 index eb96968c..00000000 --- a/test/netlib/http/http1/test_read.py +++ /dev/null @@ -1,371 +0,0 @@ -from io import BytesIO -from mock import Mock -import pytest - -from mitmproxy import exceptions -from netlib.http import Headers -from netlib.http.http1.read import ( - read_request, read_response, read_request_head, - read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line, - _read_request_line, _parse_authority_form, _read_response_line, _check_http_version, - _read_headers, _read_chunked, get_header_tokens -) -from mitmproxy.test.tutils import treq, tresp, raises - - -def test_get_header_tokens(): - headers = Headers() - assert get_header_tokens(headers, "foo") == [] - headers["foo"] = "bar" - assert get_header_tokens(headers, "foo") == ["bar"] - headers["foo"] = "bar, voing" - assert get_header_tokens(headers, "foo") == ["bar", "voing"] - headers.set_all("foo", ["bar, voing", "oink"]) - assert get_header_tokens(headers, "foo") == ["bar", "voing", "oink"] - - -@pytest.mark.parametrize("input", [ - b"GET / HTTP/1.1\r\n\r\nskip", - b"GET / HTTP/1.1\r\n\r\nskip", - b"GET / HTTP/1.1\r\n\r\nskip", - b"GET / HTTP/1.1 \r\n\r\nskip", -]) -def test_read_request(input): - rfile = BytesIO(input) - r = read_request(rfile) - assert r.method == "GET" - assert r.content == b"" - assert r.http_version == "HTTP/1.1" - assert r.timestamp_end - assert rfile.read() == b"skip" - - -@pytest.mark.parametrize("input", [ - b"CONNECT :0 0", -]) -def test_read_request_error(input): - rfile = BytesIO(input) - raises(exceptions.HttpException, read_request, rfile) - - -def test_read_request_head(): - rfile = BytesIO( - b"GET / HTTP/1.1\r\n" - b"Content-Length: 4\r\n" - b"\r\n" - b"skip" - ) - rfile.reset_timestamps = Mock() - rfile.first_byte_timestamp = 42 - r = read_request_head(rfile) - assert r.method == "GET" - assert r.headers["Content-Length"] == "4" - assert r.content is None - assert rfile.reset_timestamps.called - assert r.timestamp_start == 42 - assert rfile.read() == b"skip" - - -@pytest.mark.parametrize("input", [ - b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody", - b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody", - b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody", - b"HTTP/1.1 418 I'm a teapot \r\n\r\nbody", -]) -def test_read_response(input): - req = treq() - rfile = BytesIO(input) - r = read_response(rfile, req) - assert r.http_version == "HTTP/1.1" - assert r.status_code == 418 - assert r.reason == "I'm a teapot" - assert r.content == b"body" - assert r.timestamp_end - - -def test_read_response_head(): - rfile = BytesIO( - b"HTTP/1.1 418 I'm a teapot\r\n" - b"Content-Length: 4\r\n" - b"\r\n" - b"skip" - ) - rfile.reset_timestamps = Mock() - rfile.first_byte_timestamp = 42 - r = read_response_head(rfile) - assert r.status_code == 418 - assert r.headers["Content-Length"] == "4" - assert r.content is None - assert rfile.reset_timestamps.called - assert r.timestamp_start == 42 - assert rfile.read() == b"skip" - - -class TestReadBody: - def test_chunked(self): - rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar") - body = b"".join(read_body(rfile, None)) - assert body == b"foo" - assert rfile.read() == b"bar" - - def test_known_size(self): - rfile = BytesIO(b"foobar") - body = b"".join(read_body(rfile, 3)) - assert body == b"foo" - assert rfile.read() == b"bar" - - def test_known_size_limit(self): - rfile = BytesIO(b"foobar") - with raises(exceptions.HttpException): - b"".join(read_body(rfile, 3, 2)) - - def test_known_size_too_short(self): - rfile = BytesIO(b"foo") - with raises(exceptions.HttpException): - b"".join(read_body(rfile, 6)) - - def test_unknown_size(self): - rfile = BytesIO(b"foobar") - body = b"".join(read_body(rfile, -1)) - assert body == b"foobar" - - def test_unknown_size_limit(self): - rfile = BytesIO(b"foobar") - with raises(exceptions.HttpException): - b"".join(read_body(rfile, -1, 3)) - - def test_max_chunk_size(self): - rfile = BytesIO(b"123456") - assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"] - rfile = BytesIO(b"123456") - assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"] - - -def test_connection_close(): - headers = Headers() - assert connection_close(b"HTTP/1.0", headers) - assert not connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "keep-alive" - assert not connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "close" - assert connection_close(b"HTTP/1.1", headers) - - headers["connection"] = "foobar" - assert connection_close(b"HTTP/1.0", headers) - assert not connection_close(b"HTTP/1.1", headers) - - -def test_expected_http_body_size(): - # Expect: 100-continue - assert expected_http_body_size( - treq(headers=Headers(expect="100-continue", content_length="42")) - ) == 0 - - # http://tools.ietf.org/html/rfc7230#section-3.3 - assert expected_http_body_size( - treq(method=b"HEAD"), - tresp(headers=Headers(content_length="42")) - ) == 0 - assert expected_http_body_size( - treq(method=b"CONNECT"), - tresp() - ) == 0 - for code in (100, 204, 304): - assert expected_http_body_size( - treq(), - tresp(status_code=code) - ) == 0 - - # chunked - assert expected_http_body_size( - treq(headers=Headers(transfer_encoding="chunked")), - ) is None - - # explicit length - for val in (b"foo", b"-7"): - with raises(exceptions.HttpSyntaxException): - expected_http_body_size( - treq(headers=Headers(content_length=val)) - ) - assert expected_http_body_size( - treq(headers=Headers(content_length="42")) - ) == 42 - - # no length - assert expected_http_body_size( - treq(headers=Headers()) - ) == 0 - assert expected_http_body_size( - treq(headers=Headers()), tresp(headers=Headers()) - ) == -1 - - -def test_get_first_line(): - rfile = BytesIO(b"foo\r\nbar") - assert _get_first_line(rfile) == b"foo" - - rfile = BytesIO(b"\r\nfoo\r\nbar") - assert _get_first_line(rfile) == b"foo" - - with raises(exceptions.HttpReadDisconnect): - rfile = BytesIO(b"") - _get_first_line(rfile) - - with raises(exceptions.HttpReadDisconnect): - rfile = Mock() - rfile.readline.side_effect = exceptions.TcpDisconnect - _get_first_line(rfile) - - -def test_read_request_line(): - def t(b): - return _read_request_line(BytesIO(b)) - - assert (t(b"GET / HTTP/1.1") == - ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1")) - assert (t(b"OPTIONS * HTTP/1.1") == - ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1")) - assert (t(b"CONNECT foo:42 HTTP/1.1") == - ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1")) - assert (t(b"GET http://foo:42/bar HTTP/1.1") == - ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1")) - - with raises(exceptions.HttpSyntaxException): - t(b"GET / WTF/1.1") - with raises(exceptions.HttpSyntaxException): - t(b"this is not http") - with raises(exceptions.HttpReadDisconnect): - t(b"") - - -def test_parse_authority_form(): - assert _parse_authority_form(b"foo:42") == (b"foo", 42) - with raises(exceptions.HttpSyntaxException): - _parse_authority_form(b"foo") - with raises(exceptions.HttpSyntaxException): - _parse_authority_form(b"foo:bar") - with raises(exceptions.HttpSyntaxException): - _parse_authority_form(b"foo:99999999") - with raises(exceptions.HttpSyntaxException): - _parse_authority_form(b"f\x00oo:80") - - -def test_read_response_line(): - def t(b): - return _read_response_line(BytesIO(b)) - - assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK") - assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"") - - # https://github.com/mitmproxy/mitmproxy/issues/784 - assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9") - - with raises(exceptions.HttpSyntaxException): - assert t(b"HTTP/1.1") - - with raises(exceptions.HttpSyntaxException): - t(b"HTTP/1.1 OK OK") - with raises(exceptions.HttpSyntaxException): - t(b"WTF/1.1 200 OK") - with raises(exceptions.HttpReadDisconnect): - t(b"") - - -def test_check_http_version(): - _check_http_version(b"HTTP/0.9") - _check_http_version(b"HTTP/1.0") - _check_http_version(b"HTTP/1.1") - _check_http_version(b"HTTP/2.0") - with raises(exceptions.HttpSyntaxException): - _check_http_version(b"WTF/1.0") - with raises(exceptions.HttpSyntaxException): - _check_http_version(b"HTTP/1.10") - with raises(exceptions.HttpSyntaxException): - _check_http_version(b"HTTP/1.b") - - -class TestReadHeaders: - @staticmethod - def _read(data): - return _read_headers(BytesIO(data)) - - def test_read_simple(self): - data = ( - b"Header: one\r\n" - b"Header2: two\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == ((b"Header", b"one"), (b"Header2", b"two")) - - def test_read_multi(self): - data = ( - b"Header: one\r\n" - b"Header: two\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == ((b"Header", b"one"), (b"Header", b"two")) - - def test_read_continued(self): - data = ( - b"Header: one\r\n" - b"\ttwo\r\n" - b"Header2: three\r\n" - b"\r\n" - ) - headers = self._read(data) - assert headers.fields == ((b"Header", b"one\r\n two"), (b"Header2", b"three")) - - def test_read_continued_err(self): - data = b"\tfoo: bar\r\n" - with raises(exceptions.HttpSyntaxException): - self._read(data) - - def test_read_err(self): - data = b"foo" - with raises(exceptions.HttpSyntaxException): - self._read(data) - - def test_read_empty_name(self): - data = b":foo" - with raises(exceptions.HttpSyntaxException): - self._read(data) - - def test_read_empty_value(self): - data = b"bar:" - headers = self._read(data) - assert headers.fields == ((b"bar", b""),) - - -def test_read_chunked(): - req = treq(content=None) - req.headers["Transfer-Encoding"] = "chunked" - - data = b"1\r\na\r\n0\r\n" - with raises(exceptions.HttpSyntaxException): - b"".join(_read_chunked(BytesIO(data))) - - data = b"1\r\na\r\n0\r\n\r\n" - assert b"".join(_read_chunked(BytesIO(data))) == b"a" - - data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n" - assert b"".join(_read_chunked(BytesIO(data))) == b"ab" - - data = b"\r\n" - with raises("closed prematurely"): - b"".join(_read_chunked(BytesIO(data))) - - data = b"1\r\nfoo" - with raises("malformed chunked body"): - b"".join(_read_chunked(BytesIO(data))) - - data = b"foo\r\nfoo" - with raises(exceptions.HttpSyntaxException): - b"".join(_read_chunked(BytesIO(data))) - - data = b"5\r\naaaaa\r\n0\r\n\r\n" - with raises("too large"): - b"".join(_read_chunked(BytesIO(data), limit=2)) diff --git a/test/netlib/http/http2/__init__.py b/test/netlib/http/http2/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/test/netlib/http/http2/__init__.py +++ /dev/null diff --git a/test/netlib/http/http2/test_framereader.py b/test/netlib/http/http2/test_framereader.py deleted file mode 100644 index 41b73189..00000000 --- a/test/netlib/http/http2/test_framereader.py +++ /dev/null @@ -1 +0,0 @@ -# foobar diff --git a/test/netlib/http/test_authentication.py b/test/netlib/http/test_authentication.py deleted file mode 100644 index 5e04bbc5..00000000 --- a/test/netlib/http/test_authentication.py +++ /dev/null @@ -1,122 +0,0 @@ -import binascii - -from mitmproxy.test import tutils -from netlib.http import authentication, Headers - - -def test_parse_http_basic_auth(): - vals = ("basic", "foo", "bar") - assert authentication.parse_http_basic_auth( - authentication.assemble_http_basic_auth(*vals) - ) == vals - assert not authentication.parse_http_basic_auth("") - assert not authentication.parse_http_basic_auth("foo bar") - v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") - assert not authentication.parse_http_basic_auth(v) - - -class TestPassManNonAnon: - - def test_simple(self): - p = authentication.PassManNonAnon() - assert not p.test("", "") - assert p.test("user", "") - - -class TestPassManHtpasswd: - - def test_file_errors(self): - tutils.raises( - "malformed htpasswd file", - authentication.PassManHtpasswd, - tutils.test_data.path("data/server.crt")) - - def test_simple(self): - pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) - - vals = ("basic", "test", "test") - authentication.assemble_http_basic_auth(*vals) - assert pm.test("test", "test") - assert not pm.test("test", "foo") - assert not pm.test("foo", "test") - assert not pm.test("test", "") - assert not pm.test("", "") - - -class TestPassManSingleUser: - - def test_simple(self): - pm = authentication.PassManSingleUser("test", "test") - assert pm.test("test", "test") - assert not pm.test("test", "foo") - assert not pm.test("foo", "test") - - -class TestNullProxyAuth: - - def test_simple(self): - na = authentication.NullProxyAuth(authentication.PassManNonAnon()) - assert not na.auth_challenge_headers() - assert na.authenticate("foo") - na.clean({}) - - -class TestBasicProxyAuth: - - def test_simple(self): - ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - headers = Headers() - assert ba.auth_challenge_headers() - assert not ba.authenticate(headers) - - def test_authenticate_clean(self): - ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - - headers = Headers() - vals = ("basic", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert ba.authenticate(headers) - - ba.clean(headers) - assert ba.AUTH_HEADER not in headers - - headers[ba.AUTH_HEADER] = "" - assert not ba.authenticate(headers) - - headers[ba.AUTH_HEADER] = "foo" - assert not ba.authenticate(headers) - - vals = ("foo", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert not ba.authenticate(headers) - - ba = authentication.BasicProxyAuth(authentication.PassMan(), "test") - vals = ("basic", "foo", "bar") - headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) - assert not ba.authenticate(headers) - - -class Bunch: - pass - - -class TestAuthAction: - - def test_nonanonymous(self): - m = Bunch() - aa = authentication.NonanonymousAuthAction(None, "authenticator") - aa(None, m, None, None) - assert m.authenticator - - def test_singleuser(self): - m = Bunch() - aa = authentication.SingleuserAuthAction(None, "authenticator") - aa(None, m, "foo:bar", None) - assert m.authenticator - tutils.raises("invalid", aa, None, m, "foo", None) - - def test_httppasswd(self): - m = Bunch() - aa = authentication.HtpasswdAuthAction(None, "authenticator") - aa(None, m, tutils.test_data.path("data/htpasswd"), None) - assert m.authenticator diff --git a/test/netlib/http/test_cookies.py b/test/netlib/http/test_cookies.py deleted file mode 100644 index ca10a69c..00000000 --- a/test/netlib/http/test_cookies.py +++ /dev/null @@ -1,365 +0,0 @@ -import time - -from netlib.http import cookies -from mitmproxy.test.tutils import raises - -import mock - -cookie_pairs = [ - [ - "", - [] - ], - [ - "one=uno", - [["one", "uno"]] - ], - [ - "one", - [["one", None]] - ], - [ - "one=uno; two=due", - [["one", "uno"], ["two", "due"]] - ], - [ - 'one="uno"; two="\due"', - [["one", "uno"], ["two", "due"]] - ], - [ - 'one="un\\"o"', - [["one", 'un"o']] - ], - [ - 'one="uno,due"', - [["one", 'uno,due']] - ], - [ - "one=uno; two; three=tre", - [["one", "uno"], ["two", None], ["three", "tre"]] - ], - [ - "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; " - "_rcc2=53VdltWl+Ov6ordflA==;", - [ - ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="], - ["_rcc2", "53VdltWl+Ov6ordflA=="] - ] - ] -] - - -def test_read_key(): - tokens = [ - [("foo", 0), ("foo", 3)], - [("foo", 1), ("oo", 3)], - [(" foo", 0), (" foo", 4)], - [(" foo", 1), ("foo", 4)], - [(" foo;", 1), ("foo", 4)], - [(" foo=", 1), ("foo", 4)], - [(" foo=bar", 1), ("foo", 4)], - ] - for q, a in tokens: - assert cookies._read_key(*q) == a - - -def test_read_quoted_string(): - tokens = [ - [('"foo" x', 0), ("foo", 5)], - [('"f\oo" x', 0), ("foo", 6)], - [(r'"f\\o" x', 0), (r"f\o", 6)], - [(r'"f\\" x', 0), (r"f" + '\\', 5)], - [('"fo\\\"" x', 0), ("fo\"", 6)], - [('"foo" x', 7), ("", 8)], - ] - for q, a in tokens: - assert cookies._read_quoted_string(*q) == a - - -def test_read_cookie_pairs(): - vals = [ - [ - "one", - [["one", None]] - ], - [ - "one=two", - [["one", "two"]] - ], - [ - "one=", - [["one", ""]] - ], - [ - 'one="two"', - [["one", "two"]] - ], - [ - 'one="two"; three=four', - [["one", "two"], ["three", "four"]] - ], - [ - 'one="two"; three=four; five', - [["one", "two"], ["three", "four"], ["five", None]] - ], - [ - 'one="\\"two"; three=four', - [["one", '"two'], ["three", "four"]] - ], - ] - for s, lst in vals: - ret, off = cookies._read_cookie_pairs(s) - assert ret == lst - - -def test_pairs_roundtrips(): - for s, expected in cookie_pairs: - ret, off = cookies._read_cookie_pairs(s) - assert ret == expected - - s2 = cookies._format_pairs(expected) - ret, off = cookies._read_cookie_pairs(s2) - assert ret == expected - - -def test_cookie_roundtrips(): - for s, expected in cookie_pairs: - ret = cookies.parse_cookie_header(s) - assert ret == expected - - s2 = cookies.format_cookie_header(expected) - ret = cookies.parse_cookie_header(s2) - assert ret == expected - - -def test_parse_set_cookie_pairs(): - pairs = [ - [ - "one=uno", - [[ - ["one", "uno"] - ]] - ], - [ - "one=un\x20", - [[ - ["one", "un\x20"] - ]] - ], - [ - "one=uno; foo", - [[ - ["one", "uno"], - ["foo", None] - ]] - ], - [ - "mun=1.390.f60; " - "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; " - "domain=b.aol.com", - [[ - ["mun", "1.390.f60"], - ["expires", "sun, 11-oct-2015 12:38:31 gmt"], - ["path", "/"], - ["domain", "b.aol.com"] - ]] - ], - [ - r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; ' - 'domain=.rubiconproject.com; ' - 'expires=mon, 11-may-2015 21:54:57 gmt; ' - 'path=/', - [[ - ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'], - ['domain', '.rubiconproject.com'], - ['expires', 'mon, 11-may-2015 21:54:57 gmt'], - ['path', '/'] - ]] - ], - ] - for s, expected in pairs: - ret, off = cookies._read_set_cookie_pairs(s) - assert ret == expected - - s2 = cookies._format_set_cookie_pairs(expected[0]) - ret2, off = cookies._read_set_cookie_pairs(s2) - assert ret2 == expected - - -def test_parse_set_cookie_header(): - def set_cookie_equal(obs, exp): - assert obs[0] == exp[0] - assert obs[1] == exp[1] - assert obs[2].items(multi=True) == exp[2] - - vals = [ - [ - "", [] - ], - [ - ";", [] - ], - [ - "one=uno", - [ - ("one", "uno", ()) - ] - ], - [ - "one=uno; foo=bar", - [ - ("one", "uno", (("foo", "bar"),)) - ] - ], - [ - "one=uno; foo=bar; foo=baz", - [ - ("one", "uno", (("foo", "bar"), ("foo", "baz"))) - ] - ], - # Comma Separated Variant of Set-Cookie Headers - [ - "foo=bar, doo=dar", - [ - ("foo", "bar", ()), - ("doo", "dar", ()), - ] - ], - [ - "foo=bar; path=/, doo=dar; roo=rar; zoo=zar", - [ - ("foo", "bar", (("path", "/"),)), - ("doo", "dar", (("roo", "rar"), ("zoo", "zar"))), - ] - ], - [ - "foo=bar; expires=Mon, 24 Aug 2037", - [ - ("foo", "bar", (("expires", "Mon, 24 Aug 2037"),)), - ] - ], - [ - "foo=bar; expires=Mon, 24 Aug 2037 00:00:00 GMT, doo=dar", - [ - ("foo", "bar", (("expires", "Mon, 24 Aug 2037 00:00:00 GMT"),)), - ("doo", "dar", ()), - ] - ], - ] - for s, expected in vals: - ret = cookies.parse_set_cookie_header(s) - if expected: - for i in range(len(expected)): - set_cookie_equal(ret[i], expected[i]) - - s2 = cookies.format_set_cookie_header(ret) - ret2 = cookies.parse_set_cookie_header(s2) - for i in range(len(expected)): - set_cookie_equal(ret2[i], expected[i]) - else: - assert not ret - - -def test_refresh_cookie(): - - # Invalid expires format, sent to us by Reddit. - c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/" - assert cookies.refresh_set_cookie_header(c, 60) - - c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" - assert "00:21:38" in cookies.refresh_set_cookie_header(c, 60) - - c = "foo,bar" - with raises(ValueError): - cookies.refresh_set_cookie_header(c, 60) - - # https://github.com/mitmproxy/mitmproxy/issues/773 - c = ">=A" - assert cookies.refresh_set_cookie_header(c, 60) - - # https://github.com/mitmproxy/mitmproxy/issues/1118 - c = "foo:bar=bla" - assert cookies.refresh_set_cookie_header(c, 0) - c = "foo/bar=bla" - assert cookies.refresh_set_cookie_header(c, 0) - - -@mock.patch('time.time') -def test_get_expiration_ts(*args): - # Freeze time - now_ts = 17 - time.time.return_value = now_ts - - CA = cookies.CookieAttrs - F = cookies.get_expiration_ts - - assert F(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT")])) == 0 - assert F(CA([("Expires", "Mon, 24-Aug-2037 00:00:00 GMT")])) == 2134684800 - - assert F(CA([("Max-Age", "0")])) == now_ts - assert F(CA([("Max-Age", "31")])) == now_ts + 31 - - -def test_is_expired(): - CA = cookies.CookieAttrs - - # A cookie can be expired - # by setting the expire time in the past - assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT")])) - - # or by setting Max-Age to 0 - assert cookies.is_expired(CA([("Max-Age", "0")])) - - # or both - assert cookies.is_expired(CA([("Expires", "Thu, 01-Jan-1970 00:00:00 GMT"), ("Max-Age", "0")])) - - assert not cookies.is_expired(CA([("Expires", "Mon, 24-Aug-2037 00:00:00 GMT")])) - assert not cookies.is_expired(CA([("Max-Age", "1")])) - assert not cookies.is_expired(CA([("Expires", "Wed, 15-Jul-2037 00:00:00 GMT"), ("Max-Age", "1")])) - - assert not cookies.is_expired(CA([("Max-Age", "nan")])) - assert not cookies.is_expired(CA([("Expires", "false")])) - - -def test_group_cookies(): - CA = cookies.CookieAttrs - groups = [ - [ - "one=uno; foo=bar; foo=baz", - [ - ('one', 'uno', CA([])), - ('foo', 'bar', CA([])), - ('foo', 'baz', CA([])) - ] - ], - [ - "one=uno; Path=/; foo=bar; Max-Age=0; foo=baz; expires=24-08-1993", - [ - ('one', 'uno', CA([('Path', '/')])), - ('foo', 'bar', CA([('Max-Age', '0')])), - ('foo', 'baz', CA([('expires', '24-08-1993')])) - ] - ], - [ - "one=uno;", - [ - ('one', 'uno', CA([])) - ] - ], - [ - "one=uno; Path=/; Max-Age=0; Expires=24-08-1993", - [ - ('one', 'uno', CA([('Path', '/'), ('Max-Age', '0'), ('Expires', '24-08-1993')])) - ] - ], - [ - "path=val; Path=/", - [ - ('path', 'val', CA([('Path', '/')])) - ] - ] - ] - - for c, expected in groups: - observed = cookies.group_cookies(cookies.parse_cookie_header(c)) - assert observed == expected diff --git a/test/netlib/http/test_encoding.py b/test/netlib/http/test_encoding.py deleted file mode 100644 index 89600709..00000000 --- a/test/netlib/http/test_encoding.py +++ /dev/null @@ -1,73 +0,0 @@ -import mock -import pytest - -from netlib.http import encoding -from mitmproxy.test import tutils - - -@pytest.mark.parametrize("encoder", [ - 'identity', - 'none', -]) -def test_identity(encoder): - assert b"string" == encoding.decode(b"string", encoder) - assert b"string" == encoding.encode(b"string", encoder) - with tutils.raises(ValueError): - encoding.encode(b"string", "nonexistent encoding") - - -@pytest.mark.parametrize("encoder", [ - 'gzip', - 'br', - 'deflate', -]) -def test_encoders(encoder): - assert "" == encoding.decode("", encoder) - assert b"" == encoding.decode(b"", encoder) - - assert "string" == encoding.decode( - encoding.encode( - "string", - encoder - ), - encoder - ) - assert b"string" == encoding.decode( - encoding.encode( - b"string", - encoder - ), - encoder - ) - - with tutils.raises(ValueError): - encoding.decode(b"foobar", encoder) - - -def test_cache(): - decode_gzip = mock.MagicMock() - decode_gzip.return_value = b"decoded" - encode_gzip = mock.MagicMock() - encode_gzip.return_value = b"encoded" - - with mock.patch.dict(encoding.custom_decode, gzip=decode_gzip): - with mock.patch.dict(encoding.custom_encode, gzip=encode_gzip): - assert encoding.decode(b"encoded", "gzip") == b"decoded" - assert decode_gzip.call_count == 1 - - # should be cached - assert encoding.decode(b"encoded", "gzip") == b"decoded" - assert decode_gzip.call_count == 1 - - # the other way around as well - assert encoding.encode(b"decoded", "gzip") == b"encoded" - assert encode_gzip.call_count == 0 - - # different encoding - decode_gzip.return_value = b"bar" - assert encoding.encode(b"decoded", "deflate") != b"decoded" - assert encode_gzip.call_count == 0 - - # This is not in the cache anymore - assert encoding.encode(b"decoded", "gzip") == b"encoded" - assert encode_gzip.call_count == 1 diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py deleted file mode 100644 index cac77d57..00000000 --- a/test/netlib/http/test_headers.py +++ /dev/null @@ -1,106 +0,0 @@ -import collections - -from netlib.http.headers import Headers, parse_content_type, assemble_content_type -from mitmproxy.test.tutils import raises - - -class TestHeaders: - def _2host(self): - return Headers( - ( - (b"Host", b"example.com"), - (b"host", b"example.org") - ) - ) - - def test_init(self): - headers = Headers() - assert len(headers) == 0 - - headers = Headers([[b"Host", b"example.com"]]) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers(Host="example.com") - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers( - [[b"Host", b"invalid"]], - Host="example.com" - ) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = Headers( - [[b"Host", b"invalid"], [b"Accept", b"text/plain"]], - Host="example.com" - ) - assert len(headers) == 2 - assert headers["Host"] == "example.com" - assert headers["Accept"] == "text/plain" - - with raises(TypeError): - Headers([[b"Host", u"not-bytes"]]) - - def test_set(self): - headers = Headers() - headers[u"foo"] = u"1" - headers[b"bar"] = b"2" - headers["baz"] = b"3" - with raises(TypeError): - headers["foobar"] = 42 - assert len(headers) == 3 - - def test_bytes(self): - headers = Headers(Host="example.com") - assert bytes(headers) == b"Host: example.com\r\n" - - headers = Headers([ - [b"Host", b"example.com"], - [b"Accept", b"text/plain"] - ]) - assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n" - - headers = Headers() - assert bytes(headers) == b"" - - def test_replace_simple(self): - headers = Headers(Host="example.com", Accept="text/plain") - replacements = headers.replace("Host: ", "X-Host: ") - assert replacements == 1 - assert headers["X-Host"] == "example.com" - assert "Host" not in headers - assert headers["Accept"] == "text/plain" - - def test_replace_multi(self): - headers = self._2host() - headers.replace(r"Host: example\.com", r"Host: example.de") - assert headers.get_all("Host") == ["example.de", "example.org"] - - def test_replace_remove_spacer(self): - headers = Headers(Host="example.com") - replacements = headers.replace(r"Host: ", "X-Host ") - assert replacements == 0 - assert headers["Host"] == "example.com" - - def test_replace_with_count(self): - headers = Headers(Host="foobarfoo.com", Accept="foo/bar") - replacements = headers.replace("foo", "bar", count=1) - assert replacements == 1 - - -def test_parse_content_type(): - p = parse_content_type - assert p("text/html") == ("text", "html", {}) - assert p("text") is None - - v = p("text/html; charset=UTF-8") - assert v == ('text', 'html', {'charset': 'UTF-8'}) - - -def test_assemble_content_type(): - p = assemble_content_type - assert p("text", "html", {}) == "text/html" - assert p("text", "html", {"charset": "utf8"}) == "text/html; charset=utf8" - assert p("text", "html", collections.OrderedDict([("charset", "utf8"), ("foo", "bar")])) == "text/html; charset=utf8; foo=bar" diff --git a/test/netlib/http/test_message.py b/test/netlib/http/test_message.py deleted file mode 100644 index 2bc8824f..00000000 --- a/test/netlib/http/test_message.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- - -from mitmproxy.test import tutils -from netlib import http - - -def _test_passthrough_attr(message, attr): - assert getattr(message, attr) == getattr(message.data, attr) - setattr(message, attr, b"foo") - assert getattr(message.data, attr) == b"foo" - - -def _test_decoded_attr(message, attr): - assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") - # Set str, get raw bytes - setattr(message, attr, "foo") - assert getattr(message.data, attr) == b"foo" - # Set raw bytes, get decoded - setattr(message.data, attr, b"BAR") # use uppercase so that we can also cover request.method - assert getattr(message, attr) == "BAR" - # Set bytes, get raw bytes - setattr(message, attr, b"baz") - assert getattr(message.data, attr) == b"baz" - - # Set UTF8 - setattr(message, attr, "Non-Autorisé") - assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9" - # Don't fail on garbage - setattr(message.data, attr, b"FOO\xBF\x00BAR") - assert getattr(message, attr).startswith("FOO") - assert getattr(message, attr).endswith("BAR") - # foo.bar = foo.bar should not cause any side effects. - d = getattr(message, attr) - setattr(message, attr, d) - assert getattr(message.data, attr) == b"FOO\xBF\x00BAR" - - -class TestMessageData: - def test_eq_ne(self): - data = tutils.tresp(timestamp_start=42, timestamp_end=42).data - same = tutils.tresp(timestamp_start=42, timestamp_end=42).data - assert data == same - assert not data != same - - other = tutils.tresp(content=b"foo").data - assert not data == other - assert data != other - - assert data != 0 - - -class TestMessage: - - def test_init(self): - resp = tutils.tresp() - assert resp.data - - def test_eq_ne(self): - resp = tutils.tresp(timestamp_start=42, timestamp_end=42) - same = tutils.tresp(timestamp_start=42, timestamp_end=42) - assert resp == same - assert not resp != same - - other = tutils.tresp(timestamp_start=0, timestamp_end=0) - assert not resp == other - assert resp != other - - assert resp != 0 - - def test_serializable(self): - resp = tutils.tresp() - resp2 = http.Response.from_state(resp.get_state()) - assert resp == resp2 - - def test_content_length_update(self): - resp = tutils.tresp() - resp.content = b"foo" - assert resp.data.content == b"foo" - assert resp.headers["content-length"] == "3" - resp.content = b"" - assert resp.data.content == b"" - assert resp.headers["content-length"] == "0" - resp.raw_content = b"bar" - assert resp.data.content == b"bar" - assert resp.headers["content-length"] == "0" - - def test_headers(self): - _test_passthrough_attr(tutils.tresp(), "headers") - - def test_timestamp_start(self): - _test_passthrough_attr(tutils.tresp(), "timestamp_start") - - def test_timestamp_end(self): - _test_passthrough_attr(tutils.tresp(), "timestamp_end") - - def test_http_version(self): - _test_decoded_attr(tutils.tresp(), "http_version") - - def test_replace(self): - r = tutils.tresp() - r.content = b"foofootoo" - r.replace(b"foo", "gg") - assert r.content == b"ggggtoo" - - r.content = b"foofootoo" - r.replace(b"foo", "gg", count=1) - assert r.content == b"ggfootoo" - - -class TestMessageContentEncoding: - def test_simple(self): - r = tutils.tresp() - assert r.raw_content == b"message" - assert "content-encoding" not in r.headers - r.encode("gzip") - - assert r.headers["content-encoding"] - assert r.raw_content != b"message" - assert r.content == b"message" - assert r.raw_content != b"message" - - def test_modify(self): - r = tutils.tresp() - assert "content-encoding" not in r.headers - r.encode("gzip") - - r.content = b"foo" - assert r.raw_content != b"foo" - r.decode() - assert r.raw_content == b"foo" - - with tutils.raises(TypeError): - r.content = u"foo" - - def test_unknown_ce(self): - r = tutils.tresp() - r.headers["content-encoding"] = "zopfli" - r.raw_content = b"foo" - with tutils.raises(ValueError): - assert r.content - assert r.headers["content-encoding"] - assert r.get_content(strict=False) == b"foo" - - def test_cannot_decode(self): - r = tutils.tresp() - r.encode("gzip") - r.raw_content = b"foo" - with tutils.raises(ValueError): - assert r.content - assert r.headers["content-encoding"] - assert r.get_content(strict=False) == b"foo" - - with tutils.raises(ValueError): - r.decode() - assert r.raw_content == b"foo" - assert "content-encoding" in r.headers - - r.decode(strict=False) - assert r.content == b"foo" - assert "content-encoding" not in r.headers - - def test_none(self): - r = tutils.tresp(content=None) - assert r.content is None - r.content = b"foo" - assert r.content is not None - r.content = None - assert r.content is None - - def test_cannot_encode(self): - r = tutils.tresp() - r.encode("gzip") - r.content = None - assert r.headers["content-encoding"] - assert r.raw_content is None - - r.headers["content-encoding"] = "zopfli" - r.content = b"foo" - assert "content-encoding" not in r.headers - assert r.raw_content == b"foo" - - with tutils.raises(ValueError): - r.encode("zopfli") - assert r.raw_content == b"foo" - assert "content-encoding" not in r.headers - - -class TestMessageText: - def test_simple(self): - r = tutils.tresp(content=b'\xfc') - assert r.raw_content == b"\xfc" - assert r.content == b"\xfc" - assert r.text == u"ü" - - r.encode("gzip") - assert r.text == u"ü" - r.decode() - assert r.text == u"ü" - - r.headers["content-type"] = "text/html; charset=latin1" - r.content = b"\xc3\xbc" - assert r.text == u"ü" - r.headers["content-type"] = "text/html; charset=utf8" - assert r.text == u"ü" - - def test_guess_json(self): - r = tutils.tresp(content=b'"\xc3\xbc"') - r.headers["content-type"] = "application/json" - assert r.text == u'"ü"' - - def test_none(self): - r = tutils.tresp(content=None) - assert r.text is None - r.text = u"foo" - assert r.text is not None - r.text = None - assert r.text is None - - def test_modify(self): - r = tutils.tresp() - - r.text = u"ü" - assert r.raw_content == b"\xfc" - - r.headers["content-type"] = "text/html; charset=utf8" - r.text = u"ü" - assert r.raw_content == b"\xc3\xbc" - assert r.headers["content-length"] == "2" - - def test_unknown_ce(self): - r = tutils.tresp() - r.headers["content-type"] = "text/html; charset=wtf" - r.raw_content = b"foo" - with tutils.raises(ValueError): - assert r.text == u"foo" - assert r.get_text(strict=False) == u"foo" - - def test_cannot_decode(self): - r = tutils.tresp() - r.headers["content-type"] = "text/html; charset=utf8" - r.raw_content = b"\xFF" - with tutils.raises(ValueError): - assert r.text - - assert r.get_text(strict=False) == '\udcff' - - def test_cannot_encode(self): - r = tutils.tresp() - r.content = None - assert "content-type" not in r.headers - assert r.raw_content is None - - r.headers["content-type"] = "text/html; charset=latin1; foo=bar" - r.text = u"☃" - assert r.headers["content-type"] == "text/html; charset=utf-8; foo=bar" - assert r.raw_content == b'\xe2\x98\x83' - - r.headers["content-type"] = "gibberish" - r.text = u"☃" - assert r.headers["content-type"] == "text/plain; charset=utf-8" - assert r.raw_content == b'\xe2\x98\x83' - - del r.headers["content-type"] - r.text = u"☃" - assert r.headers["content-type"] == "text/plain; charset=utf-8" - assert r.raw_content == b'\xe2\x98\x83' - - r.headers["content-type"] = "text/html; charset=latin1" - r.text = u'\udcff' - assert r.headers["content-type"] == "text/html; charset=utf-8" - assert r.raw_content == b"\xFF" diff --git a/test/netlib/http/test_multipart.py b/test/netlib/http/test_multipart.py deleted file mode 100644 index 1d7e0062..00000000 --- a/test/netlib/http/test_multipart.py +++ /dev/null @@ -1,24 +0,0 @@ -from netlib.http import Headers -from netlib.http import multipart - - -def test_decode(): - boundary = 'somefancyboundary' - headers = Headers( - content_type='multipart/form-data; boundary=' + boundary - ) - content = ( - "--{0}\n" - "Content-Disposition: form-data; name=\"field1\"\n\n" - "value1\n" - "--{0}\n" - "Content-Disposition: form-data; name=\"field2\"\n\n" - "value2\n" - "--{0}--".format(boundary).encode() - ) - - form = multipart.decode(headers, content) - - assert len(form) == 2 - assert form[0] == (b"field1", b"value1") - assert form[1] == (b"field2", b"value2") diff --git a/test/netlib/http/test_request.py b/test/netlib/http/test_request.py deleted file mode 100644 index ecfc1ba6..00000000 --- a/test/netlib/http/test_request.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- - -from netlib.http import Headers -from mitmproxy.test.tutils import treq, raises -from .test_message import _test_decoded_attr, _test_passthrough_attr - - -class TestRequestData: - def test_init(self): - with raises(ValueError): - treq(headers="foobar") - - assert isinstance(treq(headers=()).headers, Headers) - - -class TestRequestCore: - """ - Tests for addons and the attributes that are directly proxied from the data structure - """ - def test_repr(self): - request = treq() - assert repr(request) == "Request(GET address:22/path)" - request.host = None - assert repr(request) == "Request(GET /path)" - - def replace(self): - r = treq() - r.path = b"foobarfoo" - r.replace(b"foo", "bar") - assert r.path == b"barbarbar" - - r.path = b"foobarfoo" - r.replace(b"foo", "bar", count=1) - assert r.path == b"barbarfoo" - - def test_first_line_format(self): - _test_passthrough_attr(treq(), "first_line_format") - - def test_method(self): - _test_decoded_attr(treq(), "method") - - def test_scheme(self): - _test_decoded_attr(treq(), "scheme") - - def test_port(self): - _test_passthrough_attr(treq(), "port") - - def test_path(self): - req = treq() - _test_decoded_attr(req, "path") - # path can also be None. - req.path = None - assert req.path is None - assert req.data.path is None - - def test_host(self): - request = treq() - assert request.host == request.data.host.decode("idna") - - # Test IDNA encoding - # Set str, get raw bytes - request.host = "ídna.example" - assert request.data.host == b"xn--dna-qma.example" - # Set raw bytes, get decoded - request.data.host = b"xn--idn-gla.example" - assert request.host == "idná.example" - # Set bytes, get raw bytes - request.host = b"xn--dn-qia9b.example" - assert request.data.host == b"xn--dn-qia9b.example" - # IDNA encoding is not bijective - request.host = "fußball" - assert request.host == "fussball" - - # Don't fail on garbage - request.data.host = b"foo\xFF\x00bar" - assert request.host.startswith("foo") - assert request.host.endswith("bar") - # foo.bar = foo.bar should not cause any side effects. - d = request.host - request.host = d - assert request.data.host == b"foo\xFF\x00bar" - - def test_host_header_update(self): - request = treq() - assert "host" not in request.headers - request.host = "example.com" - assert "host" not in request.headers - - request.headers["Host"] = "foo" - request.host = "example.org" - assert request.headers["Host"] == "example.org" - - -class TestRequestUtils: - """ - Tests for additional convenience methods. - """ - def test_url(self): - request = treq() - assert request.url == "http://address:22/path" - - request.url = "https://otheraddress:42/foo" - assert request.scheme == "https" - assert request.host == "otheraddress" - assert request.port == 42 - assert request.path == "/foo" - - with raises(ValueError): - request.url = "not-a-url" - - def test_url_options(self): - request = treq(method=b"OPTIONS", path=b"*") - assert request.url == "http://address:22" - - def test_url_authority(self): - request = treq(first_line_format="authority") - assert request.url == "address:22" - - def test_pretty_host(self): - request = treq() - # Without host header - assert request.pretty_host == "address" - assert request.host == "address" - # Same port as self.port (22) - request.headers["host"] = "other:22" - assert request.pretty_host == "other" - # Different ports - request.headers["host"] = "other" - assert request.pretty_host == "address" - assert request.host == "address" - # Empty host - request.host = None - assert request.pretty_host is None - assert request.host is None - - # Invalid IDNA - request.headers["host"] = ".disqus.com:22" - assert request.pretty_host == ".disqus.com" - - def test_pretty_url(self): - request = treq() - # Without host header - assert request.url == "http://address:22/path" - assert request.pretty_url == "http://address:22/path" - # Same port as self.port (22) - request.headers["host"] = "other:22" - assert request.pretty_url == "http://other:22/path" - # Different ports - request.headers["host"] = "other" - assert request.pretty_url == "http://address:22/path" - - def test_pretty_url_options(self): - request = treq(method=b"OPTIONS", path=b"*") - assert request.pretty_url == "http://address:22" - - def test_pretty_url_authority(self): - request = treq(first_line_format="authority") - assert request.pretty_url == "address:22" - - def test_get_query(self): - request = treq() - assert not request.query - - request.url = "http://localhost:80/foo?bar=42" - assert dict(request.query) == {"bar": "42"} - - def test_set_query(self): - request = treq() - assert not request.query - request.query["foo"] = "bar" - assert request.query["foo"] == "bar" - assert request.path == "/path?foo=bar" - - def test_get_cookies_none(self): - request = treq() - request.headers = Headers() - assert not request.cookies - - def test_get_cookies_single(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue") - assert len(request.cookies) == 1 - assert request.cookies['cookiename'] == 'cookievalue' - - def test_get_cookies_double(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = request.cookies - assert len(result) == 2 - assert result['cookiename'] == 'cookievalue' - assert result['othercookiename'] == 'othercookievalue' - - def test_get_cookies_withequalsign(self): - request = treq() - request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = request.cookies - assert len(result) == 2 - assert result['cookiename'] == 'coo=kievalue' - assert result['othercookiename'] == 'othercookievalue' - - def test_set_cookies(self): - request = treq() - request.headers = Headers(cookie="cookiename=cookievalue") - result = request.cookies - result["cookiename"] = "foo" - assert request.cookies["cookiename"] == "foo" - - def test_get_path_components(self): - request = treq(path=b"/foo/bar") - assert request.path_components == ("foo", "bar") - - def test_set_path_components(self): - request = treq() - request.path_components = ["foo", "baz"] - assert request.path == "/foo/baz" - - request.path_components = [] - assert request.path == "/" - - request.path_components = ["foo", "baz"] - request.query["hello"] = "hello" - assert request.path_components == ("foo", "baz") - - request.path_components = ["abc"] - assert request.path == "/abc?hello=hello" - - def test_anticache(self): - request = treq() - request.headers["If-Modified-Since"] = "foo" - request.headers["If-None-Match"] = "bar" - request.anticache() - assert "If-Modified-Since" not in request.headers - assert "If-None-Match" not in request.headers - - def test_anticomp(self): - request = treq() - request.headers["Accept-Encoding"] = "foobar" - request.anticomp() - assert request.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - request = treq() - - h = request.headers.copy() - request.constrain_encoding() # no-op if there is no accept_encoding header. - assert request.headers == h - - request.headers["Accept-Encoding"] = "identity, gzip, foo" - request.constrain_encoding() - assert "foo" not in request.headers["Accept-Encoding"] - assert "gzip" in request.headers["Accept-Encoding"] - - def test_get_urlencoded_form(self): - request = treq(content=b"foobar=baz") - assert not request.urlencoded_form - - request.headers["Content-Type"] = "application/x-www-form-urlencoded" - assert list(request.urlencoded_form.items()) == [(b"foobar", b"baz")] - - def test_set_urlencoded_form(self): - request = treq() - request.urlencoded_form = [(b'foo', b'bar'), (b'rab', b'oof')] - assert request.headers["Content-Type"] == "application/x-www-form-urlencoded" - assert request.content - - def test_get_multipart_form(self): - request = treq(content=b"foobar") - assert not request.multipart_form - - request.headers["Content-Type"] = "multipart/form-data" - assert list(request.multipart_form.items()) == [] diff --git a/test/netlib/http/test_response.py b/test/netlib/http/test_response.py deleted file mode 100644 index 4a6fac62..00000000 --- a/test/netlib/http/test_response.py +++ /dev/null @@ -1,145 +0,0 @@ -import email - -import time - -from netlib.http import Headers -from netlib.http import Response -from netlib.http.cookies import CookieAttrs -from mitmproxy.test.tutils import raises, tresp -from .test_message import _test_passthrough_attr, _test_decoded_attr - - -class TestResponseData: - def test_init(self): - with raises(ValueError): - tresp(headers="foobar") - - assert isinstance(tresp(headers=()).headers, Headers) - - -class TestResponseCore: - """ - Tests for addons and the attributes that are directly proxied from the data structure - """ - def test_repr(self): - response = tresp() - assert repr(response) == "Response(200 OK, unknown content type, 7b)" - response.content = None - assert repr(response) == "Response(200 OK, no content)" - - def test_make(self): - r = Response.make() - assert r.status_code == 200 - assert r.content == b"" - - r = Response.make(418, "teatime") - assert r.status_code == 418 - assert r.content == b"teatime" - assert r.headers["content-length"] == "7" - - Response.make(content=b"foo") - Response.make(content="foo") - with raises(TypeError): - Response.make(content=42) - - r = Response.make(headers=[(b"foo", b"bar")]) - assert r.headers["foo"] == "bar" - - r = Response.make(headers=({"foo": "baz"})) - assert r.headers["foo"] == "baz" - - with raises(TypeError): - Response.make(headers=42) - - def test_status_code(self): - _test_passthrough_attr(tresp(), "status_code") - - def test_reason(self): - _test_decoded_attr(tresp(), "reason") - - -class TestResponseUtils: - """ - Tests for additional convenience methods. - """ - def test_get_cookies_none(self): - resp = tresp() - resp.headers = Headers() - assert not resp.cookies - - def test_get_cookies_empty(self): - resp = tresp() - resp.headers = Headers(set_cookie="") - assert not resp.cookies - - def test_get_cookies_simple(self): - resp = tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue") - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"] == ("cookievalue", CookieAttrs()) - - def test_get_cookies_with_parameters(self): - resp = tresp() - cookie = "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly" - resp.headers = Headers(set_cookie=cookie) - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == "cookievalue" - attrs = result["cookiename"][1] - assert len(attrs) == 4 - assert attrs["domain"] == "example.com" - assert attrs["expires"] == "Wed Oct 21 16:29:41 2015" - assert attrs["path"] == "/" - assert attrs["httponly"] is None - - def test_get_cookies_no_value(self): - resp = tresp() - resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") - result = resp.cookies - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == "" - assert len(result["cookiename"][1]) == 2 - - def test_get_cookies_twocookies(self): - resp = tresp() - resp.headers = Headers([ - [b"Set-Cookie", b"cookiename=cookievalue"], - [b"Set-Cookie", b"othercookie=othervalue"] - ]) - result = resp.cookies - assert len(result) == 2 - assert "cookiename" in result - assert result["cookiename"] == ("cookievalue", CookieAttrs()) - assert "othercookie" in result - assert result["othercookie"] == ("othervalue", CookieAttrs()) - - def test_set_cookies(self): - resp = tresp() - resp.cookies["foo"] = ("bar", {}) - - assert len(resp.cookies) == 1 - assert resp.cookies["foo"] == ("bar", CookieAttrs()) - - def test_refresh(self): - r = tresp() - n = time.time() - r.headers["date"] = email.utils.formatdate(n) - pre = r.headers["date"] - r.refresh(n) - assert pre == r.headers["date"] - r.refresh(n + 60) - - d = email.utils.parsedate_tz(r.headers["date"]) - d = email.utils.mktime_tz(d) - # Weird that this is not exact... - assert abs(60 - (d - n)) <= 1 - - cookie = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" - r.headers["set-cookie"] = cookie - r.refresh() - # Cookie refreshing is tested in test_cookies, we just make sure that it's triggered here. - assert cookie != r.headers["set-cookie"] diff --git a/test/netlib/http/test_status_codes.py b/test/netlib/http/test_status_codes.py deleted file mode 100644 index 9fea6b70..00000000 --- a/test/netlib/http/test_status_codes.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib.http import status_codes - - -def test_simple(): - assert status_codes.IM_A_TEAPOT == 418 - assert status_codes.RESPONSES[418] == "I'm a teapot" diff --git a/test/netlib/http/test_url.py b/test/netlib/http/test_url.py deleted file mode 100644 index 7cea6c58..00000000 --- a/test/netlib/http/test_url.py +++ /dev/null @@ -1,102 +0,0 @@ -from mitmproxy.test import tutils -from netlib.http import url - - -def test_parse(): - with tutils.raises(ValueError): - url.parse("") - - s, h, po, pa = url.parse(b"http://foo.com:8888/test") - assert s == b"http" - assert h == b"foo.com" - assert po == 8888 - assert pa == b"/test" - - s, h, po, pa = url.parse("http://foo/bar") - assert s == b"http" - assert h == b"foo" - assert po == 80 - assert pa == b"/bar" - - s, h, po, pa = url.parse(b"http://user:pass@foo/bar") - assert s == b"http" - assert h == b"foo" - assert po == 80 - assert pa == b"/bar" - - s, h, po, pa = url.parse(b"http://foo") - assert pa == b"/" - - s, h, po, pa = url.parse(b"https://foo") - assert po == 443 - - with tutils.raises(ValueError): - url.parse(b"https://foo:bar") - - # Invalid IDNA - with tutils.raises(ValueError): - url.parse("http://\xfafoo") - # Invalid PATH - with tutils.raises(ValueError): - url.parse("http:/\xc6/localhost:56121") - # Null byte in host - with tutils.raises(ValueError): - url.parse("http://foo\0") - # Port out of range - _, _, port, _ = url.parse("http://foo:999999") - assert port == 80 - # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt - with tutils.raises(ValueError): - url.parse('http://lo[calhost') - - -def test_unparse(): - assert url.unparse("http", "foo.com", 99, "") == "http://foo.com:99" - assert url.unparse("http", "foo.com", 80, "/bar") == "http://foo.com/bar" - assert url.unparse("https", "foo.com", 80, "") == "https://foo.com:80" - assert url.unparse("https", "foo.com", 443, "") == "https://foo.com" - - -surrogates = bytes(range(256)).decode("utf8", "surrogateescape") - -surrogates_quoted = ( - '%00%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F' - '%10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F' - '%20%21%22%23%24%25%26%27%28%29%2A%2B%2C-./' - '0123456789%3A%3B%3C%3D%3E%3F' - '%40ABCDEFGHIJKLMNO' - 'PQRSTUVWXYZ%5B%5C%5D%5E_' - '%60abcdefghijklmno' - 'pqrstuvwxyz%7B%7C%7D%7E%7F' - '%80%81%82%83%84%85%86%87%88%89%8A%8B%8C%8D%8E%8F' - '%90%91%92%93%94%95%96%97%98%99%9A%9B%9C%9D%9E%9F' - '%A0%A1%A2%A3%A4%A5%A6%A7%A8%A9%AA%AB%AC%AD%AE%AF' - '%B0%B1%B2%B3%B4%B5%B6%B7%B8%B9%BA%BB%BC%BD%BE%BF' - '%C0%C1%C2%C3%C4%C5%C6%C7%C8%C9%CA%CB%CC%CD%CE%CF' - '%D0%D1%D2%D3%D4%D5%D6%D7%D8%D9%DA%DB%DC%DD%DE%DF' - '%E0%E1%E2%E3%E4%E5%E6%E7%E8%E9%EA%EB%EC%ED%EE%EF' - '%F0%F1%F2%F3%F4%F5%F6%F7%F8%F9%FA%FB%FC%FD%FE%FF' -) - - -def test_encode(): - assert url.encode([('foo', 'bar')]) - assert url.encode([('foo', surrogates)]) - - -def test_decode(): - s = "one=two&three=four" - assert len(url.decode(s)) == 2 - assert url.decode(surrogates) - - -def test_quote(): - assert url.quote("foo") == "foo" - assert url.quote("foo bar") == "foo%20bar" - assert url.quote(surrogates) == surrogates_quoted - - -def test_unquote(): - assert url.unquote("foo") == "foo" - assert url.unquote("foo%20bar") == "foo bar" - assert url.unquote(surrogates_quoted) == surrogates diff --git a/test/netlib/http/test_user_agents.py b/test/netlib/http/test_user_agents.py deleted file mode 100644 index 0bf1bba7..00000000 --- a/test/netlib/http/test_user_agents.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib.http import user_agents - - -def test_get_shortcut(): - assert user_agents.get_by_shortcut("c")[0] == "chrome" - assert not user_agents.get_by_shortcut("_") |