diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2015-05-28 12:12:37 +1200 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2015-05-28 12:12:37 +1200 |
commit | 41af65a1c478825d4df6239b33fbcb971dcf1df8 (patch) | |
tree | 9830657189938900adad330beba0478f0d0761c2 /test/test_tcp.py | |
parent | 5265b289575d3935e8af29b5c27c963832efc8ad (diff) | |
parent | 80378306960379f12aca72309dc47437cd1a825c (diff) | |
download | mitmproxy-41af65a1c478825d4df6239b33fbcb971dcf1df8.tar.gz mitmproxy-41af65a1c478825d4df6239b33fbcb971dcf1df8.tar.bz2 mitmproxy-41af65a1c478825d4df6239b33fbcb971dcf1df8.zip |
Merge branch 'Kriechi-cleanup'
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r-- | test/test_tcp.py | 190 |
1 files changed, 118 insertions, 72 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py index 4dbdd780..ef00e029 100644 --- a/test/test_tcp.py +++ b/test/test_tcp.py @@ -1,4 +1,8 @@ -import cStringIO, Queue, time, socket, random +import cStringIO +import Queue +import time +import socket +import random import os from netlib import tcp, certutils, test, certffi import threading @@ -6,8 +10,10 @@ import mock import tutils from OpenSSL import SSL + class EchoHandler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() @@ -19,19 +25,22 @@ class EchoHandler(tcp.BaseHandler): class ClientCipherListHandler(tcp.BaseHandler): sni = None + def handle(self): - self.wfile.write("%s"%self.connection.get_cipher_list()) + self.wfile.write("%s" % self.connection.get_cipher_list()) self.wfile.flush() class HangHandler(tcp.BaseHandler): + def handle(self): - while 1: + while True: time.sleep(1) class TestServer(test.ServerTestBase): handler = EchoHandler + def test_echo(self): testval = "echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -51,7 +60,9 @@ class TestServer(test.ServerTestBase): class TestServerBind(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.wfile.write(str(self.connection.getpeername())) self.wfile.flush() @@ -65,7 +76,7 @@ class TestServerBind(test.ServerTestBase): c.connect() assert c.rfile.readline() == str(("127.0.0.1", random_port)) return - except tcp.NetLibError: # port probably already in use + except tcp.NetLibError: # port probably already in use pass @@ -84,6 +95,7 @@ class TestServerIPv6(test.ServerTestBase): class TestEcho(test.ServerTestBase): handler = EchoHandler + def test_echo(self): testval = "echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -94,16 +106,19 @@ class TestEcho(test.ServerTestBase): class HardDisconnectHandler(tcp.BaseHandler): + def handle(self): self.connection.close() class TestFinishFail(test.ServerTestBase): + """ This tests a difficult-to-trigger exception in the .finish() method of the handler. """ handler = EchoHandler + def test_disconnect_in_finish(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -115,13 +130,14 @@ class TestFinishFail(test.ServerTestBase): class TestServerSSL(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = "AES256-SHA", - chain_file=tutils.test_data.path("data/server.crt") - ) + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list="AES256-SHA", + chain_file=tutils.test_data.path("data/server.crt") + ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -144,11 +160,12 @@ class TestServerSSL(test.ServerTestBase): class TestSSLv3Only(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = True + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=True ) + def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -156,20 +173,23 @@ class TestSSLv3Only(test.ServerTestBase): class TestSSLClientCert(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() def handle(self): - self.wfile.write("%s\n"%self.clientcert.serial) + self.wfile.write("%s\n" % self.clientcert.serial) self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = True, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=True, + v3_only=False ) + def test_clientcert(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -187,8 +207,10 @@ class TestSSLClientCert(test.ServerTestBase): class TestSNI(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle_sni(self, connection): self.sni = connection.get_servername() @@ -197,11 +219,12 @@ class TestSNI(test.ServerTestBase): self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -213,12 +236,13 @@ class TestSNI(test.ServerTestBase): class TestServerCipherList(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -227,18 +251,21 @@ class TestServerCipherList(test.ServerTestBase): class TestServerCurrentCipher(test.ServerTestBase): + class handler(tcp.BaseHandler): sni = None + def handle(self): - self.wfile.write("%s"%str(self.get_current_cipher())) + self.wfile.write("%s" % str(self.get_current_cipher())) self.wfile.flush() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -249,12 +276,13 @@ class TestServerCurrentCipher(test.ServerTestBase): class TestServerCipherListError(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'bogus' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='bogus' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -264,12 +292,13 @@ class TestServerCipherListError(test.ServerTestBase): class TestClientCipherListError(test.ServerTestBase): handler = ClientCipherListHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = 'RC4-SHA' + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list='RC4-SHA' ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -277,15 +306,18 @@ class TestClientCipherListError(test.ServerTestBase): class TestSSLDisconnect(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.finish() ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -300,11 +332,12 @@ class TestSSLDisconnect(test.ServerTestBase): class TestSSLHardDisconnect(test.ServerTestBase): handler = HardDisconnectHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -316,6 +349,7 @@ class TestSSLHardDisconnect(test.ServerTestBase): class TestDisconnect(test.ServerTestBase): + def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -326,7 +360,9 @@ class TestDisconnect(test.ServerTestBase): class TestServerTimeOut(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): self.timeout = False self.settimeout(0.01) @@ -344,6 +380,7 @@ class TestServerTimeOut(test.ServerTestBase): class TestTimeOut(test.ServerTestBase): handler = HangHandler + def test_timeout(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -355,11 +392,12 @@ class TestTimeOut(test.ServerTestBase): class TestSSLTimeOut(test.ServerTestBase): handler = HangHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False ) + def test_timeout_client(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -371,15 +409,16 @@ class TestSSLTimeOut(test.ServerTestBase): class TestDHParams(test.ServerTestBase): handler = HangHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - dhparams = certutils.CertStore.load_dhparam( + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + dhparams=certutils.CertStore.load_dhparam( tutils.test_data.path("data/dhparam.pem"), ), - cipher_list = "DHE-RSA-AES256-SHA" + cipher_list="DHE-RSA-AES256-SHA" ) + def test_dhparams(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() @@ -395,7 +434,9 @@ class TestDHParams(test.ServerTestBase): class TestPrivkeyGen(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): with tutils.tmpdir() as d: ca1 = certutils.CertStore.from_store(d, "test2") @@ -411,7 +452,9 @@ class TestPrivkeyGen(test.ServerTestBase): class TestPrivkeyGenNoFlags(test.ServerTestBase): + class handler(tcp.BaseHandler): + def handle(self): with tutils.tmpdir() as d: ca1 = certutils.CertStore.from_store(d, "test2") @@ -426,14 +469,15 @@ class TestPrivkeyGenNoFlags(test.ServerTestBase): tutils.raises("sslv3 alert handshake failure", c.convert_to_ssl) - class TestTCPClient: + def test_conerr(self): c = tcp.TCPClient(("127.0.0.1", 0)) tutils.raises(tcp.NetLibError, c.connect) class TestFileLike: + def test_blocksize(self): s = cStringIO.StringIO("1234567890abcdefghijklmnopqrstuvwxyz") s = tcp.Reader(s) @@ -460,7 +504,7 @@ class TestFileLike: assert s.readline(3) == "foo" def test_limitless(self): - s = cStringIO.StringIO("f"*(50*1024)) + s = cStringIO.StringIO("f" * (50 * 1024)) s = tcp.Reader(s) ret = s.read(-1) assert len(ret) == 50 * 1024 @@ -551,7 +595,9 @@ class TestFileLike: s = tcp.Reader(o) tutils.raises(tcp.NetLibDisconnect, s.readline, 10) + class TestAddress: + def test_simple(self): a = tcp.Address("localhost", True) assert a.use_ipv6 @@ -566,12 +612,12 @@ class TestAddress: class TestSSLKeyLogger(test.ServerTestBase): handler = EchoHandler ssl = dict( - cert = tutils.test_data.path("data/server.crt"), - key = tutils.test_data.path("data/server.key"), - request_client_cert = False, - v3_only = False, - cipher_list = "AES256-SHA" - ) + cert=tutils.test_data.path("data/server.crt"), + key=tutils.test_data.path("data/server.key"), + request_client_cert=False, + v3_only=False, + cipher_list="AES256-SHA" + ) def test_log(self): testval = "echo!\n" @@ -597,4 +643,4 @@ class TestSSLKeyLogger(test.ServerTestBase): def test_create_logfun(self): assert isinstance(tcp.SSLKeyLogger.create_logfun("test"), tcp.SSLKeyLogger) - assert not tcp.SSLKeyLogger.create_logfun(False)
\ No newline at end of file + assert not tcp.SSLKeyLogger.create_logfun(False) |