diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2016-10-20 11:02:52 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2016-10-20 11:02:52 +1300 |
commit | f964d49853a3f0d22e0f6d4cff7cfbc49008e40e (patch) | |
tree | 6aef6ca8942dccc8c879e851f99afa7c0a1e2cb7 /test | |
parent | 9870844b38c84e7446b15909758497cecb26301e (diff) | |
download | mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.gz mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.bz2 mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.zip |
netlib.certutils -> mitmproxy.certs
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/test_certs.py (renamed from test/netlib/test_certutils.py) | 41 | ||||
-rw-r--r-- | test/mitmproxy/test_server.py | 33 | ||||
-rw-r--r-- | test/netlib/test_tcp.py | 53 |
3 files changed, 66 insertions, 61 deletions
diff --git a/test/netlib/test_certutils.py b/test/mitmproxy/test_certs.py index cf9a671b..35407fd6 100644 --- a/test/netlib/test_certutils.py +++ b/test/mitmproxy/test_certs.py @@ -1,9 +1,10 @@ import os -from netlib import certutils, tutils +from mitmproxy import certs +from netlib import tutils # class TestDNTree: # def test_simple(self): -# d = certutils.DNTree() +# d = certs.DNTree() # d.add("foo.com", "foo") # d.add("bar.com", "bar") # assert d.get("foo.com") == "foo" @@ -19,12 +20,12 @@ from netlib import certutils, tutils # assert d.get("foo.foo.match.org") == "match" # # def test_wildcard(self): -# d = certutils.DNTree() +# d = certs.DNTree() # d.add("foo.com", "foo") # assert not d.get("*.foo.com") # d.add("*.foo.com", "wild") # -# d = certutils.DNTree() +# d = certs.DNTree() # d.add("*", "foo") # assert d.get("foo.com") == "foo" # assert d.get("*.foo.com") == "foo" @@ -35,22 +36,22 @@ class TestCertStore: def test_create_explicit(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") assert ca.get_cert(b"foo", []) - ca2 = certutils.CertStore.from_store(d, "test") + ca2 = certs.CertStore.from_store(d, "test") assert ca2.get_cert(b"foo", []) assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() def test_create_no_common_name(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") assert ca.get_cert(None, [])[0].cn is None def test_create_tmp(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") assert ca.get_cert(b"foo.com", []) assert ca.get_cert(b"foo.com", []) assert ca.get_cert(b"*.foo.com", []) @@ -60,7 +61,7 @@ class TestCertStore: def test_sans(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) ca.get_cert(b"foo.bar.com", []) # assert c1 == c2 @@ -69,14 +70,14 @@ class TestCertStore: def test_sans_change(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") ca.get_cert(b"foo.com", [b"*.bar.com"]) cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) assert b"*.baz.com" in cert.altnames def test_expire(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") + ca = certs.CertStore.from_store(d, "test") ca.STORE_CAP = 3 ca.get_cert(b"one.com", []) ca.get_cert(b"two.com", []) @@ -101,8 +102,8 @@ class TestCertStore: def test_overrides(self): with tutils.tmpdir() as d: - ca1 = certutils.CertStore.from_store(os.path.join(d, "ca1"), "test") - ca2 = certutils.CertStore.from_store(os.path.join(d, "ca2"), "test") + ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") + ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") assert not ca1.default_ca.get_serial_number( ) == ca2.default_ca.get_serial_number() @@ -121,8 +122,8 @@ class TestDummyCert: def test_with_ca(self): with tutils.tmpdir() as d: - ca = certutils.CertStore.from_store(d, "test") - r = certutils.dummy_cert( + ca = certs.CertStore.from_store(d, "test") + r = certs.dummy_cert( ca.default_privatekey, ca.default_ca, b"foo.com", @@ -130,7 +131,7 @@ class TestDummyCert: ) assert r.cn == b"foo.com" - r = certutils.dummy_cert( + r = certs.dummy_cert( ca.default_privatekey, ca.default_ca, None, @@ -144,13 +145,13 @@ class TestSSLCert: def test_simple(self): with open(tutils.test_data.path("data/text_cert"), "rb") as f: d = f.read() - c1 = certutils.SSLCert.from_pem(d) + c1 = certs.SSLCert.from_pem(d) assert c1.cn == b"google.com" assert len(c1.altnames) == 436 with open(tutils.test_data.path("data/text_cert_2"), "rb") as f: d = f.read() - c2 = certutils.SSLCert.from_pem(d) + c2 = certs.SSLCert.from_pem(d) assert c2.cn == b"www.inode.co.nz" assert len(c2.altnames) == 2 assert c2.digest("sha1") @@ -169,12 +170,12 @@ class TestSSLCert: def test_err_broken_sans(self): with open(tutils.test_data.path("data/text_cert_weird1"), "rb") as f: d = f.read() - c = certutils.SSLCert.from_pem(d) + c = certs.SSLCert.from_pem(d) # This breaks unless we ignore a decoding error. assert c.altnames is not None def test_der(self): with open(tutils.test_data.path("data/dercert"), "rb") as f: d = f.read() - s = certutils.SSLCert.from_der(d) + s = certs.SSLCert.from_der(d) assert s.cn diff --git a/test/mitmproxy/test_server.py b/test/mitmproxy/test_server.py index 93a82954..cadc67a8 100644 --- a/test/mitmproxy/test_server.py +++ b/test/mitmproxy/test_server.py @@ -9,13 +9,16 @@ from mitmproxy.addons import script from mitmproxy import http from mitmproxy.proxy.config import HostMatcher, parse_server_spec import netlib.http -from netlib import tcp, socks -from netlib.certutils import SSLCert -from netlib.exceptions import HttpReadDisconnect, HttpException -from netlib.http import authentication, http1 +from netlib import tcp +from netlib import socks +from mitmproxy import certs +from netlib import exceptions +from netlib.http import authentication +from netlib.http import http1 from netlib.tcp import Address from netlib.tutils import raises -from pathod import pathoc, pathod +from pathod import pathoc +from pathod import pathod from . import tutils, tservers @@ -144,9 +147,9 @@ class TcpMixin: # Test that we get the original SSL cert if self.ssl: - i_cert = SSLCert(i.sslinfo.certchain[0]) - i2_cert = SSLCert(i2.sslinfo.certchain[0]) - n_cert = SSLCert(n.sslinfo.certchain[0]) + i_cert = certs.SSLCert(i.sslinfo.certchain[0]) + i2_cert = certs.SSLCert(i2.sslinfo.certchain[0]) + n_cert = certs.SSLCert(n.sslinfo.certchain[0]) assert i_cert == i2_cert assert i_cert != n_cert @@ -156,7 +159,7 @@ class TcpMixin: # mitmproxy responds with bad gateway assert self.pathod(spec).status_code == 502 self._ignore_on() - with raises(HttpException): + with raises(exceptions.HttpException): self.pathod(spec) # pathoc tries to parse answer as HTTP self._ignore_off() @@ -190,9 +193,9 @@ class TcpMixin: # Test that we get the original SSL cert if self.ssl: - i_cert = SSLCert(i.sslinfo.certchain[0]) - i2_cert = SSLCert(i2.sslinfo.certchain[0]) - n_cert = SSLCert(n.sslinfo.certchain[0]) + i_cert = certs.SSLCert(i.sslinfo.certchain[0]) + i2_cert = certs.SSLCert(i2.sslinfo.certchain[0]) + n_cert = certs.SSLCert(n.sslinfo.certchain[0]) assert i_cert == i2_cert == n_cert @@ -830,7 +833,7 @@ class TestKillRequest(tservers.HTTPProxyTest): masterclass = MasterKillRequest def test_kill(self): - with raises(HttpReadDisconnect): + with raises(exceptions.HttpReadDisconnect): self.pathod("200") # Nothing should have hit the server assert not self.server.last_log() @@ -847,7 +850,7 @@ class TestKillResponse(tservers.HTTPProxyTest): masterclass = MasterKillResponse def test_kill(self): - with raises(HttpReadDisconnect): + with raises(exceptions.HttpReadDisconnect): self.pathod("200") # The server should have seen a request assert self.server.last_log() @@ -1050,7 +1053,7 @@ class AddUpstreamCertsToClientChainMixin: def test_add_upstream_certs_to_client_chain(self): with open(self.servercert, "rb") as f: d = f.read() - upstreamCert = SSLCert.from_pem(d) + upstreamCert = certs.SSLCert.from_pem(d) p = self.pathoc() with p.connect(): upstream_cert_found_in_client_chain = False diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py index 797a5a04..2c1b92dc 100644 --- a/test/netlib/test_tcp.py +++ b/test/netlib/test_tcp.py @@ -9,9 +9,10 @@ import mock from OpenSSL import SSL -from netlib import tcp, certutils, tutils -from netlib.exceptions import InvalidCertificateException, TcpReadIncomplete, TlsException, \ - TcpTimeout, TcpDisconnect, TcpException, NetlibException +from mitmproxy import certs +from netlib import tcp +from netlib import tutils +from netlib import exceptions from . import tservers @@ -108,7 +109,7 @@ class TestServerBind(tservers.ServerTestBase): with c.connect(): assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode() return - except TcpException: # port probably already in use + except exceptions.TcpException: # port probably already in use pass @@ -155,7 +156,7 @@ class TestFinishFail(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): c.wfile.write(b"foo\n") - c.wfile.flush = mock.Mock(side_effect=TcpDisconnect) + c.wfile.flush = mock.Mock(side_effect=exceptions.TcpDisconnect) c.finish() @@ -195,7 +196,7 @@ class TestSSLv3Only(tservers.ServerTestBase): def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com") + tutils.raises(exceptions.TlsException, c.convert_to_ssl, sni="foo.com") class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): @@ -236,7 +237,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): def test_mode_strict_should_fail(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(InvalidCertificateException): + with tutils.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="example.mitmproxy.org", verify_options=SSL.VERIFY_PEER, @@ -261,7 +262,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): def test_should_fail_without_sni(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(TlsException): + with tutils.raises(exceptions.TlsException): c.convert_to_ssl( verify_options=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") @@ -270,7 +271,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): def test_should_fail(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(InvalidCertificateException): + with tutils.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="mitmproxy.org", verify_options=SSL.VERIFY_PEER, @@ -348,7 +349,7 @@ class TestSSLClientCert(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): tutils.raises( - TlsException, + exceptions.TlsException, c.convert_to_ssl, cert=tutils.test_data.path("data/clientcert/make") ) @@ -454,7 +455,7 @@ class TestSSLDisconnect(tservers.ServerTestBase): # Excercise SSL.ZeroReturnError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, b"foo") + tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") tutils.raises(queue.Empty, self.q.get_nowait) @@ -469,7 +470,7 @@ class TestSSLHardDisconnect(tservers.ServerTestBase): # Exercise SSL.SysCallError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, b"foo") + tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") class TestDisconnect(tservers.ServerTestBase): @@ -492,7 +493,7 @@ class TestServerTimeOut(tservers.ServerTestBase): self.settimeout(0.01) try: self.rfile.read(10) - except TcpTimeout: + except exceptions.TcpTimeout: self.timeout = True def test_timeout(self): @@ -510,7 +511,7 @@ class TestTimeOut(tservers.ServerTestBase): with c.connect(): c.settimeout(0.1) assert c.gettimeout() == 0.1 - tutils.raises(TcpTimeout, c.rfile.read, 10) + tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) class TestALPNClient(tservers.ServerTestBase): @@ -562,13 +563,13 @@ class TestSSLTimeOut(tservers.ServerTestBase): with c.connect(): c.convert_to_ssl() c.settimeout(0.1) - tutils.raises(TcpTimeout, c.rfile.read, 10) + tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) class TestDHParams(tservers.ServerTestBase): handler = HangHandler ssl = dict( - dhparams=certutils.CertStore.load_dhparam( + dhparams=certs.CertStore.load_dhparam( tutils.test_data.path("data/dhparam.pem"), ), cipher_list="DHE-RSA-AES256-SHA" @@ -584,7 +585,7 @@ class TestDHParams(tservers.ServerTestBase): def test_create_dhparams(self): with tutils.tmpdir() as d: filename = os.path.join(d, "dhparam.pem") - certutils.CertStore.load_dhparam(filename) + certs.CertStore.load_dhparam(filename) assert os.path.exists(filename) @@ -592,7 +593,7 @@ class TestTCPClient: def test_conerr(self): c = tcp.TCPClient(("127.0.0.1", 0)) - tutils.raises(TcpException, c.connect) + tutils.raises(exceptions.TcpException, c.connect) class TestFileLike: @@ -661,7 +662,7 @@ class TestFileLike: o = mock.MagicMock() o.flush = mock.MagicMock(side_effect=socket.error) s.o = o - tutils.raises(TcpDisconnect, s.flush) + tutils.raises(exceptions.TcpDisconnect, s.flush) def test_reader_read_error(self): s = BytesIO(b"foobar\nfoobar") @@ -669,7 +670,7 @@ class TestFileLike: o = mock.MagicMock() o.read = mock.MagicMock(side_effect=socket.error) s.o = o - tutils.raises(TcpDisconnect, s.read, 10) + tutils.raises(exceptions.TcpDisconnect, s.read, 10) def test_reset_timestamps(self): s = BytesIO(b"foobar\nfoobar") @@ -700,24 +701,24 @@ class TestFileLike: s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.Error()) s = tcp.Reader(s) - tutils.raises(TlsException, s.read, 1) + tutils.raises(exceptions.TlsException, s.read, 1) def test_read_syscall_ssl_error(self): s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.SysCallError()) s = tcp.Reader(s) - tutils.raises(TlsException, s.read, 1) + tutils.raises(exceptions.TlsException, s.read, 1) def test_reader_readline_disconnect(self): o = mock.MagicMock() o.read = mock.MagicMock(side_effect=socket.error) s = tcp.Reader(o) - tutils.raises(TcpDisconnect, s.readline, 10) + tutils.raises(exceptions.TcpDisconnect, s.readline, 10) def test_reader_incomplete_error(self): s = BytesIO(b"foobar") s = tcp.Reader(s) - tutils.raises(TcpReadIncomplete, s.safe_read, 10) + tutils.raises(exceptions.TcpReadIncomplete, s.safe_read, 10) class TestPeek(tservers.ServerTestBase): @@ -738,11 +739,11 @@ class TestPeek(tservers.ServerTestBase): assert c.rfile.readline() == testval c.close() - with tutils.raises(NetlibException): + with tutils.raises(exceptions.NetlibException): if c.rfile.peek(1) == b"": # Workaround for Python 2 on Unix: # Peeking a closed connection does not raise an exception here. - raise NetlibException() + raise exceptions.NetlibException() class TestPeekSSL(TestPeek): |