diff options
author | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2017-02-24 17:53:08 +0100 |
---|---|---|
committer | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2017-02-24 18:47:56 +0100 |
commit | d17309eda8f3cf5a0a701eb4dd0ce4378655be16 (patch) | |
tree | 6f751dcca399d349d6a0f5d50dbbac9f229f3ecf /test | |
parent | 83f1e2eec01ab3edec723b252b0c109f00b77c56 (diff) | |
download | mitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.tar.gz mitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.tar.bz2 mitmproxy-d17309eda8f3cf5a0a701eb4dd0ce4378655be16.zip |
flowfilter: coverage++
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_dumper.py | 2 | ||||
-rw-r--r-- | test/mitmproxy/test_eventsequence.py | 2 | ||||
-rw-r--r-- | test/mitmproxy/test_flowfilter.py | 99 |
3 files changed, 102 insertions, 1 deletions
diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py index 75b994fa..22d2c2c6 100644 --- a/test/mitmproxy/addons/test_dumper.py +++ b/test/mitmproxy/addons/test_dumper.py @@ -176,7 +176,7 @@ def test_websocket(): ctx.configure(d, flow_detail=3, showhost=True) f = tflow.twebsocketflow() d.websocket_message(f) - assert "hello text" in sio.getvalue() + assert "it's me" in sio.getvalue() sio.truncate(0) d.websocket_end(f) diff --git a/test/mitmproxy/test_eventsequence.py b/test/mitmproxy/test_eventsequence.py index fe0f92b3..871d4b9d 100644 --- a/test/mitmproxy/test_eventsequence.py +++ b/test/mitmproxy/test_eventsequence.py @@ -32,6 +32,8 @@ def test_websocket_flow(err): assert len(f.messages) == 1 assert next(i) == ("websocket_message", f) assert len(f.messages) == 2 + assert next(i) == ("websocket_message", f) + assert len(f.messages) == 3 if err: assert next(i) == ("websocket_error", f) assert next(i) == ("websocket_end", f) diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py index 646c210f..46fff477 100644 --- a/test/mitmproxy/test_flowfilter.py +++ b/test/mitmproxy/test_flowfilter.py @@ -1,4 +1,5 @@ import io +import pytest from unittest.mock import patch from mitmproxy.test import tflow @@ -134,6 +135,12 @@ class TestMatchingHTTPFlow: e = self.err() assert self.q("~e", e) + def test_fmarked(self): + q = self.req() + assert not self.q("~marked", q) + q.marked = True + assert self.q("~marked", q) + def test_head(self): q = self.req() s = self.resp() @@ -279,6 +286,7 @@ class TestMatchingTCPFlow: f = self.flow() assert self.q("~tcp", f) assert not self.q("~http", f) + assert not self.q("~websocket", f) def test_ferr(self): e = self.err() @@ -388,6 +396,87 @@ class TestMatchingTCPFlow: assert not self.q("~u whatever", f) +class TestMatchingWebSocketFlow: + + def flow(self): + return tflow.twebsocketflow() + + def err(self): + return tflow.twebsocketflow(err=True) + + def q(self, q, o): + return flowfilter.parse(q)(o) + + def test_websocket(self): + f = self.flow() + assert self.q("~websocket", f) + assert not self.q("~tcp", f) + assert not self.q("~http", f) + + def test_ferr(self): + e = self.err() + assert self.q("~e", e) + + def test_body(self): + f = self.flow() + + # Messages sent by client or server + assert self.q("~b hello", f) + assert self.q("~b me", f) + assert not self.q("~b nonexistent", f) + + # Messages sent by client + assert self.q("~bq hello", f) + assert not self.q("~bq me", f) + assert not self.q("~bq nonexistent", f) + + # Messages sent by server + assert self.q("~bs me", f) + assert not self.q("~bs hello", f) + assert not self.q("~bs nonexistent", f) + + def test_src(self): + f = self.flow() + assert self.q("~src address", f) + assert not self.q("~src foobar", f) + assert self.q("~src :22", f) + assert not self.q("~src :99", f) + assert self.q("~src address:22", f) + + def test_dst(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~dst address", f) + assert not self.q("~dst foobar", f) + assert self.q("~dst :22", f) + assert not self.q("~dst :99", f) + assert self.q("~dst address:22", f) + + def test_and(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~b hello & ~b me", f) + assert not self.q("~src wrongaddress & ~b hello", f) + assert self.q("(~src :22 & ~dst :22) & ~b hello", f) + assert not self.q("(~src address:22 & ~dst :22) & ~b nonexistent", f) + assert not self.q("(~src address:22 & ~dst :99) & ~b hello", f) + + def test_or(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~b hello | ~b me", f) + assert self.q("~src :22 | ~b me", f) + assert not self.q("~src :99 | ~dst :99", f) + assert self.q("(~src :22 | ~dst :22) | ~b me", f) + + def test_not(self): + f = self.flow() + assert not self.q("! ~src :22", f) + assert self.q("! ~src :99", f) + assert self.q("!~src :99 !~src :99", f) + assert not self.q("!~src :99 !~src :22", f) + + class TestMatchingDummyFlow: def flow(self): @@ -421,6 +510,8 @@ class TestMatchingDummyFlow: assert not self.q("~e", f) assert not self.q("~http", f) + assert not self.q("~tcp", f) + assert not self.q("~websocket", f) assert not self.q("~h whatever", f) assert not self.q("~hq whatever", f) @@ -450,3 +541,11 @@ def test_pyparsing_bug(extract_tb): # The text is a string with leading and trailing whitespace stripped; if the source is not available it is None. extract_tb.return_value = [("", 1, "test", None)] assert flowfilter.parse("test") + + +def test_match(): + with pytest.raises(ValueError): + flowfilter.match('[foobar', None) + + assert flowfilter.match(None, None) + assert not flowfilter.match('foobar', None) |