aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_tcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_tcp.py')
-rw-r--r--test/test_tcp.py105
1 files changed, 53 insertions, 52 deletions
diff --git a/test/test_tcp.py b/test/test_tcp.py
index 725aa8b0..c87bebb3 100644
--- a/test/test_tcp.py
+++ b/test/test_tcp.py
@@ -49,7 +49,7 @@ class ALPNHandler(tcp.BaseHandler):
def handle(self):
alp = self.get_alpn_proto_negotiated()
if alp:
- self.wfile.write(("%s" % alp).encode("ascii"))
+ self.wfile.write(alp)
else:
self.wfile.write(b"NONE")
self.wfile.flush()
@@ -59,7 +59,7 @@ class TestServer(tservers.ServerTestBase):
handler = EchoHandler
def test_echo(self):
- testval = "echo!\n"
+ testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
c.wfile.write(testval)
@@ -81,7 +81,7 @@ class TestServerBind(tservers.ServerTestBase):
class handler(tcp.BaseHandler):
def handle(self):
- self.wfile.write(str(self.connection.getpeername()))
+ self.wfile.write(str(self.connection.getpeername()).encode())
self.wfile.flush()
def test_bind(self):
@@ -93,7 +93,7 @@ class TestServerBind(tservers.ServerTestBase):
("127.0.0.1", self.port), source_address=(
"127.0.0.1", random_port))
c.connect()
- assert c.rfile.readline() == str(("127.0.0.1", random_port))
+ assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode()
return
except TcpException: # port probably already in use
pass
@@ -104,7 +104,7 @@ class TestServerIPv6(tservers.ServerTestBase):
addr = tcp.Address(("localhost", 0), use_ipv6=True)
def test_echo(self):
- testval = "echo!\n"
+ testval = b"echo!\n"
c = tcp.TCPClient(tcp.Address(("::1", self.port), use_ipv6=True))
c.connect()
c.wfile.write(testval)
@@ -116,7 +116,7 @@ class TestEcho(tservers.ServerTestBase):
handler = EchoHandler
def test_echo(self):
- testval = "echo!\n"
+ testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
c.wfile.write(testval)
@@ -141,7 +141,7 @@ class TestFinishFail(tservers.ServerTestBase):
def test_disconnect_in_finish(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.wfile.write("foo\n")
+ c.wfile.write(b"foo\n")
c.wfile.flush = mock.Mock(side_effect=TcpDisconnect)
c.finish()
@@ -156,8 +156,8 @@ class TestServerSSL(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(sni="foo.com", options=SSL.OP_ALL)
- testval = "echo!\n"
+ c.convert_to_ssl(sni=b"foo.com", options=SSL.OP_ALL)
+ testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
@@ -166,7 +166,7 @@ class TestServerSSL(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
assert not c.get_current_cipher()
- c.convert_to_ssl(sni="foo.com")
+ c.convert_to_ssl(sni=b"foo.com")
ret = c.get_current_cipher()
assert ret
assert "AES" in ret[0]
@@ -182,7 +182,7 @@ class TestSSLv3Only(tservers.ServerTestBase):
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com")
+ tutils.raises(TlsException, c.convert_to_ssl, sni=b"foo.com")
class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
@@ -190,7 +190,8 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/untrusted.crt"),
- key=tutils.test_data.path("data/verificationcerts/verification-server.key"))
+ key=tutils.test_data.path("data/verificationcerts/verification-server.key")
+ )
def test_mode_default_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -202,7 +203,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
# aborted
assert c.ssl_verification_error is not None
- testval = "echo!\n"
+ testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
@@ -216,7 +217,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
# Verification errors should be saved even if connection isn't aborted
assert c.ssl_verification_error is not None
- testval = "echo!\n"
+ testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
@@ -280,7 +281,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
assert c.ssl_verification_error is None
- testval = "echo!\n"
+ testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
@@ -295,7 +296,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
assert c.ssl_verification_error is None
- testval = "echo!\n"
+ testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
@@ -310,7 +311,7 @@ class TestSSLClientCert(tservers.ServerTestBase):
self.sni = connection.get_servername()
def handle(self):
- self.wfile.write("%s\n" % self.clientcert.serial)
+ self.wfile.write(b"%d\n" % self.clientcert.serial)
self.wfile.flush()
ssl = dict(
@@ -323,7 +324,7 @@ class TestSSLClientCert(tservers.ServerTestBase):
c.connect()
c.convert_to_ssl(
cert=tutils.test_data.path("data/clientcert/client.pem"))
- assert c.rfile.readline().strip() == "1"
+ assert c.rfile.readline().strip() == b"1"
def test_clientcert_err(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
@@ -352,9 +353,9 @@ class TestSNI(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(sni="foo.com")
- assert c.sni == "foo.com"
- assert c.rfile.readline() == "foo.com"
+ c.convert_to_ssl(sni=b"foo.com")
+ assert c.sni == b"foo.com"
+ assert c.rfile.readline() == b"foo.com"
class TestServerCipherList(tservers.ServerTestBase):
@@ -366,8 +367,8 @@ class TestServerCipherList(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(sni="foo.com")
- assert c.rfile.readline() == "['RC4-SHA']"
+ c.convert_to_ssl(sni=b"foo.com")
+ assert c.rfile.readline() == b"['RC4-SHA']"
class TestServerCurrentCipher(tservers.ServerTestBase):
@@ -376,7 +377,7 @@ class TestServerCurrentCipher(tservers.ServerTestBase):
sni = None
def handle(self):
- self.wfile.write("%s" % str(self.get_current_cipher()))
+ self.wfile.write(str(self.get_current_cipher()).encode())
self.wfile.flush()
ssl = dict(
@@ -386,8 +387,8 @@ class TestServerCurrentCipher(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(sni="foo.com")
- assert "RC4-SHA" in c.rfile.readline()
+ c.convert_to_ssl(sni=b"foo.com")
+ assert b"RC4-SHA" in c.rfile.readline()
class TestServerCipherListError(tservers.ServerTestBase):
@@ -399,7 +400,7 @@ class TestServerCipherListError(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- tutils.raises("handshake error", c.convert_to_ssl, sni="foo.com")
+ tutils.raises("handshake error", c.convert_to_ssl, sni=b"foo.com")
class TestClientCipherListError(tservers.ServerTestBase):
@@ -414,7 +415,7 @@ class TestClientCipherListError(tservers.ServerTestBase):
tutils.raises(
"cipher specification",
c.convert_to_ssl,
- sni="foo.com",
+ sni=b"foo.com",
cipher_list="bogus")
@@ -434,7 +435,7 @@ class TestSSLDisconnect(tservers.ServerTestBase):
# Excercise SSL.ZeroReturnError
c.rfile.read(10)
c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, "foo")
+ tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
tutils.raises(queue.Empty, self.q.get_nowait)
@@ -449,7 +450,7 @@ class TestSSLHardDisconnect(tservers.ServerTestBase):
# Exercise SSL.SysCallError
c.rfile.read(10)
c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, "foo")
+ tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
class TestDisconnect(tservers.ServerTestBase):
@@ -458,7 +459,7 @@ class TestDisconnect(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
c.rfile.read(10)
- c.wfile.write("foo")
+ c.wfile.write(b"foo")
c.close()
c.close()
@@ -496,7 +497,7 @@ class TestTimeOut(tservers.ServerTestBase):
class TestALPNClient(tservers.ServerTestBase):
handler = ALPNHandler
ssl = dict(
- alpn_select="bar"
+ alpn_select=b"bar"
)
if OpenSSL._util.lib.Cryptography_HAS_ALPN:
@@ -529,8 +530,8 @@ class TestNoSSLNoALPNClient(tservers.ServerTestBase):
def test_no_ssl_no_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- assert c.get_alpn_proto_negotiated() == ""
- assert c.rfile.readline().strip() == "NONE"
+ assert c.get_alpn_proto_negotiated() == b""
+ assert c.rfile.readline().strip() == b"NONE"
class TestSSLTimeOut(tservers.ServerTestBase):
@@ -581,26 +582,26 @@ class TestFileLike:
s = BytesIO(b"1234567890abcdefghijklmnopqrstuvwxyz")
s = tcp.Reader(s)
s.BLOCKSIZE = 2
- assert s.read(1) == "1"
- assert s.read(2) == "23"
- assert s.read(3) == "456"
- assert s.read(4) == "7890"
+ assert s.read(1) == b"1"
+ assert s.read(2) == b"23"
+ assert s.read(3) == b"456"
+ assert s.read(4) == b"7890"
d = s.read(-1)
- assert d.startswith("abc") and d.endswith("xyz")
+ assert d.startswith(b"abc") and d.endswith(b"xyz")
def test_wrap(self):
s = BytesIO(b"foobar\nfoobar")
s.flush()
s = tcp.Reader(s)
- assert s.readline() == "foobar\n"
- assert s.readline() == "foobar"
+ assert s.readline() == b"foobar\n"
+ assert s.readline() == b"foobar"
# Test __getattr__
assert s.isatty
def test_limit(self):
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
- assert s.readline(3) == "foo"
+ assert s.readline(3) == b"foo"
def test_limitless(self):
s = BytesIO(b"f" * (50 * 1024))
@@ -615,13 +616,13 @@ class TestFileLike:
s.start_log()
assert s.is_logging()
s.readline()
- assert s.get_log() == "foobar\n"
+ assert s.get_log() == b"foobar\n"
s.read(1)
- assert s.get_log() == "foobar\nf"
+ assert s.get_log() == b"foobar\nf"
s.start_log()
- assert s.get_log() == ""
+ assert s.get_log() == b""
s.read(1)
- assert s.get_log() == "o"
+ assert s.get_log() == b"o"
s.stop_log()
tutils.raises(ValueError, s.get_log)
@@ -630,10 +631,10 @@ class TestFileLike:
s = tcp.Writer(s)
s.start_log()
assert s.is_logging()
- s.write("x")
- assert s.get_log() == "x"
- s.write("x")
- assert s.get_log() == "xx"
+ s.write(b"x")
+ assert s.get_log() == b"x"
+ s.write(b"x")
+ assert s.get_log() == b"xx"
def test_writer_flush_error(self):
s = BytesIO()
@@ -721,7 +722,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
)
def test_log(self):
- testval = "echo!\n"
+ testval = b"echo!\n"
_logfun = tcp.log_ssl_key
with tutils.tmpdir() as d:
@@ -738,7 +739,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
tcp.log_ssl_key.close()
with open(logfile, "rb") as f:
- assert f.read().count("CLIENT_RANDOM") == 2
+ assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun