aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_tcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r--test/test_tcp.py190
1 files changed, 118 insertions, 72 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py
index 4dbdd780..ef00e029 100644
--- a/test/test_tcp.py
+++ b/test/test_tcp.py
@@ -1,4 +1,8 @@
-import cStringIO, Queue, time, socket, random
+import cStringIO
+import Queue
+import time
+import socket
+import random
import os
from netlib import tcp, certutils, test, certffi
import threading
@@ -6,8 +10,10 @@ import mock
import tutils
from OpenSSL import SSL
+
class EchoHandler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
@@ -19,19 +25,22 @@ class EchoHandler(tcp.BaseHandler):
class ClientCipherListHandler(tcp.BaseHandler):
sni = None
+
def handle(self):
- self.wfile.write("%s"%self.connection.get_cipher_list())
+ self.wfile.write("%s" % self.connection.get_cipher_list())
self.wfile.flush()
class HangHandler(tcp.BaseHandler):
+
def handle(self):
- while 1:
+ while True:
time.sleep(1)
class TestServer(test.ServerTestBase):
handler = EchoHandler
+
def test_echo(self):
testval = "echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -51,7 +60,9 @@ class TestServer(test.ServerTestBase):
class TestServerBind(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.wfile.write(str(self.connection.getpeername()))
self.wfile.flush()
@@ -65,7 +76,7 @@ class TestServerBind(test.ServerTestBase):
c.connect()
assert c.rfile.readline() == str(("127.0.0.1", random_port))
return
- except tcp.NetLibError: # port probably already in use
+ except tcp.NetLibError: # port probably already in use
pass
@@ -84,6 +95,7 @@ class TestServerIPv6(test.ServerTestBase):
class TestEcho(test.ServerTestBase):
handler = EchoHandler
+
def test_echo(self):
testval = "echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -94,16 +106,19 @@ class TestEcho(test.ServerTestBase):
class HardDisconnectHandler(tcp.BaseHandler):
+
def handle(self):
self.connection.close()
class TestFinishFail(test.ServerTestBase):
+
"""
This tests a difficult-to-trigger exception in the .finish() method of
the handler.
"""
handler = EchoHandler
+
def test_disconnect_in_finish(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -115,13 +130,14 @@ class TestFinishFail(test.ServerTestBase):
class TestServerSSL(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = "AES256-SHA",
- chain_file=tutils.test_data.path("data/server.crt")
- )
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list="AES256-SHA",
+ chain_file=tutils.test_data.path("data/server.crt")
+ )
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -144,11 +160,12 @@ class TestServerSSL(test.ServerTestBase):
class TestSSLv3Only(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = True
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=True
)
+
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -156,20 +173,23 @@ class TestSSLv3Only(test.ServerTestBase):
class TestSSLClientCert(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
def handle(self):
- self.wfile.write("%s\n"%self.clientcert.serial)
+ self.wfile.write("%s\n" % self.clientcert.serial)
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = True,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=True,
+ v3_only=False
)
+
def test_clientcert(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -187,8 +207,10 @@ class TestSSLClientCert(test.ServerTestBase):
class TestSNI(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle_sni(self, connection):
self.sni = connection.get_servername()
@@ -197,11 +219,12 @@ class TestSNI(test.ServerTestBase):
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -213,12 +236,13 @@ class TestSNI(test.ServerTestBase):
class TestServerCipherList(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -227,18 +251,21 @@ class TestServerCipherList(test.ServerTestBase):
class TestServerCurrentCipher(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
sni = None
+
def handle(self):
- self.wfile.write("%s"%str(self.get_current_cipher()))
+ self.wfile.write("%s" % str(self.get_current_cipher()))
self.wfile.flush()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -249,12 +276,13 @@ class TestServerCurrentCipher(test.ServerTestBase):
class TestServerCipherListError(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'bogus'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='bogus'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -264,12 +292,13 @@ class TestServerCipherListError(test.ServerTestBase):
class TestClientCipherListError(test.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = 'RC4-SHA'
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list='RC4-SHA'
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -277,15 +306,18 @@ class TestClientCipherListError(test.ServerTestBase):
class TestSSLDisconnect(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.finish()
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -300,11 +332,12 @@ class TestSSLDisconnect(test.ServerTestBase):
class TestSSLHardDisconnect(test.ServerTestBase):
handler = HardDisconnectHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -316,6 +349,7 @@ class TestSSLHardDisconnect(test.ServerTestBase):
class TestDisconnect(test.ServerTestBase):
+
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -326,7 +360,9 @@ class TestDisconnect(test.ServerTestBase):
class TestServerTimeOut(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
self.timeout = False
self.settimeout(0.01)
@@ -344,6 +380,7 @@ class TestServerTimeOut(test.ServerTestBase):
class TestTimeOut(test.ServerTestBase):
handler = HangHandler
+
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -355,11 +392,12 @@ class TestTimeOut(test.ServerTestBase):
class TestSSLTimeOut(test.ServerTestBase):
handler = HangHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False
)
+
def test_timeout_client(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -371,15 +409,16 @@ class TestSSLTimeOut(test.ServerTestBase):
class TestDHParams(test.ServerTestBase):
handler = HangHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- dhparams = certutils.CertStore.load_dhparam(
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ dhparams=certutils.CertStore.load_dhparam(
tutils.test_data.path("data/dhparam.pem"),
),
- cipher_list = "DHE-RSA-AES256-SHA"
+ cipher_list="DHE-RSA-AES256-SHA"
)
+
def test_dhparams(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
@@ -395,7 +434,9 @@ class TestDHParams(test.ServerTestBase):
class TestPrivkeyGen(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
with tutils.tmpdir() as d:
ca1 = certutils.CertStore.from_store(d, "test2")
@@ -411,7 +452,9 @@ class TestPrivkeyGen(test.ServerTestBase):
class TestPrivkeyGenNoFlags(test.ServerTestBase):
+
class handler(tcp.BaseHandler):
+
def handle(self):
with tutils.tmpdir() as d:
ca1 = certutils.CertStore.from_store(d, "test2")
@@ -426,14 +469,15 @@ class TestPrivkeyGenNoFlags(test.ServerTestBase):
tutils.raises("sslv3 alert handshake failure", c.convert_to_ssl)
-
class TestTCPClient:
+
def test_conerr(self):
c = tcp.TCPClient(("127.0.0.1", 0))
tutils.raises(tcp.NetLibError, c.connect)
class TestFileLike:
+
def test_blocksize(self):
s = cStringIO.StringIO("1234567890abcdefghijklmnopqrstuvwxyz")
s = tcp.Reader(s)
@@ -460,7 +504,7 @@ class TestFileLike:
assert s.readline(3) == "foo"
def test_limitless(self):
- s = cStringIO.StringIO("f"*(50*1024))
+ s = cStringIO.StringIO("f" * (50 * 1024))
s = tcp.Reader(s)
ret = s.read(-1)
assert len(ret) == 50 * 1024
@@ -551,7 +595,9 @@ class TestFileLike:
s = tcp.Reader(o)
tutils.raises(tcp.NetLibDisconnect, s.readline, 10)
+
class TestAddress:
+
def test_simple(self):
a = tcp.Address("localhost", True)
assert a.use_ipv6
@@ -566,12 +612,12 @@ class TestAddress:
class TestSSLKeyLogger(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
- cert = tutils.test_data.path("data/server.crt"),
- key = tutils.test_data.path("data/server.key"),
- request_client_cert = False,
- v3_only = False,
- cipher_list = "AES256-SHA"
- )
+ cert=tutils.test_data.path("data/server.crt"),
+ key=tutils.test_data.path("data/server.key"),
+ request_client_cert=False,
+ v3_only=False,
+ cipher_list="AES256-SHA"
+ )
def test_log(self):
testval = "echo!\n"
@@ -597,4 +643,4 @@ class TestSSLKeyLogger(test.ServerTestBase):
def test_create_logfun(self):
assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger)
- assert not tcp.SSLKeyLogger.create_logfun(False) \ No newline at end of file
+ assert not tcp.SSLKeyLogger.create_logfun(False)