aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_tcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r--test/test_tcp.py103
1 files changed, 76 insertions, 27 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py
index ce96f16f..21fea23e 100644
--- a/test/test_tcp.py
+++ b/test/test_tcp.py
@@ -1,4 +1,5 @@
import cStringIO, Queue, time, socket, random
+import os
from netlib import tcp, certutils, test, certffi
import mock
import tutils
@@ -71,30 +72,6 @@ class TestServerIPv6(test.ServerTestBase):
assert c.rfile.readline() == testval
-class FinishFailHandler(tcp.BaseHandler):
- def handle(self):
- v = self.rfile.readline()
- self.wfile.write(v)
- self.wfile.flush()
- self.wfile.close()
- self.rfile.close()
- self.close = mock.MagicMock(side_effect=socket.error)
-
-
-class TestFinishFail(test.ServerTestBase):
- """
- This tests a difficult-to-trigger exception in the .finish() method of
- the handler.
- """
- handler = FinishFailHandler
- def test_disconnect_in_finish(self):
- testval = "echo!\n"
- c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.wfile.write("foo\n")
- c.wfile.flush()
- c.rfile.read(4)
-
class TestDisconnect(test.ServerTestBase):
handler = EchoHandler
def test_echo(self):
@@ -111,6 +88,20 @@ class HardDisconnectHandler(tcp.BaseHandler):
self.connection.close()
+class TestFinishFail(test.ServerTestBase):
+ """
+ This tests a difficult-to-trigger exception in the .finish() method of
+ the handler.
+ """
+ handler = EchoHandler
+ def test_disconnect_in_finish(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.wfile.write("foo\n")
+ c.wfile.flush = mock.Mock(side_effect=tcp.NetLibDisconnect)
+ c.finish()
+
+
class TestServerSSL(test.ServerTestBase):
handler = EchoHandler
ssl = dict(
@@ -118,7 +109,8 @@ class TestServerSSL(test.ServerTestBase):
key = tutils.test_data.path("data/server.key"),
request_client_cert = False,
v3_only = False,
- cipher_list = "AES256-SHA"
+ cipher_list = "AES256-SHA",
+ chain_file=tutils.test_data.path("data/server.crt")
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -150,7 +142,7 @@ class TestSSLv3Only(test.ServerTestBase):
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com", method=tcp.TLSv1_METHOD)
+ tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com")
class TestSSLClientCert(test.ServerTestBase):
@@ -385,6 +377,11 @@ class TestDHParams(test.ServerTestBase):
ret = c.get_current_cipher()
assert ret[0] == "DHE-RSA-AES256-SHA"
+ def test_create_dhparams(self):
+ with tutils.tmpdir() as d:
+ filename = os.path.join(d, "dhparam.pem")
+ certutils.CertStore.load_dhparam(filename)
+ assert os.path.exists(filename)
class TestPrivkeyGen(test.ServerTestBase):
@@ -527,12 +524,22 @@ class TestFileLike:
assert s.first_byte_timestamp == expected
def test_read_ssl_error(self):
- s = cStringIO.StringIO("foobar\nfoobar")
s = mock.MagicMock()
s.read = mock.MagicMock(side_effect=SSL.Error())
s = tcp.Reader(s)
tutils.raises(tcp.NetLibSSLError, s.read, 1)
+ def test_read_syscall_ssl_error(self):
+ s = mock.MagicMock()
+ s.read = mock.MagicMock(side_effect=SSL.SysCallError())
+ s = tcp.Reader(s)
+ tutils.raises(tcp.NetLibSSLError, s.read, 1)
+
+ def test_reader_readline_disconnect(self):
+ o = mock.MagicMock()
+ o.read = mock.MagicMock(side_effect=socket.error)
+ s = tcp.Reader(o)
+ tutils.raises(tcp.NetLibDisconnect, s.readline, 10)
class TestAddress:
def test_simple(self):
@@ -542,3 +549,45 @@ class TestAddress:
assert not a == b
c = tcp.Address("localhost", True)
assert a == c
+ assert not a != c
+ assert repr(a)
+
+
+class TestServer(test.ServerTestBase):
+ handler = EchoHandler
+ def test_echo(self):
+ testval = "echo!\n"
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+
+class TestSSLKeyLogger(test.ServerTestBase):
+ handler = EchoHandler
+ ssl = dict(
+ cert = tutils.test_data.path("data/server.crt"),
+ key = tutils.test_data.path("data/server.key"),
+ request_client_cert = False,
+ v3_only = False,
+ cipher_list = "AES256-SHA"
+ )
+
+ def test_log(self):
+ _logfun = tcp.log_ssl_key
+
+ with tutils.tmpdir() as d:
+ logfile = os.path.join(d, "foo", "bar", "logfile")
+ tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ tcp.log_ssl_key.close()
+ with open(logfile, "rb") as f:
+ assert f.read().count("CLIENT_RANDOM") == 2
+
+ tcp.log_ssl_key = _logfun
+
+ def test_create_logfun(self):
+ assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger)
+ assert not tcp.SSLKeyLogger.create_logfun(False) \ No newline at end of file