aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/h2/test_frames.py32
-rw-r--r--test/test_certutils.py12
-rw-r--r--test/test_http.py4
-rw-r--r--test/test_http_auth.py21
-rw-r--r--test/test_http_cookies.py3
-rw-r--r--test/test_http_uastrings.py1
-rw-r--r--test/test_imports.py2
-rw-r--r--test/test_odict.py7
-rw-r--r--test/test_tcp.py190
-rw-r--r--test/test_utils.py6
-rw-r--r--test/test_websockets.py18
-rw-r--r--test/test_wsgi.py9
-rw-r--r--test/tutils.py8
13 files changed, 199 insertions, 114 deletions
diff --git a/test/h2/test_frames.py b/test/h2/test_frames.py
index d04c7c8b..90162984 100644
--- a/test/h2/test_frames.py
+++ b/test/h2/test_frames.py
@@ -4,20 +4,22 @@ import tutils
from nose.tools import assert_equal
-
# TODO test stream association if valid or not
def test_invalid_flags():
tutils.raises(ValueError, DataFrame, ContinuationFrame.FLAG_END_HEADERS, 0x1234567, 'foobar')
+
def test_frame_equality():
a = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar')
b = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar')
assert_equal(a, b)
+
def test_too_large_frames():
DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567)
+
def test_data_frame_to_bytes():
f = DataFrame(6, Frame.FLAG_END_STREAM, 0x1234567, 'foobar')
assert_equal(f.to_bytes().encode('hex'), '000006000101234567666f6f626172')
@@ -28,6 +30,7 @@ def test_data_frame_to_bytes():
f = DataFrame(6, Frame.FLAG_NO_FLAGS, 0x0, 'foobar')
tutils.raises(ValueError, f.to_bytes)
+
def test_data_frame_from_bytes():
f = Frame.from_bytes('000006000101234567666f6f626172'.decode('hex'))
assert isinstance(f, DataFrame)
@@ -45,6 +48,7 @@ def test_data_frame_from_bytes():
assert_equal(f.stream_id, 0x1234567)
assert_equal(f.payload, 'foobar')
+
def test_headers_frame_to_bytes():
f = HeadersFrame(6, Frame.FLAG_NO_FLAGS, 0x1234567, 'foobar')
assert_equal(f.to_bytes().encode('hex'), '000006010001234567666f6f626172')
@@ -55,15 +59,18 @@ def test_headers_frame_to_bytes():
f = HeadersFrame(10, HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', exclusive=True, stream_dependency=0x7654321, weight=42)
assert_equal(f.to_bytes().encode('hex'), '00000b012001234567876543212a666f6f626172')
- f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', pad_length=3, exclusive=True, stream_dependency=0x7654321, weight=42)
+ f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567,
+ 'foobar', pad_length=3, exclusive=True, stream_dependency=0x7654321, weight=42)
assert_equal(f.to_bytes().encode('hex'), '00000f01280123456703876543212a666f6f626172000000')
- f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar', pad_length=3, exclusive=False, stream_dependency=0x7654321, weight=42)
+ f = HeadersFrame(14, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY, 0x1234567, 'foobar',
+ pad_length=3, exclusive=False, stream_dependency=0x7654321, weight=42)
assert_equal(f.to_bytes().encode('hex'), '00000f01280123456703076543212a666f6f626172000000')
f = HeadersFrame(6, Frame.FLAG_NO_FLAGS, 0x0, 'foobar')
tutils.raises(ValueError, f.to_bytes)
+
def test_headers_frame_from_bytes():
f = Frame.from_bytes('000006010001234567666f6f626172'.decode('hex'))
assert isinstance(f, HeadersFrame)
@@ -114,6 +121,7 @@ def test_headers_frame_from_bytes():
assert_equal(f.stream_dependency, 0x7654321)
assert_equal(f.weight, 42)
+
def test_priority_frame_to_bytes():
f = PriorityFrame(5, Frame.FLAG_NO_FLAGS, 0x1234567, exclusive=True, stream_dependency=0x7654321, weight=42)
assert_equal(f.to_bytes().encode('hex'), '000005020001234567876543212a')
@@ -127,6 +135,7 @@ def test_priority_frame_to_bytes():
f = PriorityFrame(5, Frame.FLAG_NO_FLAGS, 0x1234567, stream_dependency=0x0)
tutils.raises(ValueError, f.to_bytes)
+
def test_priority_frame_from_bytes():
f = Frame.from_bytes('000005020001234567876543212a'.decode('hex'))
assert isinstance(f, PriorityFrame)
@@ -148,6 +157,7 @@ def test_priority_frame_from_bytes():
assert_equal(f.stream_dependency, 0x7654321)
assert_equal(f.weight, 21)
+
def test_rst_stream_frame_to_bytes():
f = RstStreamFrame(4, Frame.FLAG_NO_FLAGS, 0x1234567, error_code=0x7654321)
assert_equal(f.to_bytes().encode('hex'), '00000403000123456707654321')
@@ -155,6 +165,7 @@ def test_rst_stream_frame_to_bytes():
f = RstStreamFrame(4, Frame.FLAG_NO_FLAGS, 0x0)
tutils.raises(ValueError, f.to_bytes)
+
def test_rst_stream_frame_from_bytes():
f = Frame.from_bytes('00000403000123456707654321'.decode('hex'))
assert isinstance(f, RstStreamFrame)
@@ -164,6 +175,7 @@ def test_rst_stream_frame_from_bytes():
assert_equal(f.stream_id, 0x1234567)
assert_equal(f.error_code, 0x07654321)
+
def test_settings_frame_to_bytes():
f = SettingsFrame(0, Frame.FLAG_NO_FLAGS, 0x0)
assert_equal(f.to_bytes().encode('hex'), '000000040000000000')
@@ -174,12 +186,14 @@ def test_settings_frame_to_bytes():
f = SettingsFrame(6, SettingsFrame.FLAG_ACK, 0x0, settings={SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1})
assert_equal(f.to_bytes().encode('hex'), '000006040100000000000200000001')
- f = SettingsFrame(12, Frame.FLAG_NO_FLAGS, 0x0, settings={SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678})
+ f = SettingsFrame(12, Frame.FLAG_NO_FLAGS, 0x0, settings={
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678})
assert_equal(f.to_bytes().encode('hex'), '00000c040000000000000200000001000312345678')
f = SettingsFrame(0, Frame.FLAG_NO_FLAGS, 0x1234567)
tutils.raises(ValueError, f.to_bytes)
+
def test_settings_frame_from_bytes():
f = Frame.from_bytes('000000040000000000'.decode('hex'))
assert isinstance(f, SettingsFrame)
@@ -214,6 +228,7 @@ def test_settings_frame_from_bytes():
assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH], 1)
assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS], 0x12345678)
+
def test_push_promise_frame_to_bytes():
f = PushPromiseFrame(10, Frame.FLAG_NO_FLAGS, 0x1234567, 0x7654321, 'foobar')
assert_equal(f.to_bytes().encode('hex'), '00000a05000123456707654321666f6f626172')
@@ -227,6 +242,7 @@ def test_push_promise_frame_to_bytes():
f = PushPromiseFrame(4, Frame.FLAG_NO_FLAGS, 0x1234567, 0x0)
tutils.raises(ValueError, f.to_bytes)
+
def test_push_promise_frame_from_bytes():
f = Frame.from_bytes('00000a05000123456707654321666f6f626172'.decode('hex'))
assert isinstance(f, PushPromiseFrame)
@@ -244,6 +260,7 @@ def test_push_promise_frame_from_bytes():
assert_equal(f.stream_id, 0x1234567)
assert_equal(f.header_block_fragment, 'foobar')
+
def test_ping_frame_to_bytes():
f = PingFrame(8, PingFrame.FLAG_ACK, 0x0, payload=b'foobar')
assert_equal(f.to_bytes().encode('hex'), '000008060100000000666f6f6261720000')
@@ -254,6 +271,7 @@ def test_ping_frame_to_bytes():
f = PingFrame(8, Frame.FLAG_NO_FLAGS, 0x1234567)
tutils.raises(ValueError, f.to_bytes)
+
def test_ping_frame_from_bytes():
f = Frame.from_bytes('000008060100000000666f6f6261720000'.decode('hex'))
assert isinstance(f, PingFrame)
@@ -271,6 +289,7 @@ def test_ping_frame_from_bytes():
assert_equal(f.stream_id, 0x0)
assert_equal(f.payload, b'foobarde')
+
def test_goaway_frame_to_bytes():
f = GoAwayFrame(8, Frame.FLAG_NO_FLAGS, 0x0, last_stream=0x1234567, error_code=0x87654321, data=b'')
assert_equal(f.to_bytes().encode('hex'), '0000080700000000000123456787654321')
@@ -281,6 +300,7 @@ def test_goaway_frame_to_bytes():
f = GoAwayFrame(8, Frame.FLAG_NO_FLAGS, 0x1234567, last_stream=0x1234567, error_code=0x87654321)
tutils.raises(ValueError, f.to_bytes)
+
def test_goaway_frame_from_bytes():
f = Frame.from_bytes('0000080700000000000123456787654321'.decode('hex'))
assert isinstance(f, GoAwayFrame)
@@ -302,6 +322,7 @@ def test_goaway_frame_from_bytes():
assert_equal(f.error_code, 0x87654321)
assert_equal(f.data, b'foobar')
+
def test_window_update_frame_to_bytes():
f = WindowUpdateFrame(4, Frame.FLAG_NO_FLAGS, 0x0, window_size_increment=0x1234567)
assert_equal(f.to_bytes().encode('hex'), '00000408000000000001234567')
@@ -315,6 +336,7 @@ def test_window_update_frame_to_bytes():
f = WindowUpdateFrame(4, Frame.FLAG_NO_FLAGS, 0x0, window_size_increment=0)
tutils.raises(ValueError, f.to_bytes)
+
def test_window_update_frame_from_bytes():
f = Frame.from_bytes('00000408000000000001234567'.decode('hex'))
assert isinstance(f, WindowUpdateFrame)
@@ -324,6 +346,7 @@ def test_window_update_frame_from_bytes():
assert_equal(f.stream_id, 0x0)
assert_equal(f.window_size_increment, 0x1234567)
+
def test_continuation_frame_to_bytes():
f = ContinuationFrame(6, ContinuationFrame.FLAG_END_HEADERS, 0x1234567, 'foobar')
assert_equal(f.to_bytes().encode('hex'), '000006090401234567666f6f626172')
@@ -331,6 +354,7 @@ def test_continuation_frame_to_bytes():
f = ContinuationFrame(6, ContinuationFrame.FLAG_END_HEADERS, 0x0, 'foobar')
tutils.raises(ValueError, f.to_bytes)
+
def test_continuation_frame_from_bytes():
f = Frame.from_bytes('000006090401234567666f6f626172'.decode('hex'))
assert isinstance(f, ContinuationFrame)
diff --git a/test/test_certutils.py b/test/test_certutils.py
index c96c5087..115cac4d 100644
--- a/test/test_certutils.py
+++ b/test/test_certutils.py
@@ -1,6 +1,5 @@
import os
from netlib import certutils, certffi
-import OpenSSL
import tutils
# class TestDNTree:
@@ -34,6 +33,7 @@ import tutils
class TestCertStore:
+
def test_create_explicit(self):
with tutils.tmpdir() as d:
ca = certutils.CertStore.from_store(d, "test")
@@ -56,13 +56,13 @@ class TestCertStore:
def test_add_cert(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ certutils.CertStore.from_store(d, "test")
def test_sans(self):
with tutils.tmpdir() as d:
ca = certutils.CertStore.from_store(d, "test")
c1 = ca.get_cert("foo.com", ["*.bar.com"])
- c2 = ca.get_cert("foo.bar.com", [])
+ ca.get_cert("foo.bar.com", [])
# assert c1 == c2
c3 = ca.get_cert("bar.com", [])
assert not c1 == c3
@@ -70,7 +70,7 @@ class TestCertStore:
def test_sans_change(self):
with tutils.tmpdir() as d:
ca = certutils.CertStore.from_store(d, "test")
- _ = ca.get_cert("foo.com", ["*.bar.com"])
+ ca.get_cert("foo.com", ["*.bar.com"])
cert, key, chain_file = ca.get_cert("foo.bar.com", ["*.baz.com"])
assert "*.baz.com" in cert.altnames
@@ -102,6 +102,7 @@ class TestCertStore:
class TestDummyCert:
+
def test_with_ca(self):
with tutils.tmpdir() as d:
ca = certutils.CertStore.from_store(d, "test")
@@ -115,6 +116,7 @@ class TestDummyCert:
class TestSSLCert:
+
def test_simple(self):
with open(tutils.test_data.path("data/text_cert"), "rb") as f:
d = f.read()
@@ -152,5 +154,3 @@ class TestSSLCert:
d = f.read()
s = certutils.SSLCert.from_der(d)
assert s.cn
-
-
diff --git a/test/test_http.py b/test/test_http.py
index 63b39f08..0a9e276f 100644
--- a/test/test_http.py
+++ b/test/test_http.py
@@ -230,6 +230,7 @@ def test_parse_init_http():
class TestReadHeaders:
+
def _read(self, data, verbatim=False):
if not verbatim:
data = textwrap.dedent(data)
@@ -277,6 +278,7 @@ class TestReadHeaders:
class NoContentLengthHTTPHandler(tcp.BaseHandler):
+
def handle(self):
self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n")
self.wfile.flush()
@@ -297,7 +299,7 @@ def test_read_response():
data = textwrap.dedent(data)
r = cStringIO.StringIO(data)
return http.read_response(
- r, method, limit, include_body = include_body
+ r, method, limit, include_body=include_body
)
tutils.raises("server disconnect", tst, "", "GET", None)
diff --git a/test/test_http_auth.py b/test/test_http_auth.py
index 176aa3ff..045fb13e 100644
--- a/test/test_http_auth.py
+++ b/test/test_http_auth.py
@@ -1,9 +1,9 @@
-import binascii, cStringIO
from netlib import odict, http_auth, http
-import mock
import tutils
+
class TestPassManNonAnon:
+
def test_simple(self):
p = http_auth.PassManNonAnon()
assert not p.test("", "")
@@ -11,6 +11,7 @@ class TestPassManNonAnon:
class TestPassManHtpasswd:
+
def test_file_errors(self):
tutils.raises("malformed htpasswd file", http_auth.PassManHtpasswd, tutils.test_data.path("data/server.crt"))
@@ -18,7 +19,7 @@ class TestPassManHtpasswd:
pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
vals = ("basic", "test", "test")
- p = http.assemble_http_basic_auth(*vals)
+ http.assemble_http_basic_auth(*vals)
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@@ -27,6 +28,7 @@ class TestPassManHtpasswd:
class TestPassManSingleUser:
+
def test_simple(self):
pm = http_auth.PassManSingleUser("test", "test")
assert pm.test("test", "test")
@@ -35,6 +37,7 @@ class TestPassManSingleUser:
class TestNullProxyAuth:
+
def test_simple(self):
na = http_auth.NullProxyAuth(http_auth.PassManNonAnon())
assert not na.auth_challenge_headers()
@@ -43,6 +46,7 @@ class TestNullProxyAuth:
class TestBasicProxyAuth:
+
def test_simple(self):
ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
h = odict.ODictCaseless()
@@ -60,7 +64,6 @@ class TestBasicProxyAuth:
ba.clean(hdrs)
assert not ba.AUTH_HEADER in hdrs
-
hdrs[ba.AUTH_HEADER] = [""]
assert not ba.authenticate(hdrs)
@@ -77,25 +80,27 @@ class TestBasicProxyAuth:
assert not ba.authenticate(hdrs)
-class Bunch: pass
+class Bunch:
+ pass
class TestAuthAction:
+
def test_nonanonymous(self):
m = Bunch()
aa = http_auth.NonanonymousAuthAction(None, "authenticator")
aa(None, m, None, None)
- assert m.authenticator
+ assert m.authenticator
def test_singleuser(self):
m = Bunch()
aa = http_auth.SingleuserAuthAction(None, "authenticator")
aa(None, m, "foo:bar", None)
- assert m.authenticator
+ assert m.authenticator
tutils.raises("invalid", aa, None, m, "foo", None)
def test_httppasswd(self):
m = Bunch()
aa = http_auth.HtpasswdAuthAction(None, "authenticator")
aa(None, m, tutils.test_data.path("data/htpasswd"), None)
- assert m.authenticator
+ assert m.authenticator
diff --git a/test/test_http_cookies.py b/test/test_http_cookies.py
index 7438af7c..070849cf 100644
--- a/test/test_http_cookies.py
+++ b/test/test_http_cookies.py
@@ -1,7 +1,6 @@
-import pprint
import nose.tools
-from netlib import http_cookies, odict
+from netlib import http_cookies
def test_read_token():
diff --git a/test/test_http_uastrings.py b/test/test_http_uastrings.py
index c70b7048..3fa4f359 100644
--- a/test/test_http_uastrings.py
+++ b/test/test_http_uastrings.py
@@ -4,4 +4,3 @@ from netlib import http_uastrings
def test_get_shortcut():
assert http_uastrings.get_by_shortcut("c")[0] == "chrome"
assert not http_uastrings.get_by_shortcut("_")
-
diff --git a/test/test_imports.py b/test/test_imports.py
index 7b8a643b..b88ef26d 100644
--- a/test/test_imports.py
+++ b/test/test_imports.py
@@ -1,3 +1 @@
# These are actually tests!
-import netlib.http_status
-import netlib.version
diff --git a/test/test_odict.py b/test/test_odict.py
index c01c4dbe..d66ae59b 100644
--- a/test/test_odict.py
+++ b/test/test_odict.py
@@ -3,6 +3,7 @@ import tutils
class TestODict:
+
def setUp(self):
self.od = odict.ODict()
@@ -106,13 +107,13 @@ class TestODict:
def test_get(self):
self.od.add("one", "two")
assert self.od.get("one") == ["two"]
- assert self.od.get("two") == None
+ assert self.od.get("two") is None
def test_get_first(self):
self.od.add("one", "two")
self.od.add("one", "three")
assert self.od.get_first("one") == "two"
- assert self.od.get_first("two") == None
+ assert self.od.get_first("two") is None
def test_extend(self):
a = odict.ODict([["a", "b"], ["c", "d"]])
@@ -121,7 +122,9 @@ class TestODict:
assert len(a) == 4
assert a["a"] == ["b", "b"]
+
class TestODictCaseless:
+
def setUp(self):
self.od = odict.ODictCaseless()
diff --git a/test/test_tcp.py b/test/test_tcp.py
index 4dbdd780..ef00e029 100644
--- a/test/test_tcp.py
+++ b/test/test_tcp.py
@@ -1,4 +1,8 @@
-import cStringIO, Queue, time, socket, random
+import cStringIO
+import Queue
+import time
+import socket
+import random
import os
from netlib import tcp, certutils, test, certffi
import threading
@@ -6,8 +10,10 @@ import mock
import tutils
from OpenSSL import SSL
+
class EchoHandler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
@@ -19,19 +25,22 @@ class EchoHandler(tcp.BaseHandler):
class ClientCipherListHandler(tcp.BaseHandler):
sni = None
+
def handle(self):
- self.wfile.write("%s"%self.connection.get_cipher_list())
+ self.wfile.write("%s" % self.connection.get_cipher_list())
self.wfile.flush()
class HangHandler(tcp.BaseHandler):
+
def handle(self):
- while 1:
+ while True:
time.sleep(1)
class TestServer(test.ServerTestBase):
handler = EchoHandler
+
def test_echo(self):
testval = "echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -51,7 +60,9 @@ class TestServer(test.ServerTestBase):
class TestServerBind(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.wfile.write(str(self.connection.getpeername()))
self.wfile.flush()
@@ -65,7 +76,7 @@ class TestServerBind(test.ServerTestBase):
c.connect()
assert c.rfile.readline() == str(("127.0.0.1", random_port))
return
- except tcp.NetLibError: # port probably already in use
+ except tcp.NetLibError: # port probably already in use
pass
@@ -84,6 +95,7 @@ class TestServerIPv6(test.ServerTestBase):
class TestEcho(test.ServerTestBase):
handler = EchoHandler
+
def test_echo(self):
testval = "echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -94,16 +106,19 @@ class TestEcho(test.ServerTestBase):
class HardDisconnectHandler(tcp.BaseHandler):
+
def handle(self):
self.connection.close()
class TestFinishFail(test.ServerTestBase):
+
"""
This tests a difficult-to-trigger exception in the .finish() method of
the handler.
"""
handler = EchoHandler
+
def test_disconnect_in_finish(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -115,13 +130,14 @@ class TestFinishFail(test.ServerTestBase):
class TestServerSSL(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = "AES256-SHA",
- chain_file=tutils.test_data.path("data/server.crt")
- )
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list="AES256-SHA",
+ chain_file=tutils.test_data.path("data/server.crt")
+ )
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -144,11 +160,12 @@ class TestServerSSL(test.ServerTestBase):
class TestSSLv3Only(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = True
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=True
)
+
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -156,20 +173,23 @@ class TestSSLv3Only(test.ServerTestBase):
class TestSSLClientCert(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
def handle(self):
- self.wfile.write("%s\n"%self.clientcert.serial)
+ self.wfile.write("%s\n" % self.clientcert.serial)
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = True,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=True,
+ v3_only=False
)
+
def test_clientcert(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -187,8 +207,10 @@ class TestSSLClientCert(test.ServerTestBase):
class TestSNI(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
@@ -197,11 +219,12 @@ class TestSNI(test.ServerTestBase):
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -213,12 +236,13 @@ class TestSNI(test.ServerTestBase):
class TestServerCipherList(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -227,18 +251,21 @@ class TestServerCipherList(test.ServerTestBase):
class TestServerCurrentCipher(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle(self):
- self.wfile.write("%s"%str(self.get_current_cipher()))
+ self.wfile.write("%s" % str(self.get_current_cipher()))
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -249,12 +276,13 @@ class TestServerCurrentCipher(test.ServerTestBase):
class TestServerCipherListError(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'bogus'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='bogus'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -264,12 +292,13 @@ class TestServerCipherListError(test.ServerTestBase):
class TestClientCipherListError(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -277,15 +306,18 @@ class TestClientCipherListError(test.ServerTestBase):
class TestSSLDisconnect(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.finish()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -300,11 +332,12 @@ class TestSSLDisconnect(test.ServerTestBase):
class TestSSLHardDisconnect(test.ServerTestBase):
handler = HardDisconnectHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -316,6 +349,7 @@ class TestSSLHardDisconnect(test.ServerTestBase):
class TestDisconnect(test.ServerTestBase):
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -326,7 +360,9 @@ class TestDisconnect(test.ServerTestBase):
class TestServerTimeOut(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.timeout = False
self.settimeout(0.01)
@@ -344,6 +380,7 @@ class TestServerTimeOut(test.ServerTestBase):
class TestTimeOut(test.ServerTestBase):
handler = HangHandler
+
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -355,11 +392,12 @@ class TestTimeOut(test.ServerTestBase):
class TestSSLTimeOut(test.ServerTestBase):
handler = HangHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_timeout_client(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -371,15 +409,16 @@ class TestSSLTimeOut(test.ServerTestBase):
class TestDHParams(test.ServerTestBase):
handler = HangHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- dhparams = certutils.CertStore.load_dhparam(
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ dhparams=certutils.CertStore.load_dhparam(
tutils.test_data.path("data/dhparam.pem"),
),
- cipher_list = "DHE-RSA-AES256-SHA"
+ cipher_list="DHE-RSA-AES256-SHA"
)
+
def test_dhparams(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -395,7 +434,9 @@ class TestDHParams(test.ServerTestBase):
class TestPrivkeyGen(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
with tutils.tmpdir() as d:
ca1 = certutils.CertStore.from_store(d, "test2")
@@ -411,7 +452,9 @@ class TestPrivkeyGen(test.ServerTestBase):
class TestPrivkeyGenNoFlags(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
with tutils.tmpdir() as d:
ca1 = certutils.CertStore.from_store(d, "test2")
@@ -426,14 +469,15 @@ class TestPrivkeyGenNoFlags(test.ServerTestBase):
tutils.raises("sslv3 alert handshake failure", c.convert_to_ssl)
-
class TestTCPClient:
+
def test_conerr(self):
c = tcp.TCPClient(("127.0.0.1", 0))
tutils.raises(tcp.NetLibError, c.connect)
class TestFileLike:
+
def test_blocksize(self):
s = cStringIO.StringIO("1234567890abcdefghijklmnopqrstuvwxyz")
s = tcp.Reader(s)
@@ -460,7 +504,7 @@ class TestFileLike:
assert s.readline(3) == "foo"
def test_limitless(self):
- s = cStringIO.StringIO("f"*(50*1024))
+ s = cStringIO.StringIO("f" * (50 * 1024))
s = tcp.Reader(s)
ret = s.read(-1)
assert len(ret) == 50 * 1024
@@ -551,7 +595,9 @@ class TestFileLike:
s = tcp.Reader(o)
tutils.raises(tcp.NetLibDisconnect, s.readline, 10)
+
class TestAddress:
+
def test_simple(self):
a = tcp.Address("localhost", True)
assert a.use_ipv6
@@ -566,12 +612,12 @@ class TestAddress:
class TestSSLKeyLogger(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = "AES256-SHA"
- )
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list="AES256-SHA"
+ )
def test_log(self):
testval = "echo!\n"
@@ -597,4 +643,4 @@ class TestSSLKeyLogger(test.ServerTestBase):
def test_create_logfun(self):
assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger)
- assert not tcp.SSLKeyLogger.create_logfun(False) \ No newline at end of file
+ assert not tcp.SSLKeyLogger.create_logfun(False)
diff --git a/test/test_utils.py b/test/test_utils.py
index 942136fd..8e66bce4 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -12,7 +12,7 @@ def test_bidi():
def test_hexdump():
- assert utils.hexdump("one\0"*10)
+ assert utils.hexdump("one\0" * 10)
def test_cleanBin():
@@ -25,5 +25,5 @@ def test_cleanBin():
def test_pretty_size():
assert utils.pretty_size(100) == "100B"
assert utils.pretty_size(1024) == "1kB"
- assert utils.pretty_size(1024 + (1024/2.0)) == "1.5kB"
- assert utils.pretty_size(1024*1024) == "1MB"
+ assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB"
+ assert utils.pretty_size(1024 * 1024) == "1MB"
diff --git a/test/test_websockets.py b/test/test_websockets.py
index 7bd5d74e..38947295 100644
--- a/test/test_websockets.py
+++ b/test/test_websockets.py
@@ -7,6 +7,7 @@ import tutils
class WebSocketsEchoHandler(tcp.BaseHandler):
+
def __init__(self, connection, address, server):
super(WebSocketsEchoHandler, self).__init__(
connection, address, server
@@ -25,7 +26,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
self.on_message(frame.payload)
def send_message(self, message):
- frame = websockets.Frame.default(message, from_client = False)
+ frame = websockets.Frame.default(message, from_client=False)
frame.to_file(self.wfile)
def handshake(self):
@@ -44,6 +45,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
class WebSocketsClient(tcp.TCPClient):
+
def __init__(self, address, source_address=None):
super(WebSocketsClient, self).__init__(address, source_address)
self.client_nonce = None
@@ -68,14 +70,14 @@ class WebSocketsClient(tcp.TCPClient):
return websockets.Frame.from_file(self.rfile).payload
def send_message(self, message):
- frame = websockets.Frame.default(message, from_client = True)
+ frame = websockets.Frame.default(message, from_client=True)
frame.to_file(self.wfile)
class TestWebSockets(test.ServerTestBase):
handler = WebSocketsEchoHandler
- def random_bytes(self, n = 100):
+ def random_bytes(self, n=100):
return os.urandom(n)
def echo(self, msg):
@@ -105,8 +107,8 @@ class TestWebSockets(test.ServerTestBase):
default builder should always generate valid frames
"""
msg = self.random_bytes()
- client_frame = websockets.Frame.default(msg, from_client = True)
- server_frame = websockets.Frame.default(msg, from_client = False)
+ client_frame = websockets.Frame.default(msg, from_client=True)
+ server_frame = websockets.Frame.default(msg, from_client=False)
def test_serialization_bijection(self):
"""
@@ -140,6 +142,7 @@ class TestWebSockets(test.ServerTestBase):
class BadHandshakeHandler(WebSocketsEchoHandler):
+
def handshake(self):
client_hs = http.read_request(self.rfile)
websockets.check_client_handshake(client_hs.headers)
@@ -152,6 +155,7 @@ class BadHandshakeHandler(WebSocketsEchoHandler):
class TestBadHandshake(test.ServerTestBase):
+
"""
Ensure that the client disconnects if the server handshake is malformed
"""
@@ -165,6 +169,7 @@ class TestBadHandshake(test.ServerTestBase):
class TestFrameHeader:
+
def test_roundtrip(self):
def round(*args, **kwargs):
f = websockets.FrameHeader(*args, **kwargs)
@@ -216,6 +221,7 @@ class TestFrameHeader:
class TestFrame:
+
def test_roundtrip(self):
def round(*args, **kwargs):
f = websockets.Frame(*args, **kwargs)
@@ -240,7 +246,7 @@ def test_masker():
["fourf"],
["fourfive"],
["a", "aasdfasdfa", "asdf"],
- ["a"*50, "aasdfasdfa", "asdf"],
+ ["a" * 50, "aasdfasdfa", "asdf"],
]
for i in tests:
m = websockets.Masker("abcd")
diff --git a/test/test_wsgi.py b/test/test_wsgi.py
index 1c8c5263..41572d49 100644
--- a/test/test_wsgi.py
+++ b/test/test_wsgi.py
@@ -1,4 +1,5 @@
-import cStringIO, sys
+import cStringIO
+import sys
from netlib import wsgi, odict
@@ -10,6 +11,7 @@ def tflow():
class TestApp:
+
def __init__(self):
self.called = False
@@ -22,6 +24,7 @@ class TestApp:
class TestWSGI:
+
def test_make_environ(self):
w = wsgi.WSGIAdaptor(None, "foo", 80, "version")
tf = tflow()
@@ -53,7 +56,7 @@ class TestWSGI:
f.request.host = "foo"
f.request.port = 80
wfile = cStringIO.StringIO()
- err = w.serve(f, wfile)
+ w.serve(f, wfile)
return wfile.getvalue()
def test_serve_empty_body(self):
@@ -69,7 +72,7 @@ class TestWSGI:
try:
raise ValueError("foo")
except:
- ei = sys.exc_info()
+ sys.exc_info()
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
diff --git a/test/tutils.py b/test/tutils.py
index 141979f8..95c8b80a 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -44,14 +44,14 @@ def raises(exc, obj, *args, **kwargs):
:kwargs Arguments to be passed to the callable.
"""
try:
- ret = apply(obj, args, kwargs)
- except Exception, v:
+ ret = obj(*args, **kwargs)
+ except Exception as v:
if isinstance(exc, basestring):
if exc.lower() in str(v).lower():
return
else:
raise AssertionError(
- "Expected %s, but caught %s"%(
+ "Expected %s, but caught %s" % (
repr(str(exc)), v
)
)
@@ -60,7 +60,7 @@ def raises(exc, obj, *args, **kwargs):
return
else:
raise AssertionError(
- "Expected %s, but caught %s %s"%(
+ "Expected %s, but caught %s %s" % (
exc.__name__, v.__class__.__name__, str(v)
)
)