aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorThomas Kriechbaumer <thomas@kriechbaumer.name>2017-02-24 17:53:08 +0100
committerThomas Kriechbaumer <thomas@kriechbaumer.name>2017-02-24 18:47:56 +0100
commitd17309eda8f3cf5a0a701eb4dd0ce4378655be16 (patch)
tree6f751dcca399d349d6a0f5d50dbbac9f229f3ecf /test
parent83f1e2eec01ab3edec723b252b0c109f00b77c56 (diff)
downloadmitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.tar.gz
mitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.tar.bz2
mitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.zip
flowfilter: coverage++
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_dumper.py2
-rw-r--r--test/mitmproxy/test_eventsequence.py2
-rw-r--r--test/mitmproxy/test_flowfilter.py99
3 files changed, 102 insertions, 1 deletions
diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py
index 75b994fa..22d2c2c6 100644
--- a/test/mitmproxy/addons/test_dumper.py
+++ b/test/mitmproxy/addons/test_dumper.py
@@ -176,7 +176,7 @@ def test_websocket():
ctx.configure(d, flow_detail=3, showhost=True)
f = tflow.twebsocketflow()
d.websocket_message(f)
- assert "hello text" in sio.getvalue()
+ assert "it's me" in sio.getvalue()
sio.truncate(0)
d.websocket_end(f)
diff --git a/test/mitmproxy/test_eventsequence.py b/test/mitmproxy/test_eventsequence.py
index fe0f92b3..871d4b9d 100644
--- a/test/mitmproxy/test_eventsequence.py
+++ b/test/mitmproxy/test_eventsequence.py
@@ -32,6 +32,8 @@ def test_websocket_flow(err):
assert len(f.messages) == 1
assert next(i) == ("websocket_message", f)
assert len(f.messages) == 2
+ assert next(i) == ("websocket_message", f)
+ assert len(f.messages) == 3
if err:
assert next(i) == ("websocket_error", f)
assert next(i) == ("websocket_end", f)
diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py
index 646c210f..46fff477 100644
--- a/test/mitmproxy/test_flowfilter.py
+++ b/test/mitmproxy/test_flowfilter.py
@@ -1,4 +1,5 @@
import io
+import pytest
from unittest.mock import patch
from mitmproxy.test import tflow
@@ -134,6 +135,12 @@ class TestMatchingHTTPFlow:
e = self.err()
assert self.q("~e", e)
+ def test_fmarked(self):
+ q = self.req()
+ assert not self.q("~marked", q)
+ q.marked = True
+ assert self.q("~marked", q)
+
def test_head(self):
q = self.req()
s = self.resp()
@@ -279,6 +286,7 @@ class TestMatchingTCPFlow:
f = self.flow()
assert self.q("~tcp", f)
assert not self.q("~http", f)
+ assert not self.q("~websocket", f)
def test_ferr(self):
e = self.err()
@@ -388,6 +396,87 @@ class TestMatchingTCPFlow:
assert not self.q("~u whatever", f)
+class TestMatchingWebSocketFlow:
+
+ def flow(self):
+ return tflow.twebsocketflow()
+
+ def err(self):
+ return tflow.twebsocketflow(err=True)
+
+ def q(self, q, o):
+ return flowfilter.parse(q)(o)
+
+ def test_websocket(self):
+ f = self.flow()
+ assert self.q("~websocket", f)
+ assert not self.q("~tcp", f)
+ assert not self.q("~http", f)
+
+ def test_ferr(self):
+ e = self.err()
+ assert self.q("~e", e)
+
+ def test_body(self):
+ f = self.flow()
+
+ # Messages sent by client or server
+ assert self.q("~b hello", f)
+ assert self.q("~b me", f)
+ assert not self.q("~b nonexistent", f)
+
+ # Messages sent by client
+ assert self.q("~bq hello", f)
+ assert not self.q("~bq me", f)
+ assert not self.q("~bq nonexistent", f)
+
+ # Messages sent by server
+ assert self.q("~bs me", f)
+ assert not self.q("~bs hello", f)
+ assert not self.q("~bs nonexistent", f)
+
+ def test_src(self):
+ f = self.flow()
+ assert self.q("~src address", f)
+ assert not self.q("~src foobar", f)
+ assert self.q("~src :22", f)
+ assert not self.q("~src :99", f)
+ assert self.q("~src address:22", f)
+
+ def test_dst(self):
+ f = self.flow()
+ f.server_conn = tflow.tserver_conn()
+ assert self.q("~dst address", f)
+ assert not self.q("~dst foobar", f)
+ assert self.q("~dst :22", f)
+ assert not self.q("~dst :99", f)
+ assert self.q("~dst address:22", f)
+
+ def test_and(self):
+ f = self.flow()
+ f.server_conn = tflow.tserver_conn()
+ assert self.q("~b hello & ~b me", f)
+ assert not self.q("~src wrongaddress & ~b hello", f)
+ assert self.q("(~src :22 & ~dst :22) & ~b hello", f)
+ assert not self.q("(~src address:22 & ~dst :22) & ~b nonexistent", f)
+ assert not self.q("(~src address:22 & ~dst :99) & ~b hello", f)
+
+ def test_or(self):
+ f = self.flow()
+ f.server_conn = tflow.tserver_conn()
+ assert self.q("~b hello | ~b me", f)
+ assert self.q("~src :22 | ~b me", f)
+ assert not self.q("~src :99 | ~dst :99", f)
+ assert self.q("(~src :22 | ~dst :22) | ~b me", f)
+
+ def test_not(self):
+ f = self.flow()
+ assert not self.q("! ~src :22", f)
+ assert self.q("! ~src :99", f)
+ assert self.q("!~src :99 !~src :99", f)
+ assert not self.q("!~src :99 !~src :22", f)
+
+
class TestMatchingDummyFlow:
def flow(self):
@@ -421,6 +510,8 @@ class TestMatchingDummyFlow:
assert not self.q("~e", f)
assert not self.q("~http", f)
+ assert not self.q("~tcp", f)
+ assert not self.q("~websocket", f)
assert not self.q("~h whatever", f)
assert not self.q("~hq whatever", f)
@@ -450,3 +541,11 @@ def test_pyparsing_bug(extract_tb):
# The text is a string with leading and trailing whitespace stripped; if the source is not available it is None.
extract_tb.return_value = [("", 1, "test", None)]
assert flowfilter.parse("test")
+
+
+def test_match():
+ with pytest.raises(ValueError):
+ flowfilter.match('[foobar', None)
+
+ assert flowfilter.match(None, None)
+ assert not flowfilter.match('foobar', None)