aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2016-10-20 11:02:52 +1300
committerAldo Cortesi <aldo@nullcube.com>2016-10-20 11:02:52 +1300
commitf964d49853a3f0d22e0f6d4cff7cfbc49008e40e (patch)
tree6aef6ca8942dccc8c879e851f99afa7c0a1e2cb7 /test
parent9870844b38c84e7446b15909758497cecb26301e (diff)
downloadmitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.gz
mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.bz2
mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.zip
netlib.certutils -> mitmproxy.certs
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/test_certs.py (renamed from test/netlib/test_certutils.py)41
-rw-r--r--test/mitmproxy/test_server.py33
-rw-r--r--test/netlib/test_tcp.py53
3 files changed, 66 insertions, 61 deletions
diff --git a/test/netlib/test_certutils.py b/test/mitmproxy/test_certs.py
index cf9a671b..35407fd6 100644
--- a/test/netlib/test_certutils.py
+++ b/test/mitmproxy/test_certs.py
@@ -1,9 +1,10 @@
import os
-from netlib import certutils, tutils
+from mitmproxy import certs
+from netlib import tutils
# class TestDNTree:
# def test_simple(self):
-# d = certutils.DNTree()
+# d = certs.DNTree()
# d.add("foo.com", "foo")
# d.add("bar.com", "bar")
# assert d.get("foo.com") == "foo"
@@ -19,12 +20,12 @@ from netlib import certutils, tutils
# assert d.get("foo.foo.match.org") == "match"
#
# def test_wildcard(self):
-# d = certutils.DNTree()
+# d = certs.DNTree()
# d.add("foo.com", "foo")
# assert not d.get("*.foo.com")
# d.add("*.foo.com", "wild")
#
-# d = certutils.DNTree()
+# d = certs.DNTree()
# d.add("*", "foo")
# assert d.get("foo.com") == "foo"
# assert d.get("*.foo.com") == "foo"
@@ -35,22 +36,22 @@ class TestCertStore:
def test_create_explicit(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(b"foo", [])
- ca2 = certutils.CertStore.from_store(d, "test")
+ ca2 = certs.CertStore.from_store(d, "test")
assert ca2.get_cert(b"foo", [])
assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
def test_create_no_common_name(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(None, [])[0].cn is None
def test_create_tmp(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"*.foo.com", [])
@@ -60,7 +61,7 @@ class TestCertStore:
def test_sans(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
ca.get_cert(b"foo.bar.com", [])
# assert c1 == c2
@@ -69,14 +70,14 @@ class TestCertStore:
def test_sans_change(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
ca.get_cert(b"foo.com", [b"*.bar.com"])
cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
assert b"*.baz.com" in cert.altnames
def test_expire(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
+ ca = certs.CertStore.from_store(d, "test")
ca.STORE_CAP = 3
ca.get_cert(b"one.com", [])
ca.get_cert(b"two.com", [])
@@ -101,8 +102,8 @@ class TestCertStore:
def test_overrides(self):
with tutils.tmpdir() as d:
- ca1 = certutils.CertStore.from_store(os.path.join(d, "ca1"), "test")
- ca2 = certutils.CertStore.from_store(os.path.join(d, "ca2"), "test")
+ ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test")
+ ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test")
assert not ca1.default_ca.get_serial_number(
) == ca2.default_ca.get_serial_number()
@@ -121,8 +122,8 @@ class TestDummyCert:
def test_with_ca(self):
with tutils.tmpdir() as d:
- ca = certutils.CertStore.from_store(d, "test")
- r = certutils.dummy_cert(
+ ca = certs.CertStore.from_store(d, "test")
+ r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
b"foo.com",
@@ -130,7 +131,7 @@ class TestDummyCert:
)
assert r.cn == b"foo.com"
- r = certutils.dummy_cert(
+ r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
None,
@@ -144,13 +145,13 @@ class TestSSLCert:
def test_simple(self):
with open(tutils.test_data.path("data/text_cert"), "rb") as f:
d = f.read()
- c1 = certutils.SSLCert.from_pem(d)
+ c1 = certs.SSLCert.from_pem(d)
assert c1.cn == b"google.com"
assert len(c1.altnames) == 436
with open(tutils.test_data.path("data/text_cert_2"), "rb") as f:
d = f.read()
- c2 = certutils.SSLCert.from_pem(d)
+ c2 = certs.SSLCert.from_pem(d)
assert c2.cn == b"www.inode.co.nz"
assert len(c2.altnames) == 2
assert c2.digest("sha1")
@@ -169,12 +170,12 @@ class TestSSLCert:
def test_err_broken_sans(self):
with open(tutils.test_data.path("data/text_cert_weird1"), "rb") as f:
d = f.read()
- c = certutils.SSLCert.from_pem(d)
+ c = certs.SSLCert.from_pem(d)
# This breaks unless we ignore a decoding error.
assert c.altnames is not None
def test_der(self):
with open(tutils.test_data.path("data/dercert"), "rb") as f:
d = f.read()
- s = certutils.SSLCert.from_der(d)
+ s = certs.SSLCert.from_der(d)
assert s.cn
diff --git a/test/mitmproxy/test_server.py b/test/mitmproxy/test_server.py
index 93a82954..cadc67a8 100644
--- a/test/mitmproxy/test_server.py
+++ b/test/mitmproxy/test_server.py
@@ -9,13 +9,16 @@ from mitmproxy.addons import script
from mitmproxy import http
from mitmproxy.proxy.config import HostMatcher, parse_server_spec
import netlib.http
-from netlib import tcp, socks
-from netlib.certutils import SSLCert
-from netlib.exceptions import HttpReadDisconnect, HttpException
-from netlib.http import authentication, http1
+from netlib import tcp
+from netlib import socks
+from mitmproxy import certs
+from netlib import exceptions
+from netlib.http import authentication
+from netlib.http import http1
from netlib.tcp import Address
from netlib.tutils import raises
-from pathod import pathoc, pathod
+from pathod import pathoc
+from pathod import pathod
from . import tutils, tservers
@@ -144,9 +147,9 @@ class TcpMixin:
# Test that we get the original SSL cert
if self.ssl:
- i_cert = SSLCert(i.sslinfo.certchain[0])
- i2_cert = SSLCert(i2.sslinfo.certchain[0])
- n_cert = SSLCert(n.sslinfo.certchain[0])
+ i_cert = certs.SSLCert(i.sslinfo.certchain[0])
+ i2_cert = certs.SSLCert(i2.sslinfo.certchain[0])
+ n_cert = certs.SSLCert(n.sslinfo.certchain[0])
assert i_cert == i2_cert
assert i_cert != n_cert
@@ -156,7 +159,7 @@ class TcpMixin:
# mitmproxy responds with bad gateway
assert self.pathod(spec).status_code == 502
self._ignore_on()
- with raises(HttpException):
+ with raises(exceptions.HttpException):
self.pathod(spec) # pathoc tries to parse answer as HTTP
self._ignore_off()
@@ -190,9 +193,9 @@ class TcpMixin:
# Test that we get the original SSL cert
if self.ssl:
- i_cert = SSLCert(i.sslinfo.certchain[0])
- i2_cert = SSLCert(i2.sslinfo.certchain[0])
- n_cert = SSLCert(n.sslinfo.certchain[0])
+ i_cert = certs.SSLCert(i.sslinfo.certchain[0])
+ i2_cert = certs.SSLCert(i2.sslinfo.certchain[0])
+ n_cert = certs.SSLCert(n.sslinfo.certchain[0])
assert i_cert == i2_cert == n_cert
@@ -830,7 +833,7 @@ class TestKillRequest(tservers.HTTPProxyTest):
masterclass = MasterKillRequest
def test_kill(self):
- with raises(HttpReadDisconnect):
+ with raises(exceptions.HttpReadDisconnect):
self.pathod("200")
# Nothing should have hit the server
assert not self.server.last_log()
@@ -847,7 +850,7 @@ class TestKillResponse(tservers.HTTPProxyTest):
masterclass = MasterKillResponse
def test_kill(self):
- with raises(HttpReadDisconnect):
+ with raises(exceptions.HttpReadDisconnect):
self.pathod("200")
# The server should have seen a request
assert self.server.last_log()
@@ -1050,7 +1053,7 @@ class AddUpstreamCertsToClientChainMixin:
def test_add_upstream_certs_to_client_chain(self):
with open(self.servercert, "rb") as f:
d = f.read()
- upstreamCert = SSLCert.from_pem(d)
+ upstreamCert = certs.SSLCert.from_pem(d)
p = self.pathoc()
with p.connect():
upstream_cert_found_in_client_chain = False
diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py
index 797a5a04..2c1b92dc 100644
--- a/test/netlib/test_tcp.py
+++ b/test/netlib/test_tcp.py
@@ -9,9 +9,10 @@ import mock
from OpenSSL import SSL
-from netlib import tcp, certutils, tutils
-from netlib.exceptions import InvalidCertificateException, TcpReadIncomplete, TlsException, \
- TcpTimeout, TcpDisconnect, TcpException, NetlibException
+from mitmproxy import certs
+from netlib import tcp
+from netlib import tutils
+from netlib import exceptions
from . import tservers
@@ -108,7 +109,7 @@ class TestServerBind(tservers.ServerTestBase):
with c.connect():
assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode()
return
- except TcpException: # port probably already in use
+ except exceptions.TcpException: # port probably already in use
pass
@@ -155,7 +156,7 @@ class TestFinishFail(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.wfile.write(b"foo\n")
- c.wfile.flush = mock.Mock(side_effect=TcpDisconnect)
+ c.wfile.flush = mock.Mock(side_effect=exceptions.TcpDisconnect)
c.finish()
@@ -195,7 +196,7 @@ class TestSSLv3Only(tservers.ServerTestBase):
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com")
+ tutils.raises(exceptions.TlsException, c.convert_to_ssl, sni="foo.com")
class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
@@ -236,7 +237,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
def test_mode_strict_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- with tutils.raises(InvalidCertificateException):
+ with tutils.raises(exceptions.InvalidCertificateException):
c.convert_to_ssl(
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
@@ -261,7 +262,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
def test_should_fail_without_sni(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- with tutils.raises(TlsException):
+ with tutils.raises(exceptions.TlsException):
c.convert_to_ssl(
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
@@ -270,7 +271,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
def test_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
- with tutils.raises(InvalidCertificateException):
+ with tutils.raises(exceptions.InvalidCertificateException):
c.convert_to_ssl(
sni="mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
@@ -348,7 +349,7 @@ class TestSSLClientCert(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
tutils.raises(
- TlsException,
+ exceptions.TlsException,
c.convert_to_ssl,
cert=tutils.test_data.path("data/clientcert/make")
)
@@ -454,7 +455,7 @@ class TestSSLDisconnect(tservers.ServerTestBase):
# Excercise SSL.ZeroReturnError
c.rfile.read(10)
c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
+ tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo")
tutils.raises(queue.Empty, self.q.get_nowait)
@@ -469,7 +470,7 @@ class TestSSLHardDisconnect(tservers.ServerTestBase):
# Exercise SSL.SysCallError
c.rfile.read(10)
c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
+ tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo")
class TestDisconnect(tservers.ServerTestBase):
@@ -492,7 +493,7 @@ class TestServerTimeOut(tservers.ServerTestBase):
self.settimeout(0.01)
try:
self.rfile.read(10)
- except TcpTimeout:
+ except exceptions.TcpTimeout:
self.timeout = True
def test_timeout(self):
@@ -510,7 +511,7 @@ class TestTimeOut(tservers.ServerTestBase):
with c.connect():
c.settimeout(0.1)
assert c.gettimeout() == 0.1
- tutils.raises(TcpTimeout, c.rfile.read, 10)
+ tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10)
class TestALPNClient(tservers.ServerTestBase):
@@ -562,13 +563,13 @@ class TestSSLTimeOut(tservers.ServerTestBase):
with c.connect():
c.convert_to_ssl()
c.settimeout(0.1)
- tutils.raises(TcpTimeout, c.rfile.read, 10)
+ tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10)
class TestDHParams(tservers.ServerTestBase):
handler = HangHandler
ssl = dict(
- dhparams=certutils.CertStore.load_dhparam(
+ dhparams=certs.CertStore.load_dhparam(
tutils.test_data.path("data/dhparam.pem"),
),
cipher_list="DHE-RSA-AES256-SHA"
@@ -584,7 +585,7 @@ class TestDHParams(tservers.ServerTestBase):
def test_create_dhparams(self):
with tutils.tmpdir() as d:
filename = os.path.join(d, "dhparam.pem")
- certutils.CertStore.load_dhparam(filename)
+ certs.CertStore.load_dhparam(filename)
assert os.path.exists(filename)
@@ -592,7 +593,7 @@ class TestTCPClient:
def test_conerr(self):
c = tcp.TCPClient(("127.0.0.1", 0))
- tutils.raises(TcpException, c.connect)
+ tutils.raises(exceptions.TcpException, c.connect)
class TestFileLike:
@@ -661,7 +662,7 @@ class TestFileLike:
o = mock.MagicMock()
o.flush = mock.MagicMock(side_effect=socket.error)
s.o = o
- tutils.raises(TcpDisconnect, s.flush)
+ tutils.raises(exceptions.TcpDisconnect, s.flush)
def test_reader_read_error(self):
s = BytesIO(b"foobar\nfoobar")
@@ -669,7 +670,7 @@ class TestFileLike:
o = mock.MagicMock()
o.read = mock.MagicMock(side_effect=socket.error)
s.o = o
- tutils.raises(TcpDisconnect, s.read, 10)
+ tutils.raises(exceptions.TcpDisconnect, s.read, 10)
def test_reset_timestamps(self):
s = BytesIO(b"foobar\nfoobar")
@@ -700,24 +701,24 @@ class TestFileLike:
s = mock.MagicMock()
s.read = mock.MagicMock(side_effect=SSL.Error())
s = tcp.Reader(s)
- tutils.raises(TlsException, s.read, 1)
+ tutils.raises(exceptions.TlsException, s.read, 1)
def test_read_syscall_ssl_error(self):
s = mock.MagicMock()
s.read = mock.MagicMock(side_effect=SSL.SysCallError())
s = tcp.Reader(s)
- tutils.raises(TlsException, s.read, 1)
+ tutils.raises(exceptions.TlsException, s.read, 1)
def test_reader_readline_disconnect(self):
o = mock.MagicMock()
o.read = mock.MagicMock(side_effect=socket.error)
s = tcp.Reader(o)
- tutils.raises(TcpDisconnect, s.readline, 10)
+ tutils.raises(exceptions.TcpDisconnect, s.readline, 10)
def test_reader_incomplete_error(self):
s = BytesIO(b"foobar")
s = tcp.Reader(s)
- tutils.raises(TcpReadIncomplete, s.safe_read, 10)
+ tutils.raises(exceptions.TcpReadIncomplete, s.safe_read, 10)
class TestPeek(tservers.ServerTestBase):
@@ -738,11 +739,11 @@ class TestPeek(tservers.ServerTestBase):
assert c.rfile.readline() == testval
c.close()
- with tutils.raises(NetlibException):
+ with tutils.raises(exceptions.NetlibException):
if c.rfile.peek(1) == b"":
# Workaround for Python 2 on Unix:
# Peeking a closed connection does not raise an exception here.
- raise NetlibException()
+ raise exceptions.NetlibException()
class TestPeekSSL(TestPeek):