diff options
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r-- | test/test_tcp.py | 103 |
1 files changed, 76 insertions, 27 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py index ce96f16f..21fea23e 100644 --- a/test/test_tcp.py +++ b/test/test_tcp.py @@ -1,4 +1,5 @@ import cStringIO, Queue, time, socket, random +import os from netlib import tcp, certutils, test, certffi import mock import tutils @@ -71,30 +72,6 @@ class TestServerIPv6(test.ServerTestBase): assert c.rfile.readline() == testval -class FinishFailHandler(tcp.BaseHandler): - def handle(self): - v = self.rfile.readline() - self.wfile.write(v) - self.wfile.flush() - self.wfile.close() - self.rfile.close() - self.close = mock.MagicMock(side_effect=socket.error) - - -class TestFinishFail(test.ServerTestBase): - """ - This tests a difficult-to-trigger exception in the .finish() method of - the handler. - """ - handler = FinishFailHandler - def test_disconnect_in_finish(self): - testval = "echo!\n" - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.wfile.write("foo\n") - c.wfile.flush() - c.rfile.read(4) - class TestDisconnect(test.ServerTestBase): handler = EchoHandler def test_echo(self): @@ -111,6 +88,20 @@ class HardDisconnectHandler(tcp.BaseHandler): self.connection.close() +class TestFinishFail(test.ServerTestBase): + """ + This tests a difficult-to-trigger exception in the .finish() method of + the handler. + """ + handler = EchoHandler + def test_disconnect_in_finish(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.wfile.write("foo\n") + c.wfile.flush = mock.Mock(side_effect=tcp.NetLibDisconnect) + c.finish() + + class TestServerSSL(test.ServerTestBase): handler = EchoHandler ssl = dict( @@ -118,7 +109,8 @@ class TestServerSSL(test.ServerTestBase): key = tutils.test_data.path("data/server.key"), request_client_cert = False, v3_only = False, - cipher_list = "AES256-SHA" + cipher_list = "AES256-SHA", + chain_file=tutils.test_data.path("data/server.crt") ) def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -150,7 +142,7 @@ class TestSSLv3Only(test.ServerTestBase): def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com", method=tcp.TLSv1_METHOD) + tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com") class TestSSLClientCert(test.ServerTestBase): @@ -385,6 +377,11 @@ class TestDHParams(test.ServerTestBase): ret = c.get_current_cipher() assert ret[0] == "DHE-RSA-AES256-SHA" + def test_create_dhparams(self): + with tutils.tmpdir() as d: + filename = os.path.join(d, "dhparam.pem") + certutils.CertStore.load_dhparam(filename) + assert os.path.exists(filename) class TestPrivkeyGen(test.ServerTestBase): @@ -527,12 +524,22 @@ class TestFileLike: assert s.first_byte_timestamp == expected def test_read_ssl_error(self): - s = cStringIO.StringIO("foobar\nfoobar") s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.Error()) s = tcp.Reader(s) tutils.raises(tcp.NetLibSSLError, s.read, 1) + def test_read_syscall_ssl_error(self): + s = mock.MagicMock() + s.read = mock.MagicMock(side_effect=SSL.SysCallError()) + s = tcp.Reader(s) + tutils.raises(tcp.NetLibSSLError, s.read, 1) + + def test_reader_readline_disconnect(self): + o = mock.MagicMock() + o.read = mock.MagicMock(side_effect=socket.error) + s = tcp.Reader(o) + tutils.raises(tcp.NetLibDisconnect, s.readline, 10) class TestAddress: def test_simple(self): @@ -542,3 +549,45 @@ class TestAddress: assert not a == b c = tcp.Address("localhost", True) assert a == c + assert not a != c + assert repr(a) + + +class TestServer(test.ServerTestBase): + handler = EchoHandler + def test_echo(self): + testval = "echo!\n" + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.wfile.write(testval) + c.wfile.flush() + assert c.rfile.readline() == testval + +class TestSSLKeyLogger(test.ServerTestBase): + handler = EchoHandler + ssl = dict( + cert = tutils.test_data.path("data/server.crt"), + key = tutils.test_data.path("data/server.key"), + request_client_cert = False, + v3_only = False, + cipher_list = "AES256-SHA" + ) + + def test_log(self): + _logfun = tcp.log_ssl_key + + with tutils.tmpdir() as d: + logfile = os.path.join(d, "foo", "bar", "logfile") + tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.convert_to_ssl() + tcp.log_ssl_key.close() + with open(logfile, "rb") as f: + assert f.read().count("CLIENT_RANDOM") == 2 + + tcp.log_ssl_key = _logfun + + def test_create_logfun(self): + assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger) + assert not tcp.SSLKeyLogger.create_logfun(False)
\ No newline at end of file |