aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/http1/test_protocol.py0
-rw-r--r--test/http/test_models.py72
-rw-r--r--test/test_encoding.py2
-rw-r--r--test/test_wsgi.py21
-rw-r--r--test/websockets/test_websockets.py71
5 files changed, 81 insertions, 85 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/http/http1/test_protocol.py
+++ /dev/null
diff --git a/test/http/test_models.py b/test/http/test_models.py
index 6970a6e4..d420b22b 100644
--- a/test/http/test_models.py
+++ b/test/http/test_models.py
@@ -58,20 +58,20 @@ class TestRequest(object):
req = tutils.treq()
req.headers["Accept-Encoding"] = "foobar"
req.anticomp()
- assert req.headers["Accept-Encoding"] == "identity"
+ assert req.headers["Accept-Encoding"] == b"identity"
def test_constrain_encoding(self):
req = tutils.treq()
req.headers["Accept-Encoding"] = "identity, gzip, foo"
req.constrain_encoding()
- assert "foo" not in req.headers["Accept-Encoding"]
+ assert b"foo" not in req.headers["Accept-Encoding"]
def test_update_host(self):
req = tutils.treq()
req.headers["Host"] = ""
req.host = "foobar"
req.update_host_header()
- assert req.headers["Host"] == "foobar"
+ assert req.headers["Host"] == b"foobar"
def test_get_form(self):
req = tutils.treq()
@@ -132,7 +132,7 @@ class TestRequest(object):
def test_set_path_components(self):
req = tutils.treq()
- req.set_path_components(["foo", "bar"])
+ req.set_path_components([b"foo", b"bar"])
# TODO: add meaningful assertions
def test_get_query(self):
@@ -140,7 +140,7 @@ class TestRequest(object):
assert req.get_query().lst == []
req.url = "http://localhost:80/foo?bar=42"
- assert req.get_query().lst == [("bar", "42")]
+ assert req.get_query().lst == [(b"bar", b"42")]
def test_set_query(self):
req = tutils.treq()
@@ -167,12 +167,12 @@ class TestRequest(object):
def test_pretty_url(self):
req = tutils.treq()
req.form_out = "authority"
- assert req.pretty_url(True) == "address:22"
- assert req.pretty_url(False) == "address:22"
+ assert req.pretty_url(True) == b"address:22"
+ assert req.pretty_url(False) == b"address:22"
req.form_out = "relative"
- assert req.pretty_url(True) == "http://address:22/path"
- assert req.pretty_url(False) == "http://address:22/path"
+ assert req.pretty_url(True) == b"http://address:22/path"
+ assert req.pretty_url(False) == b"http://address:22/path"
def test_get_cookies_none(self):
headers = Headers()
@@ -213,11 +213,11 @@ class TestRequest(object):
def test_set_url(self):
r = tutils.treq(form_in="absolute")
- r.url = "https://otheraddress:42/ORLY"
- assert r.scheme == "https"
- assert r.host == "otheraddress"
+ r.url = b"https://otheraddress:42/ORLY"
+ assert r.scheme == b"https"
+ assert r.host == b"otheraddress"
assert r.port == 42
- assert r.path == "/ORLY"
+ assert r.path == b"/ORLY"
try:
r.url = "//localhost:80/foo@bar"
@@ -374,8 +374,8 @@ class TestResponse(object):
def test_get_cookies_twocookies(self):
resp = tutils.tresp()
resp.headers = Headers([
- ["Set-Cookie", "cookiename=cookievalue"],
- ["Set-Cookie", "othercookie=othervalue"]
+ [b"Set-Cookie", b"cookiename=cookievalue"],
+ [b"Set-Cookie", b"othercookie=othervalue"]
])
result = resp.get_cookies()
assert len(result) == 2
@@ -399,8 +399,8 @@ class TestHeaders(object):
def _2host(self):
return Headers(
[
- ["Host", "example.com"],
- ["host", "example.org"]
+ [b"Host", b"example.com"],
+ [b"host", b"example.org"]
]
)
@@ -408,37 +408,37 @@ class TestHeaders(object):
headers = Headers()
assert len(headers) == 0
- headers = Headers([["Host", "example.com"]])
+ headers = Headers([[b"Host", b"example.com"]])
assert len(headers) == 1
- assert headers["Host"] == "example.com"
+ assert headers["Host"] == b"example.com"
headers = Headers(Host="example.com")
assert len(headers) == 1
- assert headers["Host"] == "example.com"
+ assert headers["Host"] == b"example.com"
headers = Headers(
- [["Host", "invalid"]],
+ [[b"Host", b"invalid"]],
Host="example.com"
)
assert len(headers) == 1
- assert headers["Host"] == "example.com"
+ assert headers["Host"] == b"example.com"
headers = Headers(
- [["Host", "invalid"], ["Accept", "text/plain"]],
+ [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
Host="example.com"
)
assert len(headers) == 2
- assert headers["Host"] == "example.com"
- assert headers["Accept"] == "text/plain"
+ assert headers["Host"] == b"example.com"
+ assert headers["Accept"] == b"text/plain"
def test_getitem(self):
headers = Headers(Host="example.com")
- assert headers["Host"] == "example.com"
- assert headers["host"] == "example.com"
+ assert headers["Host"] == b"example.com"
+ assert headers["host"] == b"example.com"
tutils.raises(KeyError, headers.__getitem__, "Accept")
headers = self._2host()
- assert headers["Host"] == "example.com, example.org"
+ assert headers["Host"] == b"example.com, example.org"
def test_str(self):
headers = Headers(Host="example.com")
@@ -458,12 +458,12 @@ class TestHeaders(object):
headers["Host"] = "example.com"
assert "Host" in headers
assert "host" in headers
- assert headers["Host"] == "example.com"
+ assert headers["Host"] == b"example.com"
headers["host"] = "example.org"
assert "Host" in headers
assert "host" in headers
- assert headers["Host"] == "example.org"
+ assert headers["Host"] == b"example.org"
headers["accept"] = "text/plain"
assert len(headers) == 2
@@ -494,12 +494,10 @@ class TestHeaders(object):
def test_keys(self):
headers = Headers(Host="example.com")
- assert len(headers.keys()) == 1
- assert headers.keys()[0] == "Host"
+ assert list(headers.keys()) == [b"Host"]
headers = self._2host()
- assert len(headers.keys()) == 1
- assert headers.keys()[0] == "Host"
+ assert list(headers.keys()) == [b"Host"]
def test_eq_ne(self):
headers1 = Headers(Host="example.com")
@@ -516,7 +514,7 @@ class TestHeaders(object):
def test_get_all(self):
headers = self._2host()
- assert headers.get_all("host") == ["example.com", "example.org"]
+ assert headers.get_all("host") == [b"example.com", b"example.org"]
assert headers.get_all("accept") == []
def test_set_all(self):
@@ -527,10 +525,10 @@ class TestHeaders(object):
headers = self._2host()
headers.set_all("Host", ["example.org"])
- assert headers["host"] == "example.org"
+ assert headers["host"] == b"example.org"
headers.set_all("Host", ["example.org", "example.net"])
- assert headers["host"] == "example.org, example.net"
+ assert headers["host"] == b"example.org, example.net"
def test_state(self):
headers = self._2host()
diff --git a/test/test_encoding.py b/test/test_encoding.py
index 90f99338..0ff1aad1 100644
--- a/test/test_encoding.py
+++ b/test/test_encoding.py
@@ -4,8 +4,6 @@ from netlib import encoding
def test_identity():
assert b"string" == encoding.decode("identity", b"string")
assert b"string" == encoding.encode("identity", b"string")
- assert b"string" == encoding.encode(b"identity", b"string")
- assert b"string" == encoding.decode(b"identity", b"string")
assert not encoding.encode("nonexistent", b"string")
assert not encoding.decode("nonexistent encoding", b"string")
diff --git a/test/test_wsgi.py b/test/test_wsgi.py
index 856967af..fe6f09b5 100644
--- a/test/test_wsgi.py
+++ b/test/test_wsgi.py
@@ -5,8 +5,8 @@ from netlib.http import Headers
def tflow():
- headers = Headers(test="value")
- req = wsgi.Request("http", "GET", "/", headers, "")
+ headers = Headers(test=b"value")
+ req = wsgi.Request("http", "GET", "/", "HTTP/1.1", headers, "")
return wsgi.Flow(("127.0.0.1", 8888), req)
@@ -20,7 +20,7 @@ class TestApp:
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
- return ['Hello', ' world!\n']
+ return [b'Hello', b' world!\n']
class TestWSGI:
@@ -47,8 +47,8 @@ class TestWSGI:
assert not err
val = wfile.getvalue()
- assert "Hello world" in val
- assert "Server:" in val
+ assert b"Hello world" in val
+ assert b"Server:" in val
def _serve(self, app):
w = wsgi.WSGIAdaptor(app, "foo", 80, "version")
@@ -77,7 +77,7 @@ class TestWSGI:
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
start_response(status, response_headers)
- assert "Internal Server Error" in self._serve(app)
+ assert b"Internal Server Error" in self._serve(app)
def test_serve_single_err(self):
def app(environ, start_response):
@@ -88,7 +88,8 @@ class TestWSGI:
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers, ei)
- assert "Internal Server Error" in self._serve(app)
+ yield b""
+ assert b"Internal Server Error" in self._serve(app)
def test_serve_double_err(self):
def app(environ, start_response):
@@ -99,7 +100,7 @@ class TestWSGI:
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
- yield "aaa"
+ yield b"aaa"
start_response(status, response_headers, ei)
- yield "bbb"
- assert "Internal Server Error" in self._serve(app)
+ yield b"bbb"
+ assert b"Internal Server Error" in self._serve(app)
diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py
index 3af5dc9c..6f67b84d 100644
--- a/test/websockets/test_websockets.py
+++ b/test/websockets/test_websockets.py
@@ -41,7 +41,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
key = self.protocol.check_client_handshake(req.headers)
preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101)
- self.wfile.write(preamble + "\r\n")
+ self.wfile.write(preamble.encode() + b"\r\n")
headers = self.protocol.server_handshake_headers(key)
self.wfile.write(str(headers) + "\r\n")
self.wfile.flush()
@@ -62,11 +62,11 @@ class WebSocketsClient(tcp.TCPClient):
def connect(self):
super(WebSocketsClient, self).connect()
- preamble = 'GET / HTTP/1.1'
- self.wfile.write(preamble + "\r\n")
+ preamble = b'GET / HTTP/1.1'
+ self.wfile.write(preamble + b"\r\n")
headers = self.protocol.client_handshake_headers()
self.client_nonce = headers["sec-websocket-key"]
- self.wfile.write(str(headers) + "\r\n")
+ self.wfile.write(bytes(headers) + b"\r\n")
self.wfile.flush()
resp = read_response(self.rfile, treq(method="GET"))
@@ -101,7 +101,7 @@ class TestWebSockets(tservers.ServerTestBase):
assert response == msg
def test_simple_echo(self):
- self.echo("hello I'm the client")
+ self.echo(b"hello I'm the client")
def test_frame_sizes(self):
# length can fit in the the 7 bit payload length
@@ -161,10 +161,10 @@ class BadHandshakeHandler(WebSocketsEchoHandler):
client_hs = read_request(self.rfile)
self.protocol.check_client_handshake(client_hs.headers)
- preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101)
- self.wfile.write(preamble + "\r\n")
- headers = self.protocol.server_handshake_headers("malformed key")
- self.wfile.write(str(headers) + "\r\n")
+ preamble = 'HTTP/1.1 101 %s\r\n' % status_codes.RESPONSES.get(101)
+ self.wfile.write(preamble.encode())
+ headers = self.protocol.server_handshake_headers(b"malformed key")
+ self.wfile.write(bytes(headers) + b"\r\n")
self.wfile.flush()
self.handshake_done = True
@@ -180,7 +180,7 @@ class TestBadHandshake(tservers.ServerTestBase):
def test(self):
client = WebSocketsClient(("127.0.0.1", self.port))
client.connect()
- client.send_message("hello")
+ client.send_message(b"hello")
class TestFrameHeader:
@@ -188,8 +188,7 @@ class TestFrameHeader:
def test_roundtrip(self):
def round(*args, **kwargs):
f = websockets.FrameHeader(*args, **kwargs)
- bytes = f.to_bytes()
- f2 = websockets.FrameHeader.from_file(tutils.treader(bytes))
+ f2 = websockets.FrameHeader.from_file(tutils.treader(bytes(f)))
assert f == f2
round()
round(fin=1)
@@ -201,11 +200,11 @@ class TestFrameHeader:
round(payload_length=1000)
round(payload_length=10000)
round(opcode=websockets.OPCODE.PING)
- round(masking_key="test")
+ round(masking_key=b"test")
def test_human_readable(self):
f = websockets.FrameHeader(
- masking_key="test",
+ masking_key=b"test",
fin=True,
payload_length=10
)
@@ -214,23 +213,23 @@ class TestFrameHeader:
assert f.human_readable()
def test_funky(self):
- f = websockets.FrameHeader(masking_key="test", mask=False)
+ f = websockets.FrameHeader(masking_key=b"test", mask=False)
bytes = f.to_bytes()
f2 = websockets.FrameHeader.from_file(tutils.treader(bytes))
assert not f2.mask
def test_violations(self):
tutils.raises("opcode", websockets.FrameHeader, opcode=17)
- tutils.raises("masking key", websockets.FrameHeader, masking_key="x")
+ tutils.raises("masking key", websockets.FrameHeader, masking_key=b"x")
def test_automask(self):
f = websockets.FrameHeader(mask=True)
assert f.masking_key
- f = websockets.FrameHeader(masking_key="foob")
+ f = websockets.FrameHeader(masking_key=b"foob")
assert f.mask
- f = websockets.FrameHeader(masking_key="foob", mask=0)
+ f = websockets.FrameHeader(masking_key=b"foob", mask=0)
assert not f.mask
assert f.masking_key
@@ -240,31 +239,31 @@ class TestFrame:
def test_roundtrip(self):
def round(*args, **kwargs):
f = websockets.Frame(*args, **kwargs)
- bytes = f.to_bytes()
- f2 = websockets.Frame.from_file(tutils.treader(bytes))
+ raw = bytes(f)
+ f2 = websockets.Frame.from_file(tutils.treader(raw))
assert f == f2
- round("test")
- round("test", fin=1)
- round("test", rsv1=1)
- round("test", opcode=websockets.OPCODE.PING)
- round("test", masking_key="test")
+ round(b"test")
+ round(b"test", fin=1)
+ round(b"test", rsv1=1)
+ round(b"test", opcode=websockets.OPCODE.PING)
+ round(b"test", masking_key=b"test")
def test_human_readable(self):
f = websockets.Frame()
- assert f.human_readable()
+ assert repr(f)
def test_masker():
tests = [
- ["a"],
- ["four"],
- ["fourf"],
- ["fourfive"],
- ["a", "aasdfasdfa", "asdf"],
- ["a" * 50, "aasdfasdfa", "asdf"],
+ [b"a"],
+ [b"four"],
+ [b"fourf"],
+ [b"fourfive"],
+ [b"a", b"aasdfasdfa", b"asdf"],
+ [b"a" * 50, b"aasdfasdfa", b"asdf"],
]
for i in tests:
- m = websockets.Masker("abcd")
- data = "".join([m(t) for t in i])
- data2 = websockets.Masker("abcd")(data)
- assert data2 == "".join(i)
+ m = websockets.Masker(b"abcd")
+ data = b"".join([m(t) for t in i])
+ data2 = websockets.Masker(b"abcd")(data)
+ assert data2 == b"".join(i)