diff options
Diffstat (limited to 'test/netlib/test_tcp.py')
-rw-r--r-- | test/netlib/test_tcp.py | 802 |
1 files changed, 0 insertions, 802 deletions
diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py deleted file mode 100644 index 594ee21c..00000000 --- a/test/netlib/test_tcp.py +++ /dev/null @@ -1,802 +0,0 @@ -from io import BytesIO -import queue -import time -import socket -import random -import os -import threading -import mock - -from OpenSSL import SSL - -from mitmproxy import certs -from netlib import tcp -from mitmproxy.test import tutils -from mitmproxy import exceptions - -from . import tservers - - -class EchoHandler(tcp.BaseHandler): - sni = None - - def handle_sni(self, connection): - self.sni = connection.get_servername() - - def handle(self): - v = self.rfile.readline() - self.wfile.write(v) - self.wfile.flush() - - -class ClientCipherListHandler(tcp.BaseHandler): - sni = None - - def handle(self): - self.wfile.write("%s" % self.connection.get_cipher_list()) - self.wfile.flush() - - -class HangHandler(tcp.BaseHandler): - - def handle(self): - # Hang as long as the client connection is alive - while True: - try: - self.connection.setblocking(0) - ret = self.connection.recv(1) - # Client connection is dead... - if ret == "" or ret == b"": - return - except socket.error: - pass - except SSL.WantReadError: - pass - except Exception: - return - time.sleep(0.1) - - -class ALPNHandler(tcp.BaseHandler): - sni = None - - def handle(self): - alp = self.get_alpn_proto_negotiated() - if alp: - self.wfile.write(alp) - else: - self.wfile.write(b"NONE") - self.wfile.flush() - - -class TestServer(tservers.ServerTestBase): - handler = EchoHandler - - def test_echo(self): - testval = b"echo!\n" - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - def test_thread_start_error(self): - with mock.patch.object(threading.Thread, "start", side_effect=threading.ThreadError("nonewthread")) as m: - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - assert not c.rfile.read(1) - assert m.called - assert "nonewthread" in self.q.get_nowait() - self.test_echo() - - -class TestServerBind(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - - def handle(self): - self.wfile.write(str(self.connection.getpeername()).encode()) - self.wfile.flush() - - def test_bind(self): - """ Test to bind to a given random port. Try again if the random port turned out to be blocked. """ - for i in range(20): - random_port = random.randrange(1024, 65535) - try: - c = tcp.TCPClient( - ("127.0.0.1", self.port), source_address=( - "127.0.0.1", random_port)) - with c.connect(): - assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode() - return - except exceptions.TcpException: # port probably already in use - pass - - -class TestServerIPv6(tservers.ServerTestBase): - handler = EchoHandler - addr = tcp.Address(("localhost", 0), use_ipv6=True) - - def test_echo(self): - testval = b"echo!\n" - c = tcp.TCPClient(tcp.Address(("::1", self.port), use_ipv6=True)) - with c.connect(): - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - -class TestEcho(tservers.ServerTestBase): - handler = EchoHandler - - def test_echo(self): - testval = b"echo!\n" - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - -class HardDisconnectHandler(tcp.BaseHandler): - - def handle(self): - self.connection.close() - - -class TestFinishFail(tservers.ServerTestBase): - - """ - This tests a difficult-to-trigger exception in the .finish() method of - the handler. - """ - handler = EchoHandler - - def test_disconnect_in_finish(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.wfile.write(b"foo\n") - c.wfile.flush = mock.Mock(side_effect=exceptions.TcpDisconnect) - c.finish() - - -class TestServerSSL(tservers.ServerTestBase): - handler = EchoHandler - ssl = dict( - cipher_list="AES256-SHA", - chain_file=tutils.test_data.path("data/server.crt") - ) - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(sni="foo.com", options=SSL.OP_ALL) - testval = b"echo!\n" - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - def test_get_current_cipher(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - assert not c.get_current_cipher() - c.convert_to_ssl(sni="foo.com") - ret = c.get_current_cipher() - assert ret - assert "AES" in ret[0] - - -class TestSSLv3Only(tservers.ServerTestBase): - handler = EchoHandler - ssl = dict( - request_client_cert=False, - v3_only=True - ) - - def test_failure(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - tutils.raises(exceptions.TlsException, c.convert_to_ssl, sni="foo.com") - - -class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): - handler = EchoHandler - - ssl = dict( - cert=tutils.test_data.path("data/verificationcerts/self-signed.crt"), - key=tutils.test_data.path("data/verificationcerts/self-signed.key") - ) - - def test_mode_default_should_pass(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - - # Verification errors should be saved even if connection isn't aborted - # aborted - assert c.ssl_verification_error - - testval = b"echo!\n" - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - def test_mode_none_should_pass(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(verify_options=SSL.VERIFY_NONE) - - # Verification errors should be saved even if connection isn't aborted - assert c.ssl_verification_error - - testval = b"echo!\n" - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - def test_mode_strict_should_fail(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - with tutils.raises(exceptions.InvalidCertificateException): - c.convert_to_ssl( - sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, - ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") - ) - - assert c.ssl_verification_error - - # Unknown issuing certificate authority for first certificate - assert "errno: 18" in str(c.ssl_verification_error) - assert "depth: 0" in str(c.ssl_verification_error) - - -class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): - handler = EchoHandler - - ssl = dict( - cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"), - key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key") - ) - - def test_should_fail_without_sni(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - with tutils.raises(exceptions.TlsException): - c.convert_to_ssl( - verify_options=SSL.VERIFY_PEER, - ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") - ) - - def test_should_fail(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - with tutils.raises(exceptions.InvalidCertificateException): - c.convert_to_ssl( - sni="mitmproxy.org", - verify_options=SSL.VERIFY_PEER, - ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") - ) - assert c.ssl_verification_error - - -class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): - handler = EchoHandler - - ssl = dict( - cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"), - key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key") - ) - - def test_mode_strict_w_pemfile_should_pass(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl( - sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, - ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") - ) - - assert c.ssl_verification_error is None - - testval = b"echo!\n" - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - def test_mode_strict_w_cadir_should_pass(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl( - sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, - ca_path=tutils.test_data.path("data/verificationcerts/") - ) - - assert c.ssl_verification_error is None - - testval = b"echo!\n" - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - - -class TestSSLClientCert(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - sni = None - - def handle_sni(self, connection): - self.sni = connection.get_servername() - - def handle(self): - self.wfile.write(b"%d\n" % self.clientcert.serial) - self.wfile.flush() - - ssl = dict( - request_client_cert=True, - v3_only=False - ) - - def test_clientcert(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl( - cert=tutils.test_data.path("data/clientcert/client.pem")) - assert c.rfile.readline().strip() == b"1" - - def test_clientcert_err(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - tutils.raises( - exceptions.TlsException, - c.convert_to_ssl, - cert=tutils.test_data.path("data/clientcert/make") - ) - - -class TestSNI(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - sni = None - - def handle_sni(self, connection): - self.sni = connection.get_servername() - - def handle(self): - self.wfile.write(self.sni) - self.wfile.flush() - - ssl = True - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(sni="foo.com") - assert c.sni == "foo.com" - assert c.rfile.readline() == b"foo.com" - - -class TestServerCipherList(tservers.ServerTestBase): - handler = ClientCipherListHandler - ssl = dict( - cipher_list='RC4-SHA' - ) - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(sni="foo.com") - assert c.rfile.readline() == b"['RC4-SHA']" - - -class TestServerCurrentCipher(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - sni = None - - def handle(self): - self.wfile.write(str(self.get_current_cipher()).encode()) - self.wfile.flush() - - ssl = dict( - cipher_list='RC4-SHA' - ) - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(sni="foo.com") - assert b"RC4-SHA" in c.rfile.readline() - - -class TestServerCipherListError(tservers.ServerTestBase): - handler = ClientCipherListHandler - ssl = dict( - cipher_list='bogus' - ) - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - tutils.raises("handshake error", c.convert_to_ssl, sni="foo.com") - - -class TestClientCipherListError(tservers.ServerTestBase): - handler = ClientCipherListHandler - ssl = dict( - cipher_list='RC4-SHA' - ) - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - tutils.raises( - "cipher specification", - c.convert_to_ssl, - sni="foo.com", - cipher_list="bogus" - ) - - -class TestSSLDisconnect(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - - def handle(self): - self.finish() - - ssl = True - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - # Excercise SSL.ZeroReturnError - c.rfile.read(10) - c.close() - tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") - tutils.raises(queue.Empty, self.q.get_nowait) - - -class TestSSLHardDisconnect(tservers.ServerTestBase): - handler = HardDisconnectHandler - ssl = True - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - # Exercise SSL.SysCallError - c.rfile.read(10) - c.close() - tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") - - -class TestDisconnect(tservers.ServerTestBase): - - def test_echo(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.rfile.read(10) - c.wfile.write(b"foo") - c.close() - c.close() - - -class TestServerTimeOut(tservers.ServerTestBase): - - class handler(tcp.BaseHandler): - - def handle(self): - self.timeout = False - self.settimeout(0.01) - try: - self.rfile.read(10) - except exceptions.TcpTimeout: - self.timeout = True - - def test_timeout(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - time.sleep(0.3) - assert self.last_handler.timeout - - -class TestTimeOut(tservers.ServerTestBase): - handler = HangHandler - - def test_timeout(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.settimeout(0.1) - assert c.gettimeout() == 0.1 - tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) - - -class TestALPNClient(tservers.ServerTestBase): - handler = ALPNHandler - ssl = dict( - alpn_select=b"bar" - ) - - if tcp.HAS_ALPN: - def test_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"]) - assert c.get_alpn_proto_negotiated() == b"bar" - assert c.rfile.readline().strip() == b"bar" - - def test_no_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - assert c.get_alpn_proto_negotiated() == b"" - assert c.rfile.readline().strip() == b"NONE" - - else: - def test_none_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"]) - assert c.get_alpn_proto_negotiated() == b"" - assert c.rfile.readline() == b"NONE" - - -class TestNoSSLNoALPNClient(tservers.ServerTestBase): - handler = ALPNHandler - - def test_no_ssl_no_alpn(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - assert c.get_alpn_proto_negotiated() == b"" - assert c.rfile.readline().strip() == b"NONE" - - -class TestSSLTimeOut(tservers.ServerTestBase): - handler = HangHandler - ssl = True - - def test_timeout_client(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - c.settimeout(0.1) - tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) - - -class TestDHParams(tservers.ServerTestBase): - handler = HangHandler - ssl = dict( - dhparams=certs.CertStore.load_dhparam( - tutils.test_data.path("data/dhparam.pem"), - ), - cipher_list="DHE-RSA-AES256-SHA" - ) - - def test_dhparams(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - ret = c.get_current_cipher() - assert ret[0] == "DHE-RSA-AES256-SHA" - - def test_create_dhparams(self): - with tutils.tmpdir() as d: - filename = os.path.join(d, "dhparam.pem") - certs.CertStore.load_dhparam(filename) - assert os.path.exists(filename) - - -class TestTCPClient: - - def test_conerr(self): - c = tcp.TCPClient(("127.0.0.1", 0)) - tutils.raises(exceptions.TcpException, c.connect) - - -class TestFileLike: - - def test_blocksize(self): - s = BytesIO(b"1234567890abcdefghijklmnopqrstuvwxyz") - s = tcp.Reader(s) - s.BLOCKSIZE = 2 - assert s.read(1) == b"1" - assert s.read(2) == b"23" - assert s.read(3) == b"456" - assert s.read(4) == b"7890" - d = s.read(-1) - assert d.startswith(b"abc") and d.endswith(b"xyz") - - def test_wrap(self): - s = BytesIO(b"foobar\nfoobar") - s.flush() - s = tcp.Reader(s) - assert s.readline() == b"foobar\n" - assert s.readline() == b"foobar" - # Test __getattr__ - assert s.isatty - - def test_limit(self): - s = BytesIO(b"foobar\nfoobar") - s = tcp.Reader(s) - assert s.readline(3) == b"foo" - - def test_limitless(self): - s = BytesIO(b"f" * (50 * 1024)) - s = tcp.Reader(s) - ret = s.read(-1) - assert len(ret) == 50 * 1024 - - def test_readlog(self): - s = BytesIO(b"foobar\nfoobar") - s = tcp.Reader(s) - assert not s.is_logging() - s.start_log() - assert s.is_logging() - s.readline() - assert s.get_log() == b"foobar\n" - s.read(1) - assert s.get_log() == b"foobar\nf" - s.start_log() - assert s.get_log() == b"" - s.read(1) - assert s.get_log() == b"o" - s.stop_log() - tutils.raises(ValueError, s.get_log) - - def test_writelog(self): - s = BytesIO() - s = tcp.Writer(s) - s.start_log() - assert s.is_logging() - s.write(b"x") - assert s.get_log() == b"x" - s.write(b"x") - assert s.get_log() == b"xx" - - def test_writer_flush_error(self): - s = BytesIO() - s = tcp.Writer(s) - o = mock.MagicMock() - o.flush = mock.MagicMock(side_effect=socket.error) - s.o = o - tutils.raises(exceptions.TcpDisconnect, s.flush) - - def test_reader_read_error(self): - s = BytesIO(b"foobar\nfoobar") - s = tcp.Reader(s) - o = mock.MagicMock() - o.read = mock.MagicMock(side_effect=socket.error) - s.o = o - tutils.raises(exceptions.TcpDisconnect, s.read, 10) - - def test_reset_timestamps(self): - s = BytesIO(b"foobar\nfoobar") - s = tcp.Reader(s) - s.first_byte_timestamp = 500 - s.reset_timestamps() - assert not s.first_byte_timestamp - - def test_first_byte_timestamp_updated_on_read(self): - s = BytesIO(b"foobar\nfoobar") - s = tcp.Reader(s) - s.read(1) - assert s.first_byte_timestamp - expected = s.first_byte_timestamp - s.read(5) - assert s.first_byte_timestamp == expected - - def test_first_byte_timestamp_updated_on_readline(self): - s = BytesIO(b"foobar\nfoobar\nfoobar") - s = tcp.Reader(s) - s.readline() - assert s.first_byte_timestamp - expected = s.first_byte_timestamp - s.readline() - assert s.first_byte_timestamp == expected - - def test_read_ssl_error(self): - s = mock.MagicMock() - s.read = mock.MagicMock(side_effect=SSL.Error()) - s = tcp.Reader(s) - tutils.raises(exceptions.TlsException, s.read, 1) - - def test_read_syscall_ssl_error(self): - s = mock.MagicMock() - s.read = mock.MagicMock(side_effect=SSL.SysCallError()) - s = tcp.Reader(s) - tutils.raises(exceptions.TlsException, s.read, 1) - - def test_reader_readline_disconnect(self): - o = mock.MagicMock() - o.read = mock.MagicMock(side_effect=socket.error) - s = tcp.Reader(o) - tutils.raises(exceptions.TcpDisconnect, s.readline, 10) - - def test_reader_incomplete_error(self): - s = BytesIO(b"foobar") - s = tcp.Reader(s) - tutils.raises(exceptions.TcpReadIncomplete, s.safe_read, 10) - - -class TestPeek(tservers.ServerTestBase): - handler = EchoHandler - - def _connect(self, c): - return c.connect() - - def test_peek(self): - testval = b"peek!\n" - c = tcp.TCPClient(("127.0.0.1", self.port)) - with self._connect(c): - c.wfile.write(testval) - c.wfile.flush() - - assert c.rfile.peek(4) == b"peek" - assert c.rfile.peek(6) == b"peek!\n" - assert c.rfile.readline() == testval - - c.close() - with tutils.raises(exceptions.NetlibException): - if c.rfile.peek(1) == b"": - # Workaround for Python 2 on Unix: - # Peeking a closed connection does not raise an exception here. - raise exceptions.NetlibException() - - -class TestPeekSSL(TestPeek): - ssl = True - - def _connect(self, c): - with c.connect() as conn: - c.convert_to_ssl() - return conn.pop() - - -class TestAddress: - def test_simple(self): - a = tcp.Address(("localhost", 80), True) - assert a.use_ipv6 - b = tcp.Address(("foo.com", 80), True) - assert not a == b - c = tcp.Address(("localhost", 80), True) - assert a == c - assert not a != c - assert repr(a) == "localhost:80" - - -class TestSSLKeyLogger(tservers.ServerTestBase): - handler = EchoHandler - ssl = dict( - cipher_list="AES256-SHA" - ) - - def test_log(self): - testval = b"echo!\n" - _logfun = tcp.log_ssl_key - - with tutils.tmpdir() as d: - logfile = os.path.join(d, "foo", "bar", "logfile") - tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) - - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - c.finish() - - tcp.log_ssl_key.close() - with open(logfile, "rb") as f: - assert f.read().count(b"CLIENT_RANDOM") == 2 - - tcp.log_ssl_key = _logfun - - def test_create_logfun(self): - assert isinstance( - tcp.SSLKeyLogger.create_logfun("test"), - tcp.SSLKeyLogger) - assert not tcp.SSLKeyLogger.create_logfun(False) |