diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_certutils.py | 37 | ||||
-rw-r--r-- | test/test_http.py | 6 | ||||
-rw-r--r-- | test/test_socks.py | 14 | ||||
-rw-r--r-- | test/test_tcp.py | 103 |
4 files changed, 115 insertions, 45 deletions
diff --git a/test/test_certutils.py b/test/test_certutils.py index 59c9dcd5..c96c5087 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -80,7 +80,7 @@ class TestCertStore: ca2 = certutils.CertStore.from_store(os.path.join(d, "ca2"), "test") assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - dc = ca2.get_cert("foo.com", []) + dc = ca2.get_cert("foo.com", ["sans.example.com"]) dcp = os.path.join(d, "dc") f = open(dcp, "wb") f.write(dc[0].to_pem()) @@ -118,31 +118,34 @@ class TestSSLCert: def test_simple(self): with open(tutils.test_data.path("data/text_cert"), "rb") as f: d = f.read() - c = certutils.SSLCert.from_pem(d) - assert c.cn == "google.com" - assert len(c.altnames) == 436 + c1 = certutils.SSLCert.from_pem(d) + assert c1.cn == "google.com" + assert len(c1.altnames) == 436 with open(tutils.test_data.path("data/text_cert_2"), "rb") as f: d = f.read() - c = certutils.SSLCert.from_pem(d) - assert c.cn == "www.inode.co.nz" - assert len(c.altnames) == 2 - assert c.digest("sha1") - assert c.notbefore - assert c.notafter - assert c.subject - assert c.keyinfo == ("RSA", 2048) - assert c.serial - assert c.issuer - assert c.to_pem() - c.has_expired + c2 = certutils.SSLCert.from_pem(d) + assert c2.cn == "www.inode.co.nz" + assert len(c2.altnames) == 2 + assert c2.digest("sha1") + assert c2.notbefore + assert c2.notafter + assert c2.subject + assert c2.keyinfo == ("RSA", 2048) + assert c2.serial + assert c2.issuer + assert c2.to_pem() + assert c2.has_expired is not None + + assert not c1 == c2 + assert c1 != c2 def test_err_broken_sans(self): with open(tutils.test_data.path("data/text_cert_weird1"), "rb") as f: d = f.read() c = certutils.SSLCert.from_pem(d) # This breaks unless we ignore a decoding error. - c.altnames + assert c.altnames is not None def test_der(self): with open(tutils.test_data.path("data/dercert"), "rb") as f: diff --git a/test/test_http.py b/test/test_http.py index e3e92a1e..fed60946 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -325,6 +325,12 @@ def test_parse_url(): assert po == 80 assert pa == "/bar" + s, h, po, pa = http.parse_url("http://user:pass@foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + s, h, po, pa = http.parse_url("http://foo") assert pa == "/" diff --git a/test/test_socks.py b/test/test_socks.py index 740fdb9c..aa4f9c11 100644 --- a/test/test_socks.py +++ b/test/test_socks.py @@ -1,5 +1,6 @@ from cStringIO import StringIO import socket +import mock from nose.plugins.skip import SkipTest from netlib import socks, tcp import tutils @@ -81,4 +82,15 @@ def test_message_unknown_atyp(): tutils.raises(socks.SocksError, socks.Message.from_file, raw) m = socks.Message(5, 1, 0x02, tcp.Address(("example.com", 5050))) - tutils.raises(socks.SocksError, m.to_file, StringIO())
\ No newline at end of file + tutils.raises(socks.SocksError, m.to_file, StringIO()) + +def test_read(): + cs = StringIO("1234") + assert socks._read(cs, 3) == "123" + + cs = StringIO("123") + tutils.raises(socks.SocksError, socks._read, cs, 4) + + cs = mock.Mock() + cs.read = mock.Mock(side_effect=socket.error) + tutils.raises(socks.SocksError, socks._read, cs, 4)
\ No newline at end of file diff --git a/test/test_tcp.py b/test/test_tcp.py index ce96f16f..21fea23e 100644 --- a/test/test_tcp.py +++ b/test/test_tcp.py @@ -1,4 +1,5 @@ import cStringIO, Queue, time, socket, random +import os from netlib import tcp, certutils, test, certffi import mock import tutils @@ -71,30 +72,6 @@ class TestServerIPv6(test.ServerTestBase): assert c.rfile.readline() == testval -class FinishFailHandler(tcp.BaseHandler): - def handle(self): - v = self.rfile.readline() - self.wfile.write(v) - self.wfile.flush() - self.wfile.close() - self.rfile.close() - self.close = mock.MagicMock(side_effect=socket.error) - - -class TestFinishFail(test.ServerTestBase): - """ - This tests a difficult-to-trigger exception in the .finish() method of - the handler. - """ - handler = FinishFailHandler - def test_disconnect_in_finish(self): - testval = "echo!\n" - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - c.wfile.write("foo\n") - c.wfile.flush() - c.rfile.read(4) - class TestDisconnect(test.ServerTestBase): handler = EchoHandler def test_echo(self): @@ -111,6 +88,20 @@ class HardDisconnectHandler(tcp.BaseHandler): self.connection.close() +class TestFinishFail(test.ServerTestBase): + """ + This tests a difficult-to-trigger exception in the .finish() method of + the handler. + """ + handler = EchoHandler + def test_disconnect_in_finish(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.wfile.write("foo\n") + c.wfile.flush = mock.Mock(side_effect=tcp.NetLibDisconnect) + c.finish() + + class TestServerSSL(test.ServerTestBase): handler = EchoHandler ssl = dict( @@ -118,7 +109,8 @@ class TestServerSSL(test.ServerTestBase): key = tutils.test_data.path("data/server.key"), request_client_cert = False, v3_only = False, - cipher_list = "AES256-SHA" + cipher_list = "AES256-SHA", + chain_file=tutils.test_data.path("data/server.crt") ) def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -150,7 +142,7 @@ class TestSSLv3Only(test.ServerTestBase): def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com", method=tcp.TLSv1_METHOD) + tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com") class TestSSLClientCert(test.ServerTestBase): @@ -385,6 +377,11 @@ class TestDHParams(test.ServerTestBase): ret = c.get_current_cipher() assert ret[0] == "DHE-RSA-AES256-SHA" + def test_create_dhparams(self): + with tutils.tmpdir() as d: + filename = os.path.join(d, "dhparam.pem") + certutils.CertStore.load_dhparam(filename) + assert os.path.exists(filename) class TestPrivkeyGen(test.ServerTestBase): @@ -527,12 +524,22 @@ class TestFileLike: assert s.first_byte_timestamp == expected def test_read_ssl_error(self): - s = cStringIO.StringIO("foobar\nfoobar") s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.Error()) s = tcp.Reader(s) tutils.raises(tcp.NetLibSSLError, s.read, 1) + def test_read_syscall_ssl_error(self): + s = mock.MagicMock() + s.read = mock.MagicMock(side_effect=SSL.SysCallError()) + s = tcp.Reader(s) + tutils.raises(tcp.NetLibSSLError, s.read, 1) + + def test_reader_readline_disconnect(self): + o = mock.MagicMock() + o.read = mock.MagicMock(side_effect=socket.error) + s = tcp.Reader(o) + tutils.raises(tcp.NetLibDisconnect, s.readline, 10) class TestAddress: def test_simple(self): @@ -542,3 +549,45 @@ class TestAddress: assert not a == b c = tcp.Address("localhost", True) assert a == c + assert not a != c + assert repr(a) + + +class TestServer(test.ServerTestBase): + handler = EchoHandler + def test_echo(self): + testval = "echo!\n" + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.wfile.write(testval) + c.wfile.flush() + assert c.rfile.readline() == testval + +class TestSSLKeyLogger(test.ServerTestBase): + handler = EchoHandler + ssl = dict( + cert = tutils.test_data.path("data/server.crt"), + key = tutils.test_data.path("data/server.key"), + request_client_cert = False, + v3_only = False, + cipher_list = "AES256-SHA" + ) + + def test_log(self): + _logfun = tcp.log_ssl_key + + with tutils.tmpdir() as d: + logfile = os.path.join(d, "foo", "bar", "logfile") + tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) + c = tcp.TCPClient(("127.0.0.1", self.port)) + c.connect() + c.convert_to_ssl() + tcp.log_ssl_key.close() + with open(logfile, "rb") as f: + assert f.read().count("CLIENT_RANDOM") == 2 + + tcp.log_ssl_key = _logfun + + def test_create_logfun(self): + assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger) + assert not tcp.SSLKeyLogger.create_logfun(False)
\ No newline at end of file |