aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/bench/serialization-bm.py116
-rwxr-xr-xtest/filename_matching.py3
-rw-r--r--test/mitmproxy/io/test_db.py26
-rw-r--r--test/mitmproxy/io/test_protobuf.py120
-rw-r--r--test/mitmproxy/tools/console/test_commander.py62
5 files changed, 326 insertions, 1 deletions
diff --git a/test/bench/serialization-bm.py b/test/bench/serialization-bm.py
new file mode 100644
index 00000000..665b72cb
--- /dev/null
+++ b/test/bench/serialization-bm.py
@@ -0,0 +1,116 @@
+import tempfile
+import asyncio
+import typing
+import time
+
+from statistics import mean
+
+from mitmproxy import ctx
+from mitmproxy.io import db
+from mitmproxy.test import tflow
+
+
+class StreamTester:
+
+ """
+ Generates a constant stream of flows and
+    measures protobuf dumping throughput.
+ """
+
+ def __init__(self):
+ self.dbh = None
+ self.streaming = False
+ self.tf = None
+ self.out = None
+ self.hot_flows = []
+ self.results = []
+ self._flushes = 0
+ self._stream_period = 0.001
+ self._flush_period = 3.0
+ self._flush_rate = 150
+ self._target = 2000
+ self.loop = asyncio.get_event_loop()
+ self.queue = asyncio.Queue(maxsize=self._flush_rate * 3, loop=self.loop)
+ self.temp = tempfile.NamedTemporaryFile()
+
+ def load(self, loader):
+ loader.add_option(
+ "testflow_size",
+ int,
+ 1000,
+ "Length in bytes of test flow content"
+ )
+ loader.add_option(
+ "benchmark_save_path",
+ typing.Optional[str],
+ None,
+ "Destination for the stats result file"
+ )
+
+ def _log(self, msg):
+ if self.out:
+ self.out.write(msg + '\n')
+ else:
+ ctx.log(msg)
+
+ def running(self):
+ if not self.streaming:
+ ctx.log("<== Serialization Benchmark Enabled ==>")
+ self.tf = tflow.tflow()
+ self.tf.request.content = b'A' * ctx.options.testflow_size
+ ctx.log(f"With content size: {len(self.tf.request.content)} B")
+ if ctx.options.benchmark_save_path:
+ ctx.log(f"Storing results to {ctx.options.benchmark_save_path}")
+ self.out = open(ctx.options.benchmark_save_path, "w")
+ self.dbh = db.DBHandler(self.temp.name, mode='write')
+ self.streaming = True
+ tasks = (self.stream, self.writer, self.stats)
+ self.loop.create_task(asyncio.gather(*(t() for t in tasks)))
+
+ async def stream(self):
+ while True:
+ await self.queue.put(self.tf)
+ await asyncio.sleep(self._stream_period)
+
+ async def writer(self):
+ while True:
+ await asyncio.sleep(self._flush_period)
+ count = 1
+ f = await self.queue.get()
+ self.hot_flows.append(f)
+ while count < self._flush_rate:
+ try:
+ self.hot_flows.append(self.queue.get_nowait())
+ count += 1
+ except asyncio.QueueEmpty:
+ pass
+ start = time.perf_counter()
+ n = self._fflush()
+ end = time.perf_counter()
+ self._log(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}")
+ self.results.append(n / (end - start))
+ self._flushes += n
+ self._log(f"Flows dumped: {self._flushes}")
+ ctx.log(f"Progress: {min(100.0, 100.0 * (self._flushes / self._target))}%")
+
+ async def stats(self):
+ while True:
+ await asyncio.sleep(1.0)
+ if self._flushes >= self._target:
+ self._log(f"AVG : {mean(self.results)}")
+ ctx.log(f"<== Benchmark Ended. Shutting down... ==>")
+ if self.out:
+ self.out.close()
+ self.temp.close()
+ ctx.master.shutdown()
+
+ def _fflush(self):
+ self.dbh.store(self.hot_flows)
+ n = len(self.hot_flows)
+ self.hot_flows = []
+ return n
+
+
+addons = [
+ StreamTester()
+]
diff --git a/test/filename_matching.py b/test/filename_matching.py
index 5f49725e..f5321307 100755
--- a/test/filename_matching.py
+++ b/test/filename_matching.py
@@ -9,7 +9,8 @@ import sys
def check_src_files_have_test():
missing_test_files = []
- excluded = ['mitmproxy/contrib/', 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/']
+ excluded = ['mitmproxy/contrib/', 'mitmproxy/io/proto/',
+ 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/']
src_files = glob.glob('mitmproxy/**/*.py', recursive=True) + glob.glob('pathod/**/*.py', recursive=True)
src_files = [f for f in src_files if os.path.basename(f) != '__init__.py']
src_files = [f for f in src_files if not any(os.path.normpath(p) in f for p in excluded)]
diff --git a/test/mitmproxy/io/test_db.py b/test/mitmproxy/io/test_db.py
new file mode 100644
index 00000000..4a2dfb67
--- /dev/null
+++ b/test/mitmproxy/io/test_db.py
@@ -0,0 +1,26 @@
+from mitmproxy.io import db
+from mitmproxy.test import tflow
+
+
+class TestDB:
+
+ def test_create(self, tdata):
+ dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite")
+ with dh._con as c:
+ cur = c.cursor()
+ cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='FLOWS';")
+ assert cur.fetchall() == [('FLOWS',)]
+
+ def test_roundtrip(self, tdata):
+ dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite", mode='write')
+ flows = []
+ for i in range(10):
+ flows.append(tflow.tflow())
+ dh.store(flows)
+ dh = db.DBHandler(db_path=tdata.path("mitmproxy/data") + "/tmp.sqlite")
+ with dh._con as c:
+ cur = c.cursor()
+ cur.execute("SELECT count(*) FROM FLOWS;")
+ assert cur.fetchall()[0][0] == 10
+ loaded_flows = dh.load()
+ assert len(loaded_flows) == len(flows)
diff --git a/test/mitmproxy/io/test_protobuf.py b/test/mitmproxy/io/test_protobuf.py
new file mode 100644
index 00000000..f725b980
--- /dev/null
+++ b/test/mitmproxy/io/test_protobuf.py
@@ -0,0 +1,120 @@
+import pytest
+
+from mitmproxy import certs
+from mitmproxy import http
+from mitmproxy import exceptions
+from mitmproxy.test import tflow, tutils
+from mitmproxy.io import protobuf
+
+
+class TestProtobuf:
+
+ def test_roundtrip_client(self):
+ c = tflow.tclient_conn()
+ del c.reply
+ c.rfile = None
+ c.wfile = None
+ pc = protobuf._dump_http_client_conn(c)
+ lc = protobuf._load_http_client_conn(pc)
+ assert c.__dict__ == lc.__dict__
+
+ def test_roundtrip_client_cert(self, tdata):
+ c = tflow.tclient_conn()
+ c.rfile = None
+ c.wfile = None
+ del c.reply
+ with open(tdata.path("mitmproxy/net/data/clientcert/client.pem"), "rb") as f:
+ d = f.read()
+ c.clientcert = certs.Cert.from_pem(d)
+ pc = protobuf._dump_http_client_conn(c)
+ lc = protobuf._load_http_client_conn(pc)
+ assert c.__dict__ == lc.__dict__
+
+ def test_roundtrip_server(self):
+ s = tflow.tserver_conn()
+ del s.reply
+ s.wfile = None
+ s.rfile = None
+ ps = protobuf._dump_http_server_conn(s)
+ ls = protobuf._load_http_server_conn(ps)
+ assert s.__dict__ == ls.__dict__
+
+ def test_roundtrip_server_cert(self, tdata):
+ s = tflow.tserver_conn()
+ del s.reply
+ s.wfile = None
+ s.rfile = None
+ with open(tdata.path("mitmproxy/net/data/text_cert"), "rb") as f:
+ d = f.read()
+ s.cert = certs.Cert.from_pem(d)
+ ps = protobuf._dump_http_server_conn(s)
+ ls = protobuf._load_http_server_conn(ps)
+ assert s.__dict__ == ls.__dict__
+
+ def test_roundtrip_server_via(self):
+ s = tflow.tserver_conn()
+ s.via = tflow.tserver_conn()
+ del s.reply
+ s.wfile = None
+ s.rfile = None
+ ps = protobuf._dump_http_server_conn(s)
+ ls = protobuf._load_http_server_conn(ps)
+ assert s.__dict__ == ls.__dict__
+ del s.via.reply
+ s.via.wfile = None
+ s.via.rfile = None
+ assert s.via.__dict__ == ls.via.__dict__
+
+ def test_roundtrip_http_request(self):
+ req = http.HTTPRequest.wrap(tutils.treq())
+ preq = protobuf._dump_http_request(req)
+ lreq = protobuf._load_http_request(preq)
+ assert req.__dict__ == lreq.__dict__
+
+ def test_roundtrip_http_request_empty_content(self):
+ req = http.HTTPRequest.wrap(tutils.treq(content=b""))
+ preq = protobuf._dump_http_request(req)
+ lreq = protobuf._load_http_request(preq)
+ assert req.__dict__ == lreq.__dict__
+
+ def test_roundtrip_http_response(self):
+ res = http.HTTPResponse.wrap(tutils.tresp())
+ pres = protobuf._dump_http_response(res)
+ lres = protobuf._load_http_response(pres)
+ assert res.__dict__ == lres.__dict__
+
+ def test_roundtrip_http_response_empty_content(self):
+ res = http.HTTPResponse.wrap(tutils.tresp(content=b""))
+ pres = protobuf._dump_http_response(res)
+ lres = protobuf._load_http_response(pres)
+ assert res.__dict__ == lres.__dict__
+
+ def test_roundtrip_http_error(self):
+ err = tflow.terr()
+ perr = protobuf._dump_http_error(err)
+ lerr = protobuf._load_http_error(perr)
+ assert err.__dict__ == lerr.__dict__
+
+ def test_roundtrip_http_flow_only_req(self):
+ f = tflow.tflow()
+ f.reply = None
+ pf = protobuf.dumps(f)
+ lf = protobuf.loads(pf, "http")
+ assert f.__dict__ == lf.__dict__
+
+ def test_roundtrip_http_flow_res(self):
+ f = tflow.tflow(resp=True)
+ f.reply = None
+ pf = protobuf.dumps(f)
+ lf = protobuf.loads(pf, "http")
+ assert f.__dict__ == lf.__dict__
+
+ def test_unsupported_dumps(self):
+ w = tflow.twebsocketflow()
+ with pytest.raises(exceptions.TypeError):
+ protobuf.dumps(w)
+
+ def test_unsupported_loads(self):
+ b = b"blobs"
+ with pytest.raises(exceptions.TypeError):
+ protobuf.loads(b, 'not-http')
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
index 2a96995d..b5e226fe 100644
--- a/test/mitmproxy/tools/console/test_commander.py
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -28,6 +28,68 @@ class TestListCompleter:
assert c.cycle() == expected
+class TestCommandHistory:
+ def fill_history(self, commands):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ for c in commands:
+ cbuf = commander.CommandBuffer(tctx.master, c)
+ history.add_command(cbuf)
+ return history, tctx.master
+
+ def test_add_command(self):
+ commands = ["command1", "command2"]
+ history, tctx_master = self.fill_history(commands)
+
+ saved_commands = [buf.text for buf in history.saved_commands]
+ assert saved_commands == [""] + commands
+
+ # The history size is only 3. So, we forget the first
+ # one command, when adding fourth command
+ cbuf = commander.CommandBuffer(tctx_master, "command3")
+ history.add_command(cbuf)
+ saved_commands = [buf.text for buf in history.saved_commands]
+ assert saved_commands == commands + ["command3"]
+
+ # Commands with the same text are not repeated in the history one by one
+ history.add_command(cbuf)
+ saved_commands = [buf.text for buf in history.saved_commands]
+ assert saved_commands == commands + ["command3"]
+
+ # adding command in execution mode sets index at the beginning of the history
+ # and replace the last command buffer if it is empty or has the same text
+ cbuf = commander.CommandBuffer(tctx_master, "")
+ history.add_command(cbuf)
+ history.index = 0
+ cbuf = commander.CommandBuffer(tctx_master, "command4")
+ history.add_command(cbuf, True)
+ assert history.index == history.last_index
+ saved_commands = [buf.text for buf in history.saved_commands]
+ assert saved_commands == ["command2", "command3", "command4"]
+
+ def test_get_next(self):
+ commands = ["command1", "command2"]
+ history, tctx_master = self.fill_history(commands)
+
+ history.index = -1
+ expected_items = ["", "command1", "command2"]
+ for i in range(3):
+ assert history.get_next().text == expected_items[i]
+ # We are at the last item of the history
+ assert history.get_next() is None
+
+ def test_get_prev(self):
+ commands = ["command1", "command2"]
+ history, tctx_master = self.fill_history(commands)
+
+ expected_items = ["command2", "command1", ""]
+ history.index = history.last_index + 1
+ for i in range(3):
+ assert history.get_prev().text == expected_items[i]
+ # We are at the first item of the history
+ assert history.get_prev() is None
+
+
class TestCommandBuffer:
def test_backspace(self):