aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_view.py2
-rw-r--r--test/mitmproxy/net/http/test_encoding.py1
-rw-r--r--test/mitmproxy/net/http/test_url.py16
-rw-r--r--test/mitmproxy/proxy/test_config.py9
-rw-r--r--test/mitmproxy/proxy/test_server.py51
-rw-r--r--test/mitmproxy/script/test_concurrent.py26
-rw-r--r--test/mitmproxy/test_flowfilter.py23
7 files changed, 112 insertions, 16 deletions
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 976c14b7..f5088a68 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -471,7 +471,7 @@ def test_focus():
v = view.View()
v.add([tft()])
f = view.Focus(v)
- assert f.index is 0
+ assert f.index == 0
assert f.flow is v[0]
# Start empty
diff --git a/test/mitmproxy/net/http/test_encoding.py b/test/mitmproxy/net/http/test_encoding.py
index 8dac12cb..7f768f39 100644
--- a/test/mitmproxy/net/http/test_encoding.py
+++ b/test/mitmproxy/net/http/test_encoding.py
@@ -19,6 +19,7 @@ def test_identity(encoder):
'gzip',
'br',
'deflate',
+ 'zstd',
])
def test_encoders(encoder):
"""
diff --git a/test/mitmproxy/net/http/test_url.py b/test/mitmproxy/net/http/test_url.py
index ecf8e896..48277859 100644
--- a/test/mitmproxy/net/http/test_url.py
+++ b/test/mitmproxy/net/http/test_url.py
@@ -49,6 +49,17 @@ def test_parse():
url.parse('http://lo[calhost')
+def test_ascii_check():
+
+ test_url = "https://xyz.tax-edu.net?flag=selectCourse&lc_id=42825&lc_name=茅莽莽猫氓猫氓".encode()
+ scheme, host, port, full_path = url.parse(test_url)
+ assert scheme == b'https'
+ assert host == b'xyz.tax-edu.net'
+ assert port == 443
+ assert full_path == b'/?flag%3DselectCourse%26lc_id%3D42825%26lc_name%3D%E8%8C%85%E8%8E%BD%E8%8E' \
+ b'%BD%E7%8C%AB%E6%B0%93%E7%8C%AB%E6%B0%93'
+
+
@pytest.mark.skipif(sys.version_info < (3, 6), reason='requires Python 3.6 or higher')
def test_parse_port_range():
# Port out of range
@@ -61,6 +72,7 @@ def test_unparse():
assert url.unparse("http", "foo.com", 80, "/bar") == "http://foo.com/bar"
assert url.unparse("https", "foo.com", 80, "") == "https://foo.com:80"
assert url.unparse("https", "foo.com", 443, "") == "https://foo.com"
+ assert url.unparse("https", "foo.com", 443, "*") == "https://foo.com"
# We ignore the byte 126: '~' because of an incompatibility in Python 3.6 and 3.7
@@ -131,3 +143,7 @@ def test_unquote():
assert url.unquote("foo") == "foo"
assert url.unquote("foo%20bar") == "foo bar"
assert url.unquote(surrogates_quoted) == surrogates
+
+
+def test_hostport():
+ assert url.hostport(b"https", b"foo.com", 8080) == b"foo.com:8080"
diff --git a/test/mitmproxy/proxy/test_config.py b/test/mitmproxy/proxy/test_config.py
index 1da031c6..1319d1a9 100644
--- a/test/mitmproxy/proxy/test_config.py
+++ b/test/mitmproxy/proxy/test_config.py
@@ -17,3 +17,12 @@ class TestProxyConfig:
opts.certs = [tdata.path("mitmproxy/data/dumpfile-011")]
with pytest.raises(exceptions.OptionsError, match="Invalid certificate format"):
ProxyConfig(opts)
+
+ def test_cannot_set_both_allow_and_filter_options(self):
+ opts = options.Options()
+ opts.ignore_hosts = ["foo"]
+ opts.allow_hosts = ["bar"]
+ with pytest.raises(exceptions.OptionsError, match="--ignore-hosts and --allow-hosts are "
+ "mutually exclusive; please choose "
+ "one."):
+ ProxyConfig(opts)
diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py
index 01ab068d..b5852d60 100644
--- a/test/mitmproxy/proxy/test_server.py
+++ b/test/mitmproxy/proxy/test_server.py
@@ -78,6 +78,16 @@ class TcpMixin:
self.options.ignore_hosts = self._ignore_backup
del self._ignore_backup
+ def _allow_on(self):
+ assert not hasattr(self, "_allow_backup")
+ self._allow_backup = self.options.allow_hosts
+ self.options.allow_hosts = ["(127.0.0.1|None):\\d+"] + self.options.allow_hosts
+
+ def _allow_off(self):
+ assert hasattr(self, "_allow_backup")
+ self.options.allow_hosts = self._allow_backup
+ del self._allow_backup
+
def test_ignore(self):
n = self.pathod("304")
self._ignore_on()
@@ -111,6 +121,40 @@ class TcpMixin:
self._ignore_off()
+ def test_allow(self):
+ n = self.pathod("304")
+ self._allow_on()
+ i = self.pathod("305")
+ i2 = self.pathod("306")
+ self._allow_off()
+
+ assert n.status_code == 304
+ assert i.status_code == 305
+ assert i2.status_code == 306
+
+ assert any(f.response.status_code == 304 for f in self.master.state.flows)
+ assert any(f.response.status_code == 305 for f in self.master.state.flows)
+ assert any(f.response.status_code == 306 for f in self.master.state.flows)
+
+ # Test that we get the original SSL cert
+ if self.ssl:
+ i_cert = certs.Cert(i.sslinfo.certchain[0])
+ i2_cert = certs.Cert(i2.sslinfo.certchain[0])
+ n_cert = certs.Cert(n.sslinfo.certchain[0])
+
+ assert i_cert == i2_cert
+ assert i_cert != n_cert
+
+ # Test Non-HTTP traffic
+ spec = "200:i0,@100:d0" # this results in just 100 random bytes
+ # mitmproxy responds with bad gateway
+ assert self.pathod(spec).status_code == 502
+ self._allow_on()
+
+ self.pathod(spec) # pathoc parses answer as HTTP
+
+ self._allow_off()
+
def _tcpproxy_on(self):
assert not hasattr(self, "_tcpproxy_backup")
self._tcpproxy_backup = self.options.tcp_hosts
@@ -852,10 +896,12 @@ class TestUpstreamProxySSL(
def _host_pattern_on(self, attr):
"""
- Updates config.check_tcp or check_ignore, depending on attr.
+ Updates config.check_tcp or check_filter, depending on attr.
"""
assert not hasattr(self, "_ignore_%s_backup" % attr)
backup = []
+ handle = attr
+ attr = "filter" if attr in ["allow", "ignore"] else attr
for proxy in self.chain:
old_matcher = getattr(
proxy.tmaster.server.config,
@@ -865,12 +911,13 @@ class TestUpstreamProxySSL(
setattr(
proxy.tmaster.server.config,
"check_%s" % attr,
- HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns)
+ HostMatcher(handle, [".+:%s" % self.server.port] + old_matcher.patterns)
)
setattr(self, "_ignore_%s_backup" % attr, backup)
def _host_pattern_off(self, attr):
+ attr = "filter" if attr in ["allow", "ignore"] else attr
backup = getattr(self, "_ignore_%s_backup" % attr)
for proxy in reversed(self.chain):
setattr(
diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py
index 3ec58760..70d41511 100644
--- a/test/mitmproxy/script/test_concurrent.py
+++ b/test/mitmproxy/script/test_concurrent.py
@@ -43,17 +43,17 @@ class TestConcurrent(tservers.MasterTest):
assert await tctx.master.await_log("decorator not supported")
def test_concurrent_class(self, tdata):
- with taddons.context() as tctx:
- sc = tctx.script(
- tdata.path(
- "mitmproxy/data/addonscripts/concurrent_decorator_class.py"
- )
+ with taddons.context() as tctx:
+ sc = tctx.script(
+ tdata.path(
+ "mitmproxy/data/addonscripts/concurrent_decorator_class.py"
)
- f1, f2 = tflow.tflow(), tflow.tflow()
- tctx.cycle(sc, f1)
- tctx.cycle(sc, f2)
- start = time.time()
- while time.time() - start < 5:
- if f1.reply.state == f2.reply.state == "committed":
- return
- raise ValueError("Script never acked")
+ )
+ f1, f2 = tflow.tflow(), tflow.tflow()
+ tctx.cycle(sc, f1)
+ tctx.cycle(sc, f2)
+ start = time.time()
+ while time.time() - start < 5:
+ if f1.reply.state == f2.reply.state == "committed":
+ return
+ raise ValueError("Script never acked")
diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py
index 4eb37d81..d53cec7d 100644
--- a/test/mitmproxy/test_flowfilter.py
+++ b/test/mitmproxy/test_flowfilter.py
@@ -28,6 +28,9 @@ class TestParsing:
self._dump(p)
assert len(p.lst) == 2
+ def test_non_ascii(self):
+ assert flowfilter.parse("~s шгн")
+
def test_naked_url(self):
a = flowfilter.parse("foobar ~h rex")
assert a.lst[0].expr == "foobar"
@@ -173,10 +176,30 @@ class TestMatchingHTTPFlow:
assert not self.q("~bq message", q)
assert not self.q("~bq message", s)
+ s.response.text = 'яч' # Cyrillic
+ assert self.q("~bs яч", s)
+ s.response.text = '测试' # Chinese
+ assert self.q('~bs 测试', s)
+ s.response.text = 'ॐ' # Hindi
+ assert self.q('~bs ॐ', s)
+ s.response.text = 'لله' # Arabic
+ assert self.q('~bs لله', s)
+ s.response.text = 'θεός' # Greek
+ assert self.q('~bs θεός', s)
+ s.response.text = 'לוהים' # Hebrew
+ assert self.q('~bs לוהים', s)
+ s.response.text = '神' # Japanese
+ assert self.q('~bs 神', s)
+ s.response.text = '하나님' # Korean
+ assert self.q('~bs 하나님', s)
+ s.response.text = 'Äÿ' # Latin
+ assert self.q('~bs Äÿ', s)
+
assert not self.q("~bs nomatch", s)
assert not self.q("~bs content", q)
assert not self.q("~bs content", s)
assert not self.q("~bs message", q)
+ s.response.text = 'message'
assert self.q("~bs message", s)
def test_body(self):