aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/http1/test_protocol.py159
-rw-r--r--test/http/http2/test_frames.py4
-rw-r--r--test/http/test_authentication.py2
-rw-r--r--test/http/test_semantics.py2
-rw-r--r--test/test_encoding.py24
-rw-r--r--test/test_utils.py75
-rw-r--r--test/test_version_check.py8
-rw-r--r--test/tservers.py8
8 files changed, 131 insertions, 151 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
index f7c615bd..bdcba5cb 100644
--- a/test/http/http1/test_protocol.py
+++ b/test/http/http1/test_protocol.py
@@ -1,9 +1,12 @@
-import cStringIO
+from io import BytesIO
import textwrap
+from http.http1.protocol import _parse_authority_form
+from netlib.exceptions import HttpSyntaxException, HttpReadDisconnect, HttpException
-from netlib import http, odict, tcp, tutils
+from netlib import http, tcp, tutils
from netlib.http import semantics, Headers
-from netlib.http.http1 import HTTP1Protocol
+from netlib.http.http1 import HTTP1Protocol, read_message_body, read_request, \
+ read_message_body_chunked, expected_http_body_size
from ... import tservers
@@ -14,8 +17,8 @@ class NoContentLengthHTTPHandler(tcp.BaseHandler):
def mock_protocol(data=''):
- rfile = cStringIO.StringIO(data)
- wfile = cStringIO.StringIO()
+ rfile = BytesIO(data)
+ wfile = BytesIO()
return HTTP1Protocol(rfile=rfile, wfile=wfile)
@@ -37,53 +40,35 @@ def test_stripped_chunked_encoding_no_content():
assert "Content-Length" in mock_protocol()._assemble_response_headers(r)
-def test_has_chunked_encoding():
- headers = http.Headers()
- assert not HTTP1Protocol.has_chunked_encoding(headers)
- headers["transfer-encoding"] = "chunked"
- assert HTTP1Protocol.has_chunked_encoding(headers)
-
-
def test_read_chunked():
- headers = http.Headers()
- headers["transfer-encoding"] = "chunked"
+ req = tutils.treq(None)
+ req.headers["Transfer-Encoding"] = "chunked"
- data = "1\r\na\r\n0\r\n"
- tutils.raises(
- "malformed chunked body",
- mock_protocol(data).read_http_body,
- headers, None, "GET", None, True
- )
+ data = b"1\r\na\r\n0\r\n"
+ with tutils.raises(HttpSyntaxException):
+ read_message_body(BytesIO(data), req)
- data = "1\r\na\r\n0\r\n\r\n"
- assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a"
+ data = b"1\r\na\r\n0\r\n\r\n"
+ assert read_message_body(BytesIO(data), req) == b"a"
- data = "\r\n\r\n1\r\na\r\n0\r\n\r\n"
- assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a"
+ data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n"
+ assert read_message_body(BytesIO(data), req) == b"ab"
- data = "\r\n"
- tutils.raises(
- "closed prematurely",
- mock_protocol(data).read_http_body,
- headers, None, "GET", None, True
- )
+ data = b"\r\n"
+ with tutils.raises("closed prematurely"):
+ read_message_body(BytesIO(data), req)
- data = "1\r\nfoo"
- tutils.raises(
- "malformed chunked body",
- mock_protocol(data).read_http_body,
- headers, None, "GET", None, True
- )
+ data = b"1\r\nfoo"
+ with tutils.raises("malformed chunked body"):
+ read_message_body(BytesIO(data), req)
- data = "foo\r\nfoo"
- tutils.raises(
- http.HttpError,
- mock_protocol(data).read_http_body,
- headers, None, "GET", None, True
- )
+ data = b"foo\r\nfoo"
+ with tutils.raises(HttpSyntaxException):
+ read_message_body(BytesIO(data), req)
- data = "5\r\naaaaa\r\n0\r\n\r\n"
- tutils.raises("too large", mock_protocol(data).read_http_body, headers, 2, "GET", None, True)
+ data = b"5\r\naaaaa\r\n0\r\n\r\n"
+ with tutils.raises("too large"):
+ read_message_body(BytesIO(data), req, limit=2)
def test_connection_close():
@@ -171,52 +156,37 @@ def test_read_http_body():
def test_expected_http_body_size():
# gibber in the content-length field
headers = Headers(content_length="foo")
- assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None
+ with tutils.raises(HttpSyntaxException):
+ expected_http_body_size(headers, False, "GET", 200) is None
# negative number in the content-length field
headers = Headers(content_length="-7")
- assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None
+ with tutils.raises(HttpSyntaxException):
+ expected_http_body_size(headers, False, "GET", 200) is None
# explicit length
headers = Headers(content_length="5")
- assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == 5
+ assert expected_http_body_size(headers, False, "GET", 200) == 5
# no length
headers = Headers()
- assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == -1
+ assert expected_http_body_size(headers, False, "GET", 200) == -1
# no length request
headers = Headers()
- assert HTTP1Protocol.expected_http_body_size(headers, True, "GET", None) == 0
-
-
-def test_get_request_line():
- data = "\nfoo"
- p = mock_protocol(data)
- assert p._get_request_line() == "foo"
- assert not p._get_request_line()
-
-
-def test_parse_http_protocol():
- assert HTTP1Protocol._parse_http_protocol("HTTP/1.1") == (1, 1)
- assert HTTP1Protocol._parse_http_protocol("HTTP/0.0") == (0, 0)
- assert not HTTP1Protocol._parse_http_protocol("HTTP/a.1")
- assert not HTTP1Protocol._parse_http_protocol("HTTP/1.a")
- assert not HTTP1Protocol._parse_http_protocol("foo/0.0")
- assert not HTTP1Protocol._parse_http_protocol("HTTP/x")
+ assert expected_http_body_size(headers, True, "GET", None) == 0
+ # expect header
+ headers = Headers(content_length="5", expect="100-continue")
+ assert expected_http_body_size(headers, True, "GET", None) == 0
def test_parse_init_connect():
- assert HTTP1Protocol._parse_init_connect("CONNECT host.com:443 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("bogus")
- assert not HTTP1Protocol._parse_init_connect("GET host.com:443 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("CONNECT host.com443 HTTP/1.0")
- assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:443 foo/1.0")
- assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:foo HTTP/1.0")
+ assert _parse_authority_form(b"CONNECT host.com:443 HTTP/1.0")
+ tutils.raises(ValueError,_parse_authority_form, b"\0host.com:443")
+ tutils.raises(ValueError,_parse_authority_form, b"host.com:444444")
+ tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com443 HTTP/1.0")
+ tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com:foo HTTP/1.0")
def test_parse_init_proxy():
- u = "GET http://foo.com:8888/test HTTP/1.1"
- m, s, h, po, pa, httpversion = HTTP1Protocol._parse_init_proxy(u)
+ u = b"GET http://foo.com:8888/test HTTP/1.1"
+ m, s, h, po, pa, httpversion = HTTP1Protocol._parse_absolute_form(u)
assert m == "GET"
assert s == "http"
assert h == "foo.com"
@@ -225,11 +195,14 @@ def test_parse_init_proxy():
assert httpversion == (1, 1)
u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
- assert not HTTP1Protocol._parse_init_proxy(u)
+ assert not HTTP1Protocol._parse_absolute_form(u)
- assert not HTTP1Protocol._parse_init_proxy("invalid")
- assert not HTTP1Protocol._parse_init_proxy("GET invalid HTTP/1.1")
- assert not HTTP1Protocol._parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
+ with tutils.raises(ValueError):
+ assert not HTTP1Protocol._parse_absolute_form("invalid")
+ with tutils.raises(ValueError):
+ assert not HTTP1Protocol._parse_absolute_form("GET invalid HTTP/1.1")
+ with tutils.raises(ValueError):
+ assert not HTTP1Protocol._parse_absolute_form("GET http://foo.com:8888/test foo/1.1")
def test_parse_init_http():
@@ -317,15 +290,11 @@ class TestReadRequest(object):
"get / HTTP/1.1\r\nfoo"
)
tutils.raises(
- tcp.NetLibDisconnect,
+ HttpReadDisconnect,
self.tst,
"\r\n"
)
- def test_empty(self):
- v = self.tst("", allow_empty=True)
- assert isinstance(v, semantics.EmptyRequest)
-
def test_asterisk_form_in(self):
v = self.tst("OPTIONS * HTTP/1.1")
assert v.form_in == "relative"
@@ -356,18 +325,18 @@ class TestReadRequest(object):
assert v.host == "foo.com"
def test_expect(self):
- data = "".join(
- "GET / HTTP/1.1\r\n"
- "Content-Length: 3\r\n"
- "Expect: 100-continue\r\n\r\n"
- "foobar"
+ data = (
+ b"GET / HTTP/1.1\r\n"
+ b"Content-Length: 3\r\n"
+ b"Expect: 100-continue\r\n"
+ b"\r\n"
+ b"foobar"
)
- p = mock_protocol(data)
- v = p.read_request()
- assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
- assert v.body == "foo"
- assert p.tcp_handler.rfile.read(3) == "bar"
+ rfile = BytesIO(data)
+ r = read_request(rfile)
+ assert r.body == b""
+ assert rfile.read(-1) == b"foobar"
class TestReadResponse(object):
diff --git a/test/http/http2/test_frames.py b/test/http/http2/test_frames.py
index 5d5cb0ba..efdb55e2 100644
--- a/test/http/http2/test_frames.py
+++ b/test/http/http2/test_frames.py
@@ -1,4 +1,4 @@
-import cStringIO
+from io import BytesIO
from nose.tools import assert_equal
from netlib import tcp, tutils
@@ -7,7 +7,7 @@ from netlib.http.http2.frame import *
def hex_to_file(data):
data = data.decode('hex')
- return tcp.Reader(cStringIO.StringIO(data))
+ return tcp.Reader(BytesIO(data))
def test_invalid_flags():
diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py
index 17c91fe5..ee192dd7 100644
--- a/test/http/test_authentication.py
+++ b/test/http/test_authentication.py
@@ -5,7 +5,7 @@ from netlib.http import authentication, Headers
def test_parse_http_basic_auth():
- vals = ("basic", "foo", "bar")
+ vals = (b"basic", b"foo", b"bar")
assert authentication.parse_http_basic_auth(
authentication.assemble_http_basic_auth(*vals)
) == vals
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
index 6dcbbe07..44d3c85e 100644
--- a/test/http/test_semantics.py
+++ b/test/http/test_semantics.py
@@ -475,7 +475,7 @@ class TestHeaders(object):
def test_str(self):
headers = semantics.Headers(Host="example.com")
- assert str(headers) == "Host: example.com\r\n"
+ assert bytes(headers) == "Host: example.com\r\n"
headers = semantics.Headers([
["Host", "example.com"],
diff --git a/test/test_encoding.py b/test/test_encoding.py
index 612aea89..9da3a38d 100644
--- a/test/test_encoding.py
+++ b/test/test_encoding.py
@@ -9,25 +9,29 @@ def test_identity():
def test_gzip():
- assert "string" == encoding.decode(
+ assert b"string" == encoding.decode(
"gzip",
encoding.encode(
"gzip",
- "string"))
- assert None == encoding.decode("gzip", "bogus")
+ b"string"
+ )
+ )
+ assert encoding.decode("gzip", b"bogus") is None
def test_deflate():
- assert "string" == encoding.decode(
+ assert b"string" == encoding.decode(
"deflate",
encoding.encode(
"deflate",
- "string"))
- assert "string" == encoding.decode(
+ b"string"
+ )
+ )
+ assert b"string" == encoding.decode(
"deflate",
encoding.encode(
"deflate",
- "string")[
- 2:-
- 4])
- assert None == encoding.decode("deflate", "bogus")
+ b"string"
+ )[2:-4]
+ )
+ assert encoding.decode("deflate", b"bogus") is None
diff --git a/test/test_utils.py b/test/test_utils.py
index 9dba5d35..8b2ddae4 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -36,46 +36,51 @@ def test_pretty_size():
def test_parse_url():
- assert not utils.parse_url("")
+ with tutils.raises(ValueError):
+ utils.parse_url("")
- u = "http://foo.com:8888/test"
- s, h, po, pa = utils.parse_url(u)
- assert s == "http"
- assert h == "foo.com"
+ s, h, po, pa = utils.parse_url(b"http://foo.com:8888/test")
+ assert s == b"http"
+ assert h == b"foo.com"
assert po == 8888
- assert pa == "/test"
+ assert pa == b"/test"
s, h, po, pa = utils.parse_url("http://foo/bar")
- assert s == "http"
- assert h == "foo"
+ assert s == b"http"
+ assert h == b"foo"
assert po == 80
- assert pa == "/bar"
+ assert pa == b"/bar"
- s, h, po, pa = utils.parse_url("http://user:pass@foo/bar")
- assert s == "http"
- assert h == "foo"
+ s, h, po, pa = utils.parse_url(b"http://user:pass@foo/bar")
+ assert s == b"http"
+ assert h == b"foo"
assert po == 80
- assert pa == "/bar"
+ assert pa == b"/bar"
- s, h, po, pa = utils.parse_url("http://foo")
- assert pa == "/"
+ s, h, po, pa = utils.parse_url(b"http://foo")
+ assert pa == b"/"
- s, h, po, pa = utils.parse_url("https://foo")
+ s, h, po, pa = utils.parse_url(b"https://foo")
assert po == 443
- assert not utils.parse_url("https://foo:bar")
- assert not utils.parse_url("https://foo:")
+ with tutils.raises(ValueError):
+ utils.parse_url(b"https://foo:bar")
# Invalid IDNA
- assert not utils.parse_url("http://\xfafoo")
+ with tutils.raises(ValueError):
+ utils.parse_url("http://\xfafoo")
# Invalid PATH
- assert not utils.parse_url("http:/\xc6/localhost:56121")
+ with tutils.raises(ValueError):
+ utils.parse_url("http:/\xc6/localhost:56121")
# Null byte in host
- assert not utils.parse_url("http://foo\0")
+ with tutils.raises(ValueError):
+ utils.parse_url("http://foo\0")
# Port out of range
- assert not utils.parse_url("http://foo:999999")
+ _, _, port, _ = utils.parse_url("http://foo:999999")
+ assert port == 80
# Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- assert not utils.parse_url('http://lo[calhost')
+ with tutils.raises(ValueError):
+ utils.parse_url('http://lo[calhost')
def test_unparse_url():
@@ -106,23 +111,25 @@ def test_get_header_tokens():
def test_multipartdecode():
- boundary = 'somefancyboundary'
+ boundary = b'somefancyboundary'
headers = Headers(
- content_type='multipart/form-data; boundary=%s' % boundary
+ content_type=b'multipart/form-data; boundary=' + boundary
+ )
+ content = (
+ "--{0}\n"
+ "Content-Disposition: form-data; name=\"field1\"\n\n"
+ "value1\n"
+ "--{0}\n"
+ "Content-Disposition: form-data; name=\"field2\"\n\n"
+ "value2\n"
+ "--{0}--".format(boundary).encode("ascii")
)
- content = "--{0}\n" \
- "Content-Disposition: form-data; name=\"field1\"\n\n" \
- "value1\n" \
- "--{0}\n" \
- "Content-Disposition: form-data; name=\"field2\"\n\n" \
- "value2\n" \
- "--{0}--".format(boundary)
form = utils.multipartdecode(headers, content)
assert len(form) == 2
- assert form[0] == ('field1', 'value1')
- assert form[1] == ('field2', 'value2')
+ assert form[0] == (b"field1", b"value1")
+ assert form[1] == (b"field2", b"value2")
def test_parse_content_type():
diff --git a/test/test_version_check.py b/test/test_version_check.py
index 9a127814..ec2396fe 100644
--- a/test/test_version_check.py
+++ b/test/test_version_check.py
@@ -1,11 +1,11 @@
-import cStringIO
+from io import StringIO
import mock
from netlib import version_check, version
@mock.patch("sys.exit")
def test_check_mitmproxy_version(sexit):
- fp = cStringIO.StringIO()
+ fp = StringIO()
version_check.check_mitmproxy_version(version.IVERSION, fp=fp)
assert not fp.getvalue()
assert not sexit.called
@@ -18,7 +18,7 @@ def test_check_mitmproxy_version(sexit):
@mock.patch("sys.exit")
def test_check_pyopenssl_version(sexit):
- fp = cStringIO.StringIO()
+ fp = StringIO()
version_check.check_pyopenssl_version(fp=fp)
assert not fp.getvalue()
assert not sexit.called
@@ -32,7 +32,7 @@ def test_check_pyopenssl_version(sexit):
@mock.patch("OpenSSL.__version__")
def test_unparseable_pyopenssl_version(version, sexit):
version.split.return_value = ["foo", "bar"]
- fp = cStringIO.StringIO()
+ fp = StringIO()
version_check.check_pyopenssl_version(fp=fp)
assert "Cannot parse" in fp.getvalue()
assert not sexit.called
diff --git a/test/tservers.py b/test/tservers.py
index 682a9144..1f4ce725 100644
--- a/test/tservers.py
+++ b/test/tservers.py
@@ -1,7 +1,7 @@
from __future__ import (absolute_import, print_function, division)
import threading
-import Queue
-import cStringIO
+from six.moves import queue
+from io import StringIO
import OpenSSL
from netlib import tcp
from netlib import tutils
@@ -27,7 +27,7 @@ class ServerTestBase(object):
@classmethod
def setupAll(cls):
- cls.q = Queue.Queue()
+ cls.q = queue.Queue()
s = cls.makeserver()
cls.port = s.address.port
cls.server = ServerThread(s)
@@ -102,6 +102,6 @@ class TServer(tcp.TCPServer):
h.finish()
def handle_error(self, connection, client_address, fp=None):
- s = cStringIO.StringIO()
+ s = StringIO()
tcp.TCPServer.handle_error(self, connection, client_address, s)
self.q.put(s.getvalue())