diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/bench/serialization-bm.py | 116 | ||||
-rwxr-xr-x | test/filename_matching.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/io/test_db.py | 26 | ||||
-rw-r--r-- | test/mitmproxy/io/test_protobuf.py | 120 | ||||
-rw-r--r-- | test/mitmproxy/tools/console/test_commander.py | 62 |
5 files changed, 326 insertions, 1 deletions
diff --git a/test/bench/serialization-bm.py b/test/bench/serialization-bm.py new file mode 100644 index 00000000..665b72cb --- /dev/null +++ b/test/bench/serialization-bm.py @@ -0,0 +1,116 @@ +import tempfile
+import asyncio
+import typing
+import time
+
+from statistics import mean
+
+from mitmproxy import ctx
+from mitmproxy.io import db
+from mitmproxy.test import tflow
+
+
+class StreamTester:
+
+ """
+ Generates a constant stream of flows and
+ measure protobuf dumping throughput.
+ """
+
+ def __init__(self):
+ self.dbh = None
+ self.streaming = False
+ self.tf = None
+ self.out = None
+ self.hot_flows = []
+ self.results = []
+ self._flushes = 0
+ self._stream_period = 0.001
+ self._flush_period = 3.0
+ self._flush_rate = 150
+ self._target = 2000
+ self.loop = asyncio.get_event_loop()
+ self.queue = asyncio.Queue(maxsize=self._flush_rate * 3, loop=self.loop)
+ self.temp = tempfile.NamedTemporaryFile()
+
+ def load(self, loader):
+ loader.add_option(
+ "testflow_size",
+ int,
+ 1000,
+ "Length in bytes of test flow content"
+ )
+ loader.add_option(
+ "benchmark_save_path",
+ typing.Optional[str],
+ None,
+ "Destination for the stats result file"
+ )
+
+ def _log(self, msg):
+ if self.out:
+ self.out.write(msg + '\n')
+ else:
+ ctx.log(msg)
+
+ def running(self):
+ if not self.streaming:
+ ctx.log("<== Serialization Benchmark Enabled ==>")
+ self.tf = tflow.tflow()
+ self.tf.request.content = b'A' * ctx.options.testflow_size
+ ctx.log(f"With content size: {len(self.tf.request.content)} B")
+ if ctx.options.benchmark_save_path:
+ ctx.log(f"Storing results to {ctx.options.benchmark_save_path}")
+ self.out = open(ctx.options.benchmark_save_path, "w")
+ self.dbh = db.DBHandler(self.temp.name, mode='write')
+ self.streaming = True
+ tasks = (self.stream, self.writer, self.stats)
+ self.loop.create_task(asyncio.gather(*(t() for t in tasks)))
+
+ async def stream(self):
+ while True:
+ await self.queue.put(self.tf)
+ await asyncio.sleep(self._stream_period)
+
+ async def writer(self):
+ while True:
+ await asyncio.sleep(self._flush_period)
+ count = 1
+ f = await self.queue.get()
+ self.hot_flows.append(f)
+ while count < self._flush_rate:
+ try:
+ self.hot_flows.append(self.queue.get_nowait())
+ count += 1
+ except asyncio.QueueEmpty:
+ pass
+ start = time.perf_counter()
+ n = self._fflush()
+ end = time.perf_counter()
+ self._log(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}")
+ self.results.append(n / (end - start))
+ self._flushes += n
+ self._log(f"Flows dumped: {self._flushes}")
+ ctx.log(f"Progress: {min(100.0, 100.0 * (self._flushes / self._target))}%")
+
+ async def stats(self):
+ while True:
+ await asyncio.sleep(1.0)
+ if self._flushes >= self._target:
+ self._log(f"AVG : {mean(self.results)}")
+ ctx.log(f"<== Benchmark Ended. Shutting down... ==>")
+ if self.out:
+ self.out.close()
+ self.temp.close()
+ ctx.master.shutdown()
+
+ def _fflush(self):
+ self.dbh.store(self.hot_flows)
+ n = len(self.hot_flows)
+ self.hot_flows = []
+ return n
+
+
+addons = [
+ StreamTester()
+]
diff --git a/test/filename_matching.py b/test/filename_matching.py index 5f49725e..f5321307 100755 --- a/test/filename_matching.py +++ b/test/filename_matching.py @@ -9,7 +9,8 @@ import sys def check_src_files_have_test(): missing_test_files = [] - excluded = ['mitmproxy/contrib/', 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/'] + excluded = ['mitmproxy/contrib/', 'mitmproxy/io/proto/', + 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/'] src_files = glob.glob('mitmproxy/**/*.py', recursive=True) + glob.glob('pathod/**/*.py', recursive=True) src_files = [f for f in src_files if os.path.basename(f) != '__init__.py'] src_files = [f for f in src_files if not any(os.path.normpath(p) in f for p in excluded)] diff --git a/test/mitmproxy/io/test_db.py b/test/mitmproxy/io/test_db.py new file mode 100644 index 00000000..4a2dfb67 --- /dev/null +++ b/test/mitmproxy/io/test_db.py @@ -0,0 +1,26 @@ +from mitmproxy.io import db +from mitmproxy.test import tflow + + +class TestDB: + + def test_create(self, tdata): + dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite") + with dh._con as c: + cur = c.cursor() + cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='FLOWS';") + assert cur.fetchall() == [('FLOWS',)] + + def test_roundtrip(self, tdata): + dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite", mode='write') + flows = [] + for i in range(10): + flows.append(tflow.tflow()) + dh.store(flows) + dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite") + with dh._con as c: + cur = c.cursor() + cur.execute("SELECT count(*) FROM FLOWS;") + assert cur.fetchall()[0][0] == 10 + loaded_flows = dh.load() + assert len(loaded_flows) == len(flows) diff --git a/test/mitmproxy/io/test_protobuf.py b/test/mitmproxy/io/test_protobuf.py new file mode 100644 index 00000000..f725b980 --- /dev/null +++ b/test/mitmproxy/io/test_protobuf.py @@ -0,0 +1,120 @@ +import pytest + +from mitmproxy import certs +from mitmproxy import http +from mitmproxy import exceptions +from mitmproxy.test import tflow, tutils +from mitmproxy.io import protobuf + + +class TestProtobuf: + + def test_roundtrip_client(self): + c = tflow.tclient_conn() + del c.reply + c.rfile = None + c.wfile = None + pc = protobuf._dump_http_client_conn(c) + lc = protobuf._load_http_client_conn(pc) + assert c.__dict__ == lc.__dict__ + + def test_roundtrip_client_cert(self, tdata): + c = tflow.tclient_conn() + c.rfile = None + c.wfile = None + del c.reply + with open(tdata.path("mitmproxy/net/data/clientcert/client.pem"), "rb") as f: + d = f.read() + c.clientcert = certs.Cert.from_pem(d) + pc = protobuf._dump_http_client_conn(c) + lc = protobuf._load_http_client_conn(pc) + assert c.__dict__ == lc.__dict__ + + def test_roundtrip_server(self): + s = tflow.tserver_conn() + del s.reply + s.wfile = None + s.rfile = None + ps = protobuf._dump_http_server_conn(s) + ls = protobuf._load_http_server_conn(ps) + assert s.__dict__ == ls.__dict__ + + def test_roundtrip_server_cert(self, tdata): + s = tflow.tserver_conn() + del s.reply + s.wfile = None + s.rfile = None + with open(tdata.path("mitmproxy/net/data/text_cert"), "rb") as f: + d = f.read() + s.cert = certs.Cert.from_pem(d) + ps = protobuf._dump_http_server_conn(s) + ls = protobuf._load_http_server_conn(ps) + assert s.__dict__ == ls.__dict__ + + def test_roundtrip_server_via(self): + s = tflow.tserver_conn() + s.via = tflow.tserver_conn() + del s.reply + s.wfile = None + s.rfile = None + ps = protobuf._dump_http_server_conn(s) + ls = protobuf._load_http_server_conn(ps) + assert s.__dict__ == ls.__dict__ + del s.via.reply + s.via.wfile = None + s.via.rfile = None + assert s.via.__dict__ == ls.via.__dict__ + + def test_roundtrip_http_request(self): + req = http.HTTPRequest.wrap(tutils.treq()) + preq = protobuf._dump_http_request(req) + lreq = protobuf._load_http_request(preq) + assert req.__dict__ == lreq.__dict__ + + def test_roundtrip_http_request_empty_content(self): + req = http.HTTPRequest.wrap(tutils.treq(content=b"")) + preq = protobuf._dump_http_request(req) + lreq = protobuf._load_http_request(preq) + assert req.__dict__ == lreq.__dict__ + + def test_roundtrip_http_response(self): + res = http.HTTPResponse.wrap(tutils.tresp()) + pres = protobuf._dump_http_response(res) + lres = protobuf._load_http_response(pres) + assert res.__dict__ == lres.__dict__ + + def test_roundtrip_http_response_empty_content(self): + res = http.HTTPResponse.wrap(tutils.tresp(content=b"")) + pres = protobuf._dump_http_response(res) + lres = protobuf._load_http_response(pres) + assert res.__dict__ == lres.__dict__ + + def test_roundtrip_http_error(self): + err = tflow.terr() + perr = protobuf._dump_http_error(err) + lerr = protobuf._load_http_error(perr) + assert err.__dict__ == lerr.__dict__ + + def test_roundtrip_http_flow_only_req(self): + f = tflow.tflow() + f.reply = None + pf = protobuf.dumps(f) + lf = protobuf.loads(pf, "http") + assert f.__dict__ == lf.__dict__ + + def test_roundtrip_http_flow_res(self): + f = tflow.tflow(resp=True) + f.reply = None + pf = protobuf.dumps(f) + lf = protobuf.loads(pf, "http") + assert f.__dict__ == lf.__dict__ + + def test_unsupported_dumps(self): + w = tflow.twebsocketflow() + with pytest.raises(exceptions.TypeError): + protobuf.dumps(w) + + def test_unsupported_loads(self): + b = b"blobs" + with pytest.raises(exceptions.TypeError): + protobuf.loads(b, 'not-http') diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 2a96995d..b5e226fe 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -28,6 +28,68 @@ class TestListCompleter: assert c.cycle() == expected +class TestCommandHistory: + def fill_history(self, commands): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + for c in commands: + cbuf = commander.CommandBuffer(tctx.master, c) + history.add_command(cbuf) + return history, tctx.master + + def test_add_command(self): + commands = ["command1", "command2"] + history, tctx_master = self.fill_history(commands) + + saved_commands = [buf.text for buf in history.saved_commands] + assert saved_commands == [""] + commands + + # The history size is only 3. So, we forget the first + # one command, when adding fourth command + cbuf = commander.CommandBuffer(tctx_master, "command3") + history.add_command(cbuf) + saved_commands = [buf.text for buf in history.saved_commands] + assert saved_commands == commands + ["command3"] + + # Commands with the same text are not repeated in the history one by one + history.add_command(cbuf) + saved_commands = [buf.text for buf in history.saved_commands] + assert saved_commands == commands + ["command3"] + + # adding command in execution mode sets index at the beginning of the history + # and replace the last command buffer if it is empty or has the same text + cbuf = commander.CommandBuffer(tctx_master, "") + history.add_command(cbuf) + history.index = 0 + cbuf = commander.CommandBuffer(tctx_master, "command4") + history.add_command(cbuf, True) + assert history.index == history.last_index + saved_commands = [buf.text for buf in history.saved_commands] + assert saved_commands == ["command2", "command3", "command4"] + + def test_get_next(self): + commands = ["command1", "command2"] + history, tctx_master = self.fill_history(commands) + + history.index = -1 + expected_items = ["", "command1", "command2"] + for i in range(3): + assert history.get_next().text == expected_items[i] + # We are at the last item of the history + assert history.get_next() is None + + def test_get_prev(self): + commands = ["command1", "command2"] + history, tctx_master = self.fill_history(commands) + + expected_items = ["command2", "command1", ""] + history.index = history.last_index + 1 + for i in range(3): + assert history.get_prev().text == expected_items[i] + # We are at the first item of the history + assert history.get_prev() is None + + class TestCommandBuffer: def test_backspace(self): |