diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/h2/test_frames.py | 32 | ||||
-rw-r--r-- | test/test_certutils.py | 12 | ||||
-rw-r--r-- | test/test_http.py | 4 | ||||
-rw-r--r-- | test/test_http_auth.py | 21 | ||||
-rw-r--r-- | test/test_http_cookies.py | 3 | ||||
-rw-r--r-- | test/test_http_uastrings.py | 1 | ||||
-rw-r--r-- | test/test_imports.py | 2 | ||||
-rw-r--r-- | test/test_odict.py | 7 | ||||
-rw-r--r-- | test/test_tcp.py | 190 | ||||
-rw-r--r-- | test/test_utils.py | 6 | ||||
-rw-r--r-- | test/test_websockets.py | 18 | ||||
-rw-r--r-- | test/test_wsgi.py | 9 | ||||
-rw-r--r-- | test/tutils.py | 8 |
13 files changed, 199 insertions, 114 deletions
diff --git a/test/h2/test_frames.py b/test/h2/test_frames.py index d04c7c8b..90162984 100644 --- a/test/h2/test_frames.py +++ b/test/h2/test_frames.py @@ -4,20 +4,22 @@ import tutils from nose.tools import assert_equal - # TODO test stream association if valid or not def test_invalid_flags(): tutils.raises(ValueError, DataFrame, ContinuationFrame.FLAG_END_HEADERS, 0x1234567, 'foobar') + def test_frame_equality(): a = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar') b = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar') assert_equal(a, b) + def test_too_large_frames(): DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567) + def test_data_frame_to_bytes(): f = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar') assert_equal(f.to_bytes().encode('hex'), '000006000101234567666f6f626172') @@ -28,6 +30,7 @@ def test_data_frame_to_bytes(): f = DataFrame(6, Frame.FLAG_NO_FLAGS, 0x0, 'foobar') tutils.raises(ValueError, f.to_bytes) + def test_data_frame_from_bytes(): f = Frame.from_bytes('000006000101234567666f6f626172'.decode('hex')) assert isinstance(f, DataFrame) @@ -45,6 +48,7 @@ def test_data_frame_from_bytes(): assert_equal(f.stream_id, 0x1234567) assert_equal(f.payload, 'foobar') + def test_headers_frame_to_bytes(): f = HeadersFrame(6, Frame.FLAG_NO_FLAGS, 0x1234567, 'foobar') assert_equal(f.to_bytes().encode('hex'), '000006010001234567666f6f626172') @@ -55,15 +59,18 @@ def test_headers_frame_to_bytes(): f = HeadersFrame(10, HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', exclusive=True, stream_dependency=0x7654321, weight=42) assert_equal(f.to_bytes().encode('hex'), '00000b012001234567876543212a666f6f626172') - f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', pad_length=3, exclusive=True, stream_dependency=0x7654321, weight=42) + f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, + 'foobar', pad_length=3, exclusive=True, stream_dependency=0x7654321, weight=42) assert_equal(f.to_bytes().encode('hex'), '00000f01280123456703876543212a666f6f626172000000') - f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', pad_length=3, exclusive=False, stream_dependency=0x7654321, weight=42) + f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', + pad_length=3, exclusive=False, stream_dependency=0x7654321, weight=42) assert_equal(f.to_bytes().encode('hex'), '00000f01280123456703076543212a666f6f626172000000') f = HeadersFrame(6, Frame.FLAG_NO_FLAGS, 0x0, 'foobar') tutils.raises(ValueError, f.to_bytes) + def test_headers_frame_from_bytes(): f = Frame.from_bytes('000006010001234567666f6f626172'.decode('hex')) assert isinstance(f, HeadersFrame) @@ -114,6 +121,7 @@ def test_headers_frame_from_bytes(): assert_equal(f.stream_dependency, 0x7654321) assert_equal(f.weight, 42) + def test_priority_frame_to_bytes(): f = PriorityFrame(5, Frame.FLAG_NO_FLAGS, 0x1234567, exclusive=True, stream_dependency=0x7654321, weight=42) assert_equal(f.to_bytes().encode('hex'), '000005020001234567876543212a') @@ -127,6 +135,7 @@ def test_priority_frame_to_bytes(): f = PriorityFrame(5, Frame.FLAG_NO_FLAGS, 0x1234567, stream_dependency=0x0) tutils.raises(ValueError, f.to_bytes) + def test_priority_frame_from_bytes(): f = Frame.from_bytes('000005020001234567876543212a'.decode('hex')) assert isinstance(f, PriorityFrame) @@ -148,6 +157,7 @@ def test_priority_frame_from_bytes(): assert_equal(f.stream_dependency, 0x7654321) assert_equal(f.weight, 21) + def test_rst_stream_frame_to_bytes(): f = RstStreamFrame(4, Frame.FLAG_NO_FLAGS, 0x1234567, error_code=0x7654321) assert_equal(f.to_bytes().encode('hex'), '00000403000123456707654321') @@ -155,6 +165,7 @@ def test_rst_stream_frame_to_bytes(): f = RstStreamFrame(4, Frame.FLAG_NO_FLAGS, 0x0) tutils.raises(ValueError, f.to_bytes) + def test_rst_stream_frame_from_bytes(): f = Frame.from_bytes('00000403000123456707654321'.decode('hex')) assert isinstance(f, RstStreamFrame) @@ -164,6 +175,7 @@ def test_rst_stream_frame_from_bytes(): assert_equal(f.stream_id, 0x1234567) assert_equal(f.error_code, 0x07654321) + def test_settings_frame_to_bytes(): f = SettingsFrame(0, Frame.FLAG_NO_FLAGS, 0x0) assert_equal(f.to_bytes().encode('hex'), '000000040000000000') @@ -174,12 +186,14 @@ def test_settings_frame_to_bytes(): f = SettingsFrame(6, SettingsFrame.FLAG_ACK, 0x0, settings={SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1}) assert_equal(f.to_bytes().encode('hex'), '000006040100000000000200000001') - f = SettingsFrame(12, Frame.FLAG_NO_FLAGS, 0x0, settings={SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678}) + f = SettingsFrame(12, Frame.FLAG_NO_FLAGS, 0x0, settings={ + SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678}) assert_equal(f.to_bytes().encode('hex'), '00000c040000000000000200000001000312345678') f = SettingsFrame(0, Frame.FLAG_NO_FLAGS, 0x1234567) tutils.raises(ValueError, f.to_bytes) + def test_settings_frame_from_bytes(): f = Frame.from_bytes('000000040000000000'.decode('hex')) assert isinstance(f, SettingsFrame) @@ -214,6 +228,7 @@ def test_settings_frame_from_bytes(): assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH], 1) assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS], 0x12345678) + def test_push_promise_frame_to_bytes(): f = PushPromiseFrame(10, Frame.FLAG_NO_FLAGS, 0x1234567, 0x7654321, 'foobar') assert_equal(f.to_bytes().encode('hex'), '00000a05000123456707654321666f6f626172') @@ -227,6 +242,7 @@ def test_push_promise_frame_to_bytes(): f = PushPromiseFrame(4, Frame.FLAG_NO_FLAGS, 0x1234567, 0x0) tutils.raises(ValueError, f.to_bytes) + def test_push_promise_frame_from_bytes(): f = Frame.from_bytes('00000a05000123456707654321666f6f626172'.decode('hex')) assert isinstance(f, PushPromiseFrame) @@ -244,6 +260,7 @@ def test_push_promise_frame_from_bytes(): assert_equal(f.stream_id, 0x1234567) assert_equal(f.header_block_fragment, 'foobar') + def test_ping_frame_to_bytes(): f = PingFrame(8, PingFrame.FLAG_ACK, 0x0, payload=b'foobar') assert_equal(f.to_bytes().encode('hex'), '000008060100000000666f6f6261720000') @@ -254,6 +271,7 @@ def test_ping_frame_to_bytes(): f = PingFrame(8, Frame.FLAG_NO_FLAGS, 0x1234567) tutils.raises(ValueError, f.to_bytes) + def test_ping_frame_from_bytes(): f = Frame.from_bytes('000008060100000000666f6f6261720000'.decode('hex')) assert isinstance(f, PingFrame) @@ -271,6 +289,7 @@ def test_ping_frame_from_bytes(): assert_equal(f.stream_id, 0x0) assert_equal(f.payload, b'foobarde') + def test_goaway_frame_to_bytes(): f = GoAwayFrame(8, Frame.FLAG_NO_FLAGS, 0x0, last_stream=0x1234567, error_code=0x87654321, data=b'') assert_equal(f.to_bytes().encode('hex'), '0000080700000000000123456787654321') @@ -281,6 +300,7 @@ def test_goaway_frame_to_bytes(): f = GoAwayFrame(8, Frame.FLAG_NO_FLAGS, 0x1234567, last_stream=0x1234567, error_code=0x87654321) tutils.raises(ValueError, f.to_bytes) + def test_goaway_frame_from_bytes(): f = Frame.from_bytes('0000080700000000000123456787654321'.decode('hex')) assert isinstance(f, GoAwayFrame) @@ -302,6 +322,7 @@ def test_goaway_frame_from_bytes(): assert_equal(f.error_code, 0x87654321) assert_equal(f.data, b'foobar') + def test_window_update_frame_to_bytes(): f = WindowUpdateFrame(4, Frame.FLAG_NO_FLAGS, 0x0, window_size_increment=0x1234567) assert_equal(f.to_bytes().encode('hex'), '00000408000000000001234567') @@ -315,6 +336,7 @@ def test_window_update_frame_to_bytes(): f = WindowUpdateFrame(4, Frame.FLAG_NO_FLAGS, 0x0, window_size_increment=0) tutils.raises(ValueError, f.to_bytes) + def test_window_update_frame_from_bytes(): f = Frame.from_bytes('00000408000000000001234567'.decode('hex')) assert isinstance(f, WindowUpdateFrame) @@ -324,6 +346,7 @@ def test_window_update_frame_from_bytes(): assert_equal(f.stream_id, 0x0) assert_equal(f.window_size_increment, 0x1234567) + def test_continuation_frame_to_bytes(): f = ContinuationFrame(6, ContinuationFrame.FLAG_END_HEADERS, 0x1234567, 'foobar') assert_equal(f.to_bytes().encode('hex'), '000006090401234567666f6f626172') @@ -331,6 +354,7 @@ def test_continuation_frame_to_bytes(): f = ContinuationFrame(6, ContinuationFrame.FLAG_END_HEADERS, 0x0, 'foobar') tutils.raises(ValueError, f.to_bytes) + def test_continuation_frame_from_bytes(): f = Frame.from_bytes('000006090401234567666f6f626172'.decode('hex')) assert isinstance(f, ContinuationFrame) diff --git a/test/test_certutils.py b/test/test_certutils.py index c96c5087..115cac4d 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -1,6 +1,5 @@ import os from netlib import certutils, certffi -import OpenSSL import tutils # class TestDNTree: @@ -34,6 +33,7 @@ import tutils class TestCertStore: + def test_create_explicit(self): with tutils.tmpdir() as d: ca = certutils.CertStore.from_store(d, "test") @@ -56,13 +56,13 @@ class TestCertStore: def test_add_cert(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + certutils.CertStore.from_store(d, "test") def test_sans(self): with tutils.tmpdir() as d: ca = certutils.CertStore.from_store(d, "test") c1 = ca.get_cert("foo.com", ["*.bar.com"]) - c2 = ca.get_cert("foo.bar.com", []) + ca.get_cert("foo.bar.com", []) # assert c1 == c2 c3 = ca.get_cert("bar.com", []) assert not c1 == c3 @@ -70,7 +70,7 @@ class TestCertStore: def test_sans_change(self): with tutils.tmpdir() as d: ca = certutils.CertStore.from_store(d, "test") - _ = ca.get_cert("foo.com", ["*.bar.com"]) + ca.get_cert("foo.com", ["*.bar.com"]) cert, key, chain_file = ca.get_cert("foo.bar.com", ["*.baz.com"]) assert "*.baz.com" in cert.altnames @@ -102,6 +102,7 @@ class TestCertStore: class TestDummyCert: + def test_with_ca(self): with tutils.tmpdir() as d: ca = certutils.CertStore.from_store(d, "test") @@ -115,6 +116,7 @@ class TestDummyCert: class TestSSLCert: + def test_simple(self): with open(tutils.test_data.path("data/text_cert"), "rb") as f: d = f.read() @@ -152,5 +154,3 @@ class TestSSLCert: d = f.read() s = certutils.SSLCert.from_der(d) assert s.cn - - diff --git a/test/test_http.py b/test/test_http.py index 63b39f08..0a9e276f 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -230,6 +230,7 @@ def test_parse_init_http(): class TestReadHeaders: + def _read(self, data, verbatim=False): if not verbatim: data = textwrap.dedent(data) @@ -277,6 +278,7 @@ class TestReadHeaders: class NoContentLengthHTTPHandler(tcp.BaseHandler): + def handle(self): self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n") self.wfile.flush() @@ -297,7 +299,7 @@ def test_read_response(): data = textwrap.dedent(data) r = cStringIO.StringIO(data) return http.read_response( - r, method, limit, include_body = include_body + r, method, limit, include_body=include_body ) tutils.raises("server disconnect", tst, "", "GET", None) diff --git a/test/test_http_auth.py b/test/test_http_auth.py index 176aa3ff..045fb13e 100644 --- a/test/test_http_auth.py +++ b/test/test_http_auth.py @@ -1,9 +1,9 @@ -import binascii, cStringIO from netlib import odict, http_auth, http -import mock import tutils + class TestPassManNonAnon: + def test_simple(self): p = http_auth.PassManNonAnon() assert not p.test("", "") @@ -11,6 +11,7 @@ class TestPassManNonAnon: class TestPassManHtpasswd: + def test_file_errors(self): tutils.raises("malformed htpasswd file", http_auth.PassManHtpasswd, tutils.test_data.path("data/server.crt")) @@ -18,7 +19,7 @@ class TestPassManHtpasswd: pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) vals = ("basic", "test", "test") - p = http.assemble_http_basic_auth(*vals) + http.assemble_http_basic_auth(*vals) assert pm.test("test", "test") assert not pm.test("test", "foo") assert not pm.test("foo", "test") @@ -27,6 +28,7 @@ class TestPassManHtpasswd: class TestPassManSingleUser: + def test_simple(self): pm = http_auth.PassManSingleUser("test", "test") assert pm.test("test", "test") @@ -35,6 +37,7 @@ class TestPassManSingleUser: class TestNullProxyAuth: + def test_simple(self): na = http_auth.NullProxyAuth(http_auth.PassManNonAnon()) assert not na.auth_challenge_headers() @@ -43,6 +46,7 @@ class TestNullProxyAuth: class TestBasicProxyAuth: + def test_simple(self): ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test") h = odict.ODictCaseless() @@ -60,7 +64,6 @@ class TestBasicProxyAuth: ba.clean(hdrs) assert not ba.AUTH_HEADER in hdrs - hdrs[ba.AUTH_HEADER] = [""] assert not ba.authenticate(hdrs) @@ -77,25 +80,27 @@ class TestBasicProxyAuth: assert not ba.authenticate(hdrs) -class Bunch: pass +class Bunch: + pass class TestAuthAction: + def test_nonanonymous(self): m = Bunch() aa = http_auth.NonanonymousAuthAction(None, "authenticator") aa(None, m, None, None) - assert m.authenticator + assert m.authenticator def test_singleuser(self): m = Bunch() aa = http_auth.SingleuserAuthAction(None, "authenticator") aa(None, m, "foo:bar", None) - assert m.authenticator + assert m.authenticator tutils.raises("invalid", aa, None, m, "foo", None) def test_httppasswd(self): m = Bunch() aa = http_auth.HtpasswdAuthAction(None, "authenticator") aa(None, m, tutils.test_data.path("data/htpasswd"), None) - assert m.authenticator + assert m.authenticator diff --git a/test/test_http_cookies.py b/test/test_http_cookies.py index 7438af7c..070849cf 100644 --- a/test/test_http_cookies.py +++ b/test/test_http_cookies.py @@ -1,7 +1,6 @@ -import pprint import nose.tools -from netlib import http_cookies, odict +from netlib import http_cookies def test_read_token(): diff --git a/test/test_http_uastrings.py b/test/test_http_uastrings.py index c70b7048..3fa4f359 100644 --- a/test/test_http_uastrings.py +++ b/test/test_http_uastrings.py @@ -4,4 +4,3 @@ from netlib import http_uastrings def test_get_shortcut(): assert http_uastrings.get_by_shortcut("c")[0] == "chrome" assert not http_uastrings.get_by_shortcut("_") - diff --git a/test/test_imports.py b/test/test_imports.py index 7b8a643b..b88ef26d 100644 --- a/test/test_imports.py +++ b/test/test_imports.py @@ -1,3 +1 @@ # These are actually tests! -import netlib.http_status -import netlib.version diff --git a/test/test_odict.py b/test/test_odict.py index c01c4dbe..d66ae59b 100644 --- a/test/test_odict.py +++ b/test/test_odict.py @@ -3,6 +3,7 @@ import tutils class TestODict: + def setUp(self): self.od = odict.ODict() @@ -106,13 +107,13 @@ class TestODict: def test_get(self): self.od.add("one", "two") assert self.od.get("one") == ["two"] - assert self.od.get("two") == None + assert self.od.get("two") is None def test_get_first(self): self.od.add("one", "two") self.od.add("one", "three") assert self.od.get_first("one") == "two" - assert self.od.get_first("two") == None + assert self.od.get_first("two") is None def test_extend(self): a = odict.ODict([["a", "b"], ["c", "d"]]) @@ -121,7 +122,9 @@ class TestODict: assert len(a) == 4 assert a["a"] == ["b", "b"] + class TestODictCaseless: + def setUp(self): self.od = odict.ODictCaseless() diff --git a/test/test_tcp.py b/test/test_tcp.py index 4dbdd780..ef00e029 100644 --- a/test/test_tcp.py +++ b/test/test_tcp.py @@ -1,4 +1,8 @@ -import cStringIO, Queue, time, socket, random +import cStringIO +import Queue +import time +import socket +import random import os from netlib import tcp, certutils, test, certffi import threading @@ -6,8 +10,10 @@ import mock import tutils from OpenSSL import SSL + class EchoHandler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() @@ -19,19 +25,22 @@ class EchoHandler(tcp.BaseHandler): class ClientCipherListHandler(tcp.BaseHandler): sni = None + def handle(self): - self.wfile.write("%s"%self.connection.get_cipher_list()) + self.wfile.write("%s" % self.connection.get_cipher_list()) self.wfile.flush() class HangHandler(tcp.BaseHandler): + def handle(self): - while 1: + while True: time.sleep(1) class TestServer(test.ServerTestBase): handler = EchoHandler + def test_echo(self): testval = "echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -51,7 +60,9 @@ class TestServer(test.ServerTestBase): class TestServerBind(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.wfile.write(str(self.connection.getpeername())) self.wfile.flush() @@ -65,7 +76,7 @@ class TestServerBind(test.ServerTestBase): c.connect() assert c.rfile.readline() == str(("127.0.0.1", random_port)) return - except tcp.NetLibError: # port probably already in use + except tcp.NetLibError: # port probably already in use pass @@ -84,6 +95,7 @@ class TestServerIPv6(test.ServerTestBase): class TestEcho(test.ServerTestBase): handler = EchoHandler + def test_echo(self): testval = "echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -94,16 +106,19 @@ class TestEcho(test.ServerTestBase): class HardDisconnectHandler(tcp.BaseHandler): + def handle(self): self.connection.close() class TestFinishFail(test.ServerTestBase): + """ This tests a difficult-to-trigger exception in the .finish() method of the handler. """ handler = EchoHandler + def test_disconnect_in_finish(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -115,13 +130,14 @@ class TestFinishFail(test.ServerTestBase): class TestServerSSL(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = "AES256-SHA", - chain_file=tutils.test_data.path("data/server.crt") - ) + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list="AES256-SHA", + chain_file=tutils.test_data.path("data/server.crt") + ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -144,11 +160,12 @@ class TestServerSSL(test.ServerTestBase): class TestSSLv3Only(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = True + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=True ) + def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -156,20 +173,23 @@ class TestSSLv3Only(test.ServerTestBase): class TestSSLClientCert(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() def handle(self): - self.wfile.write("%s\n"%self.clientcert.serial) + self.wfile.write("%s\n" % self.clientcert.serial) self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = True, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=True, + v3_only=False ) + def test_clientcert(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -187,8 +207,10 @@ class TestSSLClientCert(test.ServerTestBase): class TestSNI(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() @@ -197,11 +219,12 @@ class TestSNI(test.ServerTestBase): self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -213,12 +236,13 @@ class TestSNI(test.ServerTestBase): class TestServerCipherList(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -227,18 +251,21 @@ class TestServerCipherList(test.ServerTestBase): class TestServerCurrentCipher(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle(self): - self.wfile.write("%s"%str(self.get_current_cipher())) + self.wfile.write("%s" % str(self.get_current_cipher())) self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -249,12 +276,13 @@ class TestServerCurrentCipher(test.ServerTestBase): class TestServerCipherListError(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'bogus' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='bogus' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -264,12 +292,13 @@ class TestServerCipherListError(test.ServerTestBase): class TestClientCipherListError(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -277,15 +306,18 @@ class TestClientCipherListError(test.ServerTestBase): class TestSSLDisconnect(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.finish() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -300,11 +332,12 @@ class TestSSLDisconnect(test.ServerTestBase): class TestSSLHardDisconnect(test.ServerTestBase): handler = HardDisconnectHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -316,6 +349,7 @@ class TestSSLHardDisconnect(test.ServerTestBase): class TestDisconnect(test.ServerTestBase): + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -326,7 +360,9 @@ class TestDisconnect(test.ServerTestBase): class TestServerTimeOut(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.timeout = False self.settimeout(0.01) @@ -344,6 +380,7 @@ class TestServerTimeOut(test.ServerTestBase): class TestTimeOut(test.ServerTestBase): handler = HangHandler + def test_timeout(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -355,11 +392,12 @@ class TestTimeOut(test.ServerTestBase): class TestSSLTimeOut(test.ServerTestBase): handler = HangHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_timeout_client(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -371,15 +409,16 @@ class TestSSLTimeOut(test.ServerTestBase): class TestDHParams(test.ServerTestBase): handler = HangHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - dhparams = certutils.CertStore.load_dhparam( + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + dhparams=certutils.CertStore.load_dhparam( tutils.test_data.path("data/dhparam.pem"), ), - cipher_list = "DHE-RSA-AES256-SHA" + cipher_list="DHE-RSA-AES256-SHA" ) + def test_dhparams(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -395,7 +434,9 @@ class TestDHParams(test.ServerTestBase): class TestPrivkeyGen(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): with tutils.tmpdir() as d: ca1 = certutils.CertStore.from_store(d, "test2") @@ -411,7 +452,9 @@ class TestPrivkeyGen(test.ServerTestBase): class TestPrivkeyGenNoFlags(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): with tutils.tmpdir() as d: ca1 = certutils.CertStore.from_store(d, "test2") @@ -426,14 +469,15 @@ class TestPrivkeyGenNoFlags(test.ServerTestBase): tutils.raises("sslv3 alert handshake failure", c.convert_to_ssl) - class TestTCPClient: + def test_conerr(self): c = tcp.TCPClient(("127.0.0.1", 0)) tutils.raises(tcp.NetLibError, c.connect) class TestFileLike: + def test_blocksize(self): s = cStringIO.StringIO("1234567890abcdefghijklmnopqrstuvwxyz") s = tcp.Reader(s) @@ -460,7 +504,7 @@ class TestFileLike: assert s.readline(3) == "foo" def test_limitless(self): - s = cStringIO.StringIO("f"*(50*1024)) + s = cStringIO.StringIO("f" * (50 * 1024)) s = tcp.Reader(s) ret = s.read(-1) assert len(ret) == 50 * 1024 @@ -551,7 +595,9 @@ class TestFileLike: s = tcp.Reader(o) tutils.raises(tcp.NetLibDisconnect, s.readline, 10) + class TestAddress: + def test_simple(self): a = tcp.Address("localhost", True) assert a.use_ipv6 @@ -566,12 +612,12 @@ class TestAddress: class TestSSLKeyLogger(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = "AES256-SHA" - ) + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list="AES256-SHA" + ) def test_log(self): testval = "echo!\n" @@ -597,4 +643,4 @@ class TestSSLKeyLogger(test.ServerTestBase): def test_create_logfun(self): assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger) - assert not tcp.SSLKeyLogger.create_logfun(False)
\ No newline at end of file + assert not tcp.SSLKeyLogger.create_logfun(False) diff --git a/test/test_utils.py b/test/test_utils.py index 942136fd..8e66bce4 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -12,7 +12,7 @@ def test_bidi(): def test_hexdump(): - assert utils.hexdump("one\0"*10) + assert utils.hexdump("one\0" * 10) def test_cleanBin(): @@ -25,5 +25,5 @@ def test_cleanBin(): def test_pretty_size(): assert utils.pretty_size(100) == "100B" assert utils.pretty_size(1024) == "1kB" - assert utils.pretty_size(1024 + (1024/2.0)) == "1.5kB" - assert utils.pretty_size(1024*1024) == "1MB" + assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB" + assert utils.pretty_size(1024 * 1024) == "1MB" diff --git a/test/test_websockets.py b/test/test_websockets.py index 7bd5d74e..38947295 100644 --- a/test/test_websockets.py +++ b/test/test_websockets.py @@ -7,6 +7,7 @@ import tutils class WebSocketsEchoHandler(tcp.BaseHandler): + def __init__(self, connection, address, server): super(WebSocketsEchoHandler, self).__init__( connection, address, server @@ -25,7 +26,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler): self.on_message(frame.payload) def send_message(self, message): - frame = websockets.Frame.default(message, from_client = False) + frame = websockets.Frame.default(message, from_client=False) frame.to_file(self.wfile) def handshake(self): @@ -44,6 +45,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler): class WebSocketsClient(tcp.TCPClient): + def __init__(self, address, source_address=None): super(WebSocketsClient, self).__init__(address, source_address) self.client_nonce = None @@ -68,14 +70,14 @@ class WebSocketsClient(tcp.TCPClient): return websockets.Frame.from_file(self.rfile).payload def send_message(self, message): - frame = websockets.Frame.default(message, from_client = True) + frame = websockets.Frame.default(message, from_client=True) frame.to_file(self.wfile) class TestWebSockets(test.ServerTestBase): handler = WebSocketsEchoHandler - def random_bytes(self, n = 100): + def random_bytes(self, n=100): return os.urandom(n) def echo(self, msg): @@ -105,8 +107,8 @@ class TestWebSockets(test.ServerTestBase): default builder should always generate valid frames """ msg = self.random_bytes() - client_frame = websockets.Frame.default(msg, from_client = True) - server_frame = websockets.Frame.default(msg, from_client = False) + client_frame = websockets.Frame.default(msg, from_client=True) + server_frame = websockets.Frame.default(msg, from_client=False) def test_serialization_bijection(self): """ @@ -140,6 +142,7 @@ class TestWebSockets(test.ServerTestBase): class BadHandshakeHandler(WebSocketsEchoHandler): + def handshake(self): client_hs = http.read_request(self.rfile) websockets.check_client_handshake(client_hs.headers) @@ -152,6 +155,7 @@ class BadHandshakeHandler(WebSocketsEchoHandler): class TestBadHandshake(test.ServerTestBase): + """ Ensure that the client disconnects if the server handshake is malformed """ @@ -165,6 +169,7 @@ class TestBadHandshake(test.ServerTestBase): class TestFrameHeader: + def test_roundtrip(self): def round(*args, **kwargs): f = websockets.FrameHeader(*args, **kwargs) @@ -216,6 +221,7 @@ class TestFrameHeader: class TestFrame: + def test_roundtrip(self): def round(*args, **kwargs): f = websockets.Frame(*args, **kwargs) @@ -240,7 +246,7 @@ def test_masker(): ["fourf"], ["fourfive"], ["a", "aasdfasdfa", "asdf"], - ["a"*50, "aasdfasdfa", "asdf"], + ["a" * 50, "aasdfasdfa", "asdf"], ] for i in tests: m = websockets.Masker("abcd") diff --git a/test/test_wsgi.py b/test/test_wsgi.py index 1c8c5263..41572d49 100644 --- a/test/test_wsgi.py +++ b/test/test_wsgi.py @@ -1,4 +1,5 @@ -import cStringIO, sys +import cStringIO +import sys from netlib import wsgi, odict @@ -10,6 +11,7 @@ def tflow(): class TestApp: + def __init__(self): self.called = False @@ -22,6 +24,7 @@ class TestApp: class TestWSGI: + def test_make_environ(self): w = wsgi.WSGIAdaptor(None, "foo", 80, "version") tf = tflow() @@ -53,7 +56,7 @@ class TestWSGI: f.request.host = "foo" f.request.port = 80 wfile = cStringIO.StringIO() - err = w.serve(f, wfile) + w.serve(f, wfile) return wfile.getvalue() def test_serve_empty_body(self): @@ -69,7 +72,7 @@ class TestWSGI: try: raise ValueError("foo") except: - ei = sys.exc_info() + sys.exc_info() status = '200 OK' response_headers = [('Content-type', 'text/plain')] start_response(status, response_headers) diff --git a/test/tutils.py b/test/tutils.py index 141979f8..95c8b80a 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -44,14 +44,14 @@ def raises(exc, obj, *args, **kwargs): :kwargs Arguments to be passed to the callable. """ try: - ret = apply(obj, args, kwargs) - except Exception, v: + ret = obj(*args, **kwargs) + except Exception as v: if isinstance(exc, basestring): if exc.lower() in str(v).lower(): return else: raise AssertionError( - "Expected %s, but caught %s"%( + "Expected %s, but caught %s" % ( repr(str(exc)), v ) ) @@ -60,7 +60,7 @@ def raises(exc, obj, *args, **kwargs): return else: raise AssertionError( - "Expected %s, but caught %s %s"%( + "Expected %s, but caught %s %s" % ( exc.__name__, v.__class__.__name__, str(v) ) ) |