diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/http1/test_protocol.py | 0 | ||||
-rw-r--r-- | test/http/test_models.py | 72 | ||||
-rw-r--r-- | test/test_encoding.py | 2 | ||||
-rw-r--r-- | test/test_wsgi.py | 21 | ||||
-rw-r--r-- | test/websockets/test_websockets.py | 71 |
5 files changed, 81 insertions, 85 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py deleted file mode 100644 index e69de29b..00000000 --- a/test/http/http1/test_protocol.py +++ /dev/null diff --git a/test/http/test_models.py b/test/http/test_models.py index 6970a6e4..d420b22b 100644 --- a/test/http/test_models.py +++ b/test/http/test_models.py @@ -58,20 +58,20 @@ class TestRequest(object): req = tutils.treq() req.headers["Accept-Encoding"] = "foobar" req.anticomp() - assert req.headers["Accept-Encoding"] == "identity" + assert req.headers["Accept-Encoding"] == b"identity" def test_constrain_encoding(self): req = tutils.treq() req.headers["Accept-Encoding"] = "identity, gzip, foo" req.constrain_encoding() - assert "foo" not in req.headers["Accept-Encoding"] + assert b"foo" not in req.headers["Accept-Encoding"] def test_update_host(self): req = tutils.treq() req.headers["Host"] = "" req.host = "foobar" req.update_host_header() - assert req.headers["Host"] == "foobar" + assert req.headers["Host"] == b"foobar" def test_get_form(self): req = tutils.treq() @@ -132,7 +132,7 @@ class TestRequest(object): def test_set_path_components(self): req = tutils.treq() - req.set_path_components(["foo", "bar"]) + req.set_path_components([b"foo", b"bar"]) # TODO: add meaningful assertions def test_get_query(self): @@ -140,7 +140,7 @@ class TestRequest(object): assert req.get_query().lst == [] req.url = "http://localhost:80/foo?bar=42" - assert req.get_query().lst == [("bar", "42")] + assert req.get_query().lst == [(b"bar", b"42")] def test_set_query(self): req = tutils.treq() @@ -167,12 +167,12 @@ class TestRequest(object): def test_pretty_url(self): req = tutils.treq() req.form_out = "authority" - assert req.pretty_url(True) == "address:22" - assert req.pretty_url(False) == "address:22" + assert req.pretty_url(True) == b"address:22" + assert req.pretty_url(False) == b"address:22" req.form_out = "relative" - assert req.pretty_url(True) == "http://address:22/path" - assert req.pretty_url(False) == "http://address:22/path" + assert req.pretty_url(True) == b"http://address:22/path" + assert req.pretty_url(False) == b"http://address:22/path" def test_get_cookies_none(self): headers = Headers() @@ -213,11 +213,11 @@ class TestRequest(object): def test_set_url(self): r = tutils.treq(form_in="absolute") - r.url = "https://otheraddress:42/ORLY" - assert r.scheme == "https" - assert r.host == "otheraddress" + r.url = b"https://otheraddress:42/ORLY" + assert r.scheme == b"https" + assert r.host == b"otheraddress" assert r.port == 42 - assert r.path == "/ORLY" + assert r.path == b"/ORLY" try: r.url = "//localhost:80/foo@bar" @@ -374,8 +374,8 @@ class TestResponse(object): def test_get_cookies_twocookies(self): resp = tutils.tresp() resp.headers = Headers([ - ["Set-Cookie", "cookiename=cookievalue"], - ["Set-Cookie", "othercookie=othervalue"] + [b"Set-Cookie", b"cookiename=cookievalue"], + [b"Set-Cookie", b"othercookie=othervalue"] ]) result = resp.get_cookies() assert len(result) == 2 @@ -399,8 +399,8 @@ class TestHeaders(object): def _2host(self): return Headers( [ - ["Host", "example.com"], - ["host", "example.org"] + [b"Host", b"example.com"], + [b"host", b"example.org"] ] ) @@ -408,37 +408,37 @@ class TestHeaders(object): headers = Headers() assert len(headers) == 0 - headers = Headers([["Host", "example.com"]]) + headers = Headers([[b"Host", b"example.com"]]) assert len(headers) == 1 - assert headers["Host"] == "example.com" + assert headers["Host"] == b"example.com" headers = Headers(Host="example.com") assert len(headers) == 1 - assert headers["Host"] == "example.com" + assert headers["Host"] == b"example.com" headers = Headers( - [["Host", "invalid"]], + [[b"Host", b"invalid"]], Host="example.com" ) assert len(headers) == 1 - assert headers["Host"] == "example.com" + assert headers["Host"] == b"example.com" headers = Headers( - [["Host", "invalid"], ["Accept", "text/plain"]], + [[b"Host", b"invalid"], [b"Accept", b"text/plain"]], Host="example.com" ) assert len(headers) == 2 - assert headers["Host"] == "example.com" - assert headers["Accept"] == "text/plain" + assert headers["Host"] == b"example.com" + assert headers["Accept"] == b"text/plain" def test_getitem(self): headers = Headers(Host="example.com") - assert headers["Host"] == "example.com" - assert headers["host"] == "example.com" + assert headers["Host"] == b"example.com" + assert headers["host"] == b"example.com" tutils.raises(KeyError, headers.__getitem__, "Accept") headers = self._2host() - assert headers["Host"] == "example.com, example.org" + assert headers["Host"] == b"example.com, example.org" def test_str(self): headers = Headers(Host="example.com") @@ -458,12 +458,12 @@ class TestHeaders(object): headers["Host"] = "example.com" assert "Host" in headers assert "host" in headers - assert headers["Host"] == "example.com" + assert headers["Host"] == b"example.com" headers["host"] = "example.org" assert "Host" in headers assert "host" in headers - assert headers["Host"] == "example.org" + assert headers["Host"] == b"example.org" headers["accept"] = "text/plain" assert len(headers) == 2 @@ -494,12 +494,10 @@ class TestHeaders(object): def test_keys(self): headers = Headers(Host="example.com") - assert len(headers.keys()) == 1 - assert headers.keys()[0] == "Host" + assert list(headers.keys()) == [b"Host"] headers = self._2host() - assert len(headers.keys()) == 1 - assert headers.keys()[0] == "Host" + assert list(headers.keys()) == [b"Host"] def test_eq_ne(self): headers1 = Headers(Host="example.com") @@ -516,7 +514,7 @@ class TestHeaders(object): def test_get_all(self): headers = self._2host() - assert headers.get_all("host") == ["example.com", "example.org"] + assert headers.get_all("host") == [b"example.com", b"example.org"] assert headers.get_all("accept") == [] def test_set_all(self): @@ -527,10 +525,10 @@ class TestHeaders(object): headers = self._2host() headers.set_all("Host", ["example.org"]) - assert headers["host"] == "example.org" + assert headers["host"] == b"example.org" headers.set_all("Host", ["example.org", "example.net"]) - assert headers["host"] == "example.org, example.net" + assert headers["host"] == b"example.org, example.net" def test_state(self): headers = self._2host() diff --git a/test/test_encoding.py b/test/test_encoding.py index 90f99338..0ff1aad1 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -4,8 +4,6 @@ from netlib import encoding def test_identity(): assert b"string" == encoding.decode("identity", b"string") assert b"string" == encoding.encode("identity", b"string") - assert b"string" == encoding.encode(b"identity", b"string") - assert b"string" == encoding.decode(b"identity", b"string") assert not encoding.encode("nonexistent", b"string") assert not encoding.decode("nonexistent encoding", b"string") diff --git a/test/test_wsgi.py b/test/test_wsgi.py index 856967af..fe6f09b5 100644 --- a/test/test_wsgi.py +++ b/test/test_wsgi.py @@ -5,8 +5,8 @@ from netlib.http import Headers def tflow(): - headers = Headers(test="value") - req = wsgi.Request("http", "GET", "/", headers, "") + headers = Headers(test=b"value") + req = wsgi.Request("http", "GET", "/", "HTTP/1.1", headers, "") return wsgi.Flow(("127.0.0.1", 8888), req) @@ -20,7 +20,7 @@ class TestApp: status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) - return ['Hello', ' world!\n'] + return [b'Hello', b' world!\n'] class TestWSGI: @@ -47,8 +47,8 @@ class TestWSGI: assert not err val = wfile.getvalue() - assert "Hello world" in val - assert "Server:" in val + assert b"Hello world" in val + assert b"Server:" in val def _serve(self, app): w = wsgi.WSGIAdaptor(app, "foo", 80, "version") @@ -77,7 +77,7 @@ class TestWSGI: response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) start_response(status, response_headers) - assert "Internal Server Error" in self._serve(app) + assert b"Internal Server Error" in self._serve(app) def test_serve_single_err(self): def app(environ, start_response): @@ -88,7 +88,8 @@ class TestWSGI: status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers, ei) - assert "Internal Server Error" in self._serve(app) + yield b"" + assert b"Internal Server Error" in self._serve(app) def test_serve_double_err(self): def app(environ, start_response): @@ -99,7 +100,7 @@ class TestWSGI: status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) - yield "aaa" + yield b"aaa" start_response(status, response_headers, ei) - yield "bbb" - assert "Internal Server Error" in self._serve(app) + yield b"bbb" + assert b"Internal Server Error" in self._serve(app) diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index 3af5dc9c..6f67b84d 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -41,7 +41,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler): key = self.protocol.check_client_handshake(req.headers) preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) - self.wfile.write(preamble + "\r\n") + self.wfile.write(preamble.encode() + b"\r\n") headers = self.protocol.server_handshake_headers(key) self.wfile.write(str(headers) + "\r\n") self.wfile.flush() @@ -62,11 +62,11 @@ class WebSocketsClient(tcp.TCPClient): def connect(self): super(WebSocketsClient, self).connect() - preamble = 'GET / HTTP/1.1' - self.wfile.write(preamble + "\r\n") + preamble = b'GET / HTTP/1.1' + self.wfile.write(preamble + b"\r\n") headers = self.protocol.client_handshake_headers() self.client_nonce = headers["sec-websocket-key"] - self.wfile.write(str(headers) + "\r\n") + self.wfile.write(bytes(headers) + b"\r\n") self.wfile.flush() resp = read_response(self.rfile, treq(method="GET")) @@ -101,7 +101,7 @@ class TestWebSockets(tservers.ServerTestBase): assert response == msg def test_simple_echo(self): - self.echo("hello I'm the client") + self.echo(b"hello I'm the client") def test_frame_sizes(self): # length can fit in the the 7 bit payload length @@ -161,10 +161,10 @@ class BadHandshakeHandler(WebSocketsEchoHandler): client_hs = read_request(self.rfile) self.protocol.check_client_handshake(client_hs.headers) - preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) - self.wfile.write(preamble + "\r\n") - headers = self.protocol.server_handshake_headers("malformed key") - self.wfile.write(str(headers) + "\r\n") + preamble = 'HTTP/1.1 101 %s\r\n' % status_codes.RESPONSES.get(101) + self.wfile.write(preamble.encode()) + headers = self.protocol.server_handshake_headers(b"malformed key") + self.wfile.write(bytes(headers) + b"\r\n") self.wfile.flush() self.handshake_done = True @@ -180,7 +180,7 @@ class TestBadHandshake(tservers.ServerTestBase): def test(self): client = WebSocketsClient(("127.0.0.1", self.port)) client.connect() - client.send_message("hello") + client.send_message(b"hello") class TestFrameHeader: @@ -188,8 +188,7 @@ class TestFrameHeader: def test_roundtrip(self): def round(*args, **kwargs): f = websockets.FrameHeader(*args, **kwargs) - bytes = f.to_bytes() - f2 = websockets.FrameHeader.from_file(tutils.treader(bytes)) + f2 = websockets.FrameHeader.from_file(tutils.treader(bytes(f))) assert f == f2 round() round(fin=1) @@ -201,11 +200,11 @@ class TestFrameHeader: round(payload_length=1000) round(payload_length=10000) round(opcode=websockets.OPCODE.PING) - round(masking_key="test") + round(masking_key=b"test") def test_human_readable(self): f = websockets.FrameHeader( - masking_key="test", + masking_key=b"test", fin=True, payload_length=10 ) @@ -214,23 +213,23 @@ class TestFrameHeader: assert f.human_readable() def test_funky(self): - f = websockets.FrameHeader(masking_key="test", mask=False) + f = websockets.FrameHeader(masking_key=b"test", mask=False) bytes = f.to_bytes() f2 = websockets.FrameHeader.from_file(tutils.treader(bytes)) assert not f2.mask def test_violations(self): tutils.raises("opcode", websockets.FrameHeader, opcode=17) - tutils.raises("masking key", websockets.FrameHeader, masking_key="x") + tutils.raises("masking key", websockets.FrameHeader, masking_key=b"x") def test_automask(self): f = websockets.FrameHeader(mask=True) assert f.masking_key - f = websockets.FrameHeader(masking_key="foob") + f = websockets.FrameHeader(masking_key=b"foob") assert f.mask - f = websockets.FrameHeader(masking_key="foob", mask=0) + f = websockets.FrameHeader(masking_key=b"foob", mask=0) assert not f.mask assert f.masking_key @@ -240,31 +239,31 @@ class TestFrame: def test_roundtrip(self): def round(*args, **kwargs): f = websockets.Frame(*args, **kwargs) - bytes = f.to_bytes() - f2 = websockets.Frame.from_file(tutils.treader(bytes)) + raw = bytes(f) + f2 = websockets.Frame.from_file(tutils.treader(raw)) assert f == f2 - round("test") - round("test", fin=1) - round("test", rsv1=1) - round("test", opcode=websockets.OPCODE.PING) - round("test", masking_key="test") + round(b"test") + round(b"test", fin=1) + round(b"test", rsv1=1) + round(b"test", opcode=websockets.OPCODE.PING) + round(b"test", masking_key=b"test") def test_human_readable(self): f = websockets.Frame() - assert f.human_readable() + assert repr(f) def test_masker(): tests = [ - ["a"], - ["four"], - ["fourf"], - ["fourfive"], - ["a", "aasdfasdfa", "asdf"], - ["a" * 50, "aasdfasdfa", "asdf"], + [b"a"], + [b"four"], + [b"fourf"], + [b"fourfive"], + [b"a", b"aasdfasdfa", b"asdf"], + [b"a" * 50, b"aasdfasdfa", b"asdf"], ] for i in tests: - m = websockets.Masker("abcd") - data = "".join([m(t) for t in i]) - data2 = websockets.Masker("abcd")(data) - assert data2 == "".join(i) + m = websockets.Masker(b"abcd") + data = b"".join([m(t) for t in i]) + data2 = websockets.Masker(b"abcd")(data) + assert data2 == b"".join(i) |