diff options
author | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 20:35:45 +0200 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 20:35:45 +0200 |
commit | daebd1bd275a398d42cc4dbfe5c6399c7fe3b3a0 (patch) | |
tree | 5dcc3f742539b6cfd6f3dc18b53c310c37bbf291 /test/test_tcp.py | |
parent | 292a0aa9e671748f0ad77a5e8f8f21d40314b030 (diff) | |
download | mitmproxy-daebd1bd275a398d42cc4dbfe5c6399c7fe3b3a0.tar.gz mitmproxy-daebd1bd275a398d42cc4dbfe5c6399c7fe3b3a0.tar.bz2 mitmproxy-daebd1bd275a398d42cc4dbfe5c6399c7fe3b3a0.zip |
python3++
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r-- | test/test_tcp.py | 105 |
1 files changed, 53 insertions, 52 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py index 725aa8b0..c87bebb3 100644 --- a/test/test_tcp.py +++ b/test/test_tcp.py @@ -49,7 +49,7 @@ class ALPNHandler(tcp.BaseHandler): def handle(self): alp = self.get_alpn_proto_negotiated() if alp: - self.wfile.write(("%s" % alp).encode("ascii")) + self.wfile.write(alp) else: self.wfile.write(b"NONE") self.wfile.flush() @@ -59,7 +59,7 @@ class TestServer(tservers.ServerTestBase): handler = EchoHandler def test_echo(self): - testval = "echo!\n" + testval = b"echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() c.wfile.write(testval) @@ -81,7 +81,7 @@ class TestServerBind(tservers.ServerTestBase): class handler(tcp.BaseHandler): def handle(self): - self.wfile.write(str(self.connection.getpeername())) + self.wfile.write(str(self.connection.getpeername()).encode()) self.wfile.flush() def test_bind(self): @@ -93,7 +93,7 @@ class TestServerBind(tservers.ServerTestBase): ("127.0.0.1", self.port), source_address=( "127.0.0.1", random_port)) c.connect() - assert c.rfile.readline() == str(("127.0.0.1", random_port)) + assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode() return except TcpException: # port probably already in use pass @@ -104,7 +104,7 @@ class TestServerIPv6(tservers.ServerTestBase): addr = tcp.Address(("localhost", 0), use_ipv6=True) def test_echo(self): - testval = "echo!\n" + testval = b"echo!\n" c = tcp.TCPClient(tcp.Address(("::1", self.port), use_ipv6=True)) c.connect() c.wfile.write(testval) @@ -116,7 +116,7 @@ class TestEcho(tservers.ServerTestBase): handler = EchoHandler def test_echo(self): - testval = "echo!\n" + testval = b"echo!\n" c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() c.wfile.write(testval) @@ -141,7 +141,7 @@ class TestFinishFail(tservers.ServerTestBase): def test_disconnect_in_finish(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.wfile.write("foo\n") + c.wfile.write(b"foo\n") c.wfile.flush = mock.Mock(side_effect=TcpDisconnect) c.finish() @@ -156,8 +156,8 @@ class TestServerSSL(tservers.ServerTestBase): def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(sni="foo.com", options=SSL.OP_ALL) - testval = "echo!\n" + c.convert_to_ssl(sni=b"foo.com", options=SSL.OP_ALL) + testval = b"echo!\n" c.wfile.write(testval) c.wfile.flush() assert c.rfile.readline() == testval @@ -166,7 +166,7 @@ class TestServerSSL(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() assert not c.get_current_cipher() - c.convert_to_ssl(sni="foo.com") + c.convert_to_ssl(sni=b"foo.com") ret = c.get_current_cipher() assert ret assert "AES" in ret[0] @@ -182,7 +182,7 @@ class TestSSLv3Only(tservers.ServerTestBase): def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com") + tutils.raises(TlsException, c.convert_to_ssl, sni=b"foo.com") class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): @@ -190,7 +190,8 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): ssl = dict( cert=tutils.test_data.path("data/verificationcerts/untrusted.crt"), - key=tutils.test_data.path("data/verificationcerts/verification-server.key")) + key=tutils.test_data.path("data/verificationcerts/verification-server.key") + ) def test_mode_default_should_pass(self): c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -202,7 +203,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): # aborted assert c.ssl_verification_error is not None - testval = "echo!\n" + testval = b"echo!\n" c.wfile.write(testval) c.wfile.flush() assert c.rfile.readline() == testval @@ -216,7 +217,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): # Verification errors should be saved even if connection isn't aborted assert c.ssl_verification_error is not None - testval = "echo!\n" + testval = b"echo!\n" c.wfile.write(testval) c.wfile.flush() assert c.rfile.readline() == testval @@ -280,7 +281,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): assert c.ssl_verification_error is None - testval = "echo!\n" + testval = b"echo!\n" c.wfile.write(testval) c.wfile.flush() assert c.rfile.readline() == testval @@ -295,7 +296,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): assert c.ssl_verification_error is None - testval = "echo!\n" + testval = b"echo!\n" c.wfile.write(testval) c.wfile.flush() assert c.rfile.readline() == testval @@ -310,7 +311,7 @@ class TestSSLClientCert(tservers.ServerTestBase): self.sni = connection.get_servername() def handle(self): - self.wfile.write("%s\n" % self.clientcert.serial) + self.wfile.write(b"%d\n" % self.clientcert.serial) self.wfile.flush() ssl = dict( @@ -323,7 +324,7 @@ class TestSSLClientCert(tservers.ServerTestBase): c.connect() c.convert_to_ssl( cert=tutils.test_data.path("data/clientcert/client.pem")) - assert c.rfile.readline().strip() == "1" + assert c.rfile.readline().strip() == b"1" def test_clientcert_err(self): c = tcp.TCPClient(("127.0.0.1", self.port)) @@ -352,9 +353,9 @@ class TestSNI(tservers.ServerTestBase): def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(sni="foo.com") - assert c.sni == "foo.com" - assert c.rfile.readline() == "foo.com" + c.convert_to_ssl(sni=b"foo.com") + assert c.sni == b"foo.com" + assert c.rfile.readline() == b"foo.com" class TestServerCipherList(tservers.ServerTestBase): @@ -366,8 +367,8 @@ class TestServerCipherList(tservers.ServerTestBase): def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(sni="foo.com") - assert c.rfile.readline() == "['RC4-SHA']" + c.convert_to_ssl(sni=b"foo.com") + assert c.rfile.readline() == b"['RC4-SHA']" class TestServerCurrentCipher(tservers.ServerTestBase): @@ -376,7 +377,7 @@ class TestServerCurrentCipher(tservers.ServerTestBase): sni = None def handle(self): - self.wfile.write("%s" % str(self.get_current_cipher())) + self.wfile.write(str(self.get_current_cipher()).encode()) self.wfile.flush() ssl = dict( @@ -386,8 +387,8 @@ class TestServerCurrentCipher(tservers.ServerTestBase): def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(sni="foo.com") - assert "RC4-SHA" in c.rfile.readline() + c.convert_to_ssl(sni=b"foo.com") + assert b"RC4-SHA" in c.rfile.readline() class TestServerCipherListError(tservers.ServerTestBase): @@ -399,7 +400,7 @@ class TestServerCipherListError(tservers.ServerTestBase): def test_echo(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - tutils.raises("handshake error", c.convert_to_ssl, sni="foo.com") + tutils.raises("handshake error", c.convert_to_ssl, sni=b"foo.com") class TestClientCipherListError(tservers.ServerTestBase): @@ -414,7 +415,7 @@ class TestClientCipherListError(tservers.ServerTestBase): tutils.raises( "cipher specification", c.convert_to_ssl, - sni="foo.com", + sni=b"foo.com", cipher_list="bogus") @@ -434,7 +435,7 @@ class TestSSLDisconnect(tservers.ServerTestBase): # Excercise SSL.ZeroReturnError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, "foo") + tutils.raises(TcpDisconnect, c.wfile.write, b"foo") tutils.raises(queue.Empty, self.q.get_nowait) @@ -449,7 +450,7 @@ class TestSSLHardDisconnect(tservers.ServerTestBase): # Exercise SSL.SysCallError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, "foo") + tutils.raises(TcpDisconnect, c.wfile.write, b"foo") class TestDisconnect(tservers.ServerTestBase): @@ -458,7 +459,7 @@ class TestDisconnect(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() c.rfile.read(10) - c.wfile.write("foo") + c.wfile.write(b"foo") c.close() c.close() @@ -496,7 +497,7 @@ class TestTimeOut(tservers.ServerTestBase): class TestALPNClient(tservers.ServerTestBase): handler = ALPNHandler ssl = dict( - alpn_select="bar" + alpn_select=b"bar" ) if OpenSSL._util.lib.Cryptography_HAS_ALPN: @@ -529,8 +530,8 @@ class TestNoSSLNoALPNClient(tservers.ServerTestBase): def test_no_ssl_no_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - assert c.get_alpn_proto_negotiated() == "" - assert c.rfile.readline().strip() == "NONE" + assert c.get_alpn_proto_negotiated() == b"" + assert c.rfile.readline().strip() == b"NONE" class TestSSLTimeOut(tservers.ServerTestBase): @@ -581,26 +582,26 @@ class TestFileLike: s = BytesIO(b"1234567890abcdefghijklmnopqrstuvwxyz") s = tcp.Reader(s) s.BLOCKSIZE = 2 - assert s.read(1) == "1" - assert s.read(2) == "23" - assert s.read(3) == "456" - assert s.read(4) == "7890" + assert s.read(1) == b"1" + assert s.read(2) == b"23" + assert s.read(3) == b"456" + assert s.read(4) == b"7890" d = s.read(-1) - assert d.startswith("abc") and d.endswith("xyz") + assert d.startswith(b"abc") and d.endswith(b"xyz") def test_wrap(self): s = BytesIO(b"foobar\nfoobar") s.flush() s = tcp.Reader(s) - assert s.readline() == "foobar\n" - assert s.readline() == "foobar" + assert s.readline() == b"foobar\n" + assert s.readline() == b"foobar" # Test __getattr__ assert s.isatty def test_limit(self): s = BytesIO(b"foobar\nfoobar") s = tcp.Reader(s) - assert s.readline(3) == "foo" + assert s.readline(3) == b"foo" def test_limitless(self): s = BytesIO(b"f" * (50 * 1024)) @@ -615,13 +616,13 @@ class TestFileLike: s.start_log() assert s.is_logging() s.readline() - assert s.get_log() == "foobar\n" + assert s.get_log() == b"foobar\n" s.read(1) - assert s.get_log() == "foobar\nf" + assert s.get_log() == b"foobar\nf" s.start_log() - assert s.get_log() == "" + assert s.get_log() == b"" s.read(1) - assert s.get_log() == "o" + assert s.get_log() == b"o" s.stop_log() tutils.raises(ValueError, s.get_log) @@ -630,10 +631,10 @@ class TestFileLike: s = tcp.Writer(s) s.start_log() assert s.is_logging() - s.write("x") - assert s.get_log() == "x" - s.write("x") - assert s.get_log() == "xx" + s.write(b"x") + assert s.get_log() == b"x" + s.write(b"x") + assert s.get_log() == b"xx" def test_writer_flush_error(self): s = BytesIO() @@ -721,7 +722,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase): ) def test_log(self): - testval = "echo!\n" + testval = b"echo!\n" _logfun = tcp.log_ssl_key with tutils.tmpdir() as d: @@ -738,7 +739,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase): tcp.log_ssl_key.close() with open(logfile, "rb") as f: - assert f.read().count("CLIENT_RANDOM") == 2 + assert f.read().count(b"CLIENT_RANDOM") == 2 tcp.log_ssl_key = _logfun |