aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2017-04-26 11:45:15 +1200
committerAldo Cortesi <aldo@nullcube.com>2017-04-26 19:56:33 +1200
commit5327756377d239f059e84de4063cfcaa592fdb3d (patch)
tree9c8de935d8e627621c98d4daf39ac25e895905e0
parente32efcae49ba5857feae85b9b4651a45d9e5fcc3 (diff)
downloadmitmproxy-5327756377d239f059e84de4063cfcaa592fdb3d.tar.gz
mitmproxy-5327756377d239f059e84de4063cfcaa592fdb3d.tar.bz2
mitmproxy-5327756377d239f059e84de4063cfcaa592fdb3d.zip
Addons and addon testing
- Fix some loading sequence bugs affecting command-line script invocation - Allow addons to over-ride existing options (with a warning). We need this for reloading. - Convert har_dump to new-style arguments, fix and re-instate its test suite. - Covnert miscelaneous other exmples to new-style args.
-rw-r--r--examples/complex/har_dump.py39
-rw-r--r--examples/complex/tls_passthrough.py12
-rw-r--r--examples/simple/filter_flows.py18
-rw-r--r--mitmproxy/addonmanager.py2
-rw-r--r--mitmproxy/addons/script.py3
-rw-r--r--mitmproxy/optmanager.py2
-rw-r--r--mitmproxy/test/taddons.py6
-rw-r--r--mitmproxy/tools/main.py1
-rw-r--r--test/examples/_test_har_dump.py114
-rw-r--r--test/examples/test_har_dump.py86
-rw-r--r--test/mitmproxy/test_addonmanager.py7
-rw-r--r--test/mitmproxy/test_optmanager.py6
12 files changed, 141 insertions, 155 deletions
diff --git a/examples/complex/har_dump.py b/examples/complex/har_dump.py
index 0515d0f5..40d0373c 100644
--- a/examples/complex/har_dump.py
+++ b/examples/complex/har_dump.py
@@ -4,7 +4,6 @@ This inline script can be used to dump flows as HAR files.
import json
-import sys
import base64
import zlib
import os
@@ -15,6 +14,7 @@ from datetime import timezone
import mitmproxy
from mitmproxy import version
+from mitmproxy import ctx
from mitmproxy.utils import strutils
from mitmproxy.net.http import cookies
@@ -26,16 +26,12 @@ SERVERS_SEEN = set()
def load(l):
- """
- Called once on script startup before any other events.
- """
- if len(sys.argv) != 2:
- raise ValueError(
- 'Usage: -s "har_dump.py filename" '
- '(- will output to stdout, filenames ending with .zhar '
- 'will result in compressed har)'
- )
+ l.add_option(
+ "hardump", str, "", "HAR dump path.",
+ )
+
+def configure(updated):
HAR.update({
"log": {
"version": "1.2",
@@ -156,21 +152,20 @@ def done():
"""
Called once on script shutdown, after any other events.
"""
- dump_file = sys.argv[1]
+ if ctx.options.hardump:
+ json_dump = json.dumps(HAR, indent=2) # type: str
- json_dump = json.dumps(HAR, indent=2) # type: str
-
- if dump_file == '-':
- mitmproxy.ctx.log(json_dump)
- else:
- raw = json_dump.encode() # type: bytes
- if dump_file.endswith('.zhar'):
- raw = zlib.compress(raw, 9)
+ if ctx.options.hardump == '-':
+ mitmproxy.ctx.log(json_dump)
+ else:
+ raw = json_dump.encode() # type: bytes
+ if ctx.options.hardump.endswith('.zhar'):
+ raw = zlib.compress(raw, 9)
- with open(os.path.expanduser(dump_file), "wb") as f:
- f.write(raw)
+ with open(os.path.expanduser(ctx.options.hardump), "wb") as f:
+ f.write(raw)
- mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
+ mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
def format_cookies(cookie_list):
diff --git a/examples/complex/tls_passthrough.py b/examples/complex/tls_passthrough.py
index 72c0244b..9bb27d25 100644
--- a/examples/complex/tls_passthrough.py
+++ b/examples/complex/tls_passthrough.py
@@ -23,10 +23,10 @@ Authors: Maximilian Hils, Matthew Tuusberg
import collections
import random
-import sys
from enum import Enum
import mitmproxy
+from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
@@ -113,9 +113,15 @@ tls_strategy = None
def load(l):
+ l.add_option(
+ "tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
+ )
+
+
+def configure(updated):
global tls_strategy
- if len(sys.argv) == 2:
- tls_strategy = ProbabilisticStrategy(float(sys.argv[1]))
+ if ctx.options.tlsstrat > 0:
+ tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
else:
tls_strategy = ConservativeStrategy()
diff --git a/examples/simple/filter_flows.py b/examples/simple/filter_flows.py
index 896fa54a..fd49425a 100644
--- a/examples/simple/filter_flows.py
+++ b/examples/simple/filter_flows.py
@@ -1,15 +1,21 @@
"""
This scripts demonstrates how to use mitmproxy's filter pattern in scripts.
-Usage:
- mitmdump -s "flowfilter.py FILTER"
"""
-import sys
from mitmproxy import flowfilter
+from mitmproxy import ctx
class Filter:
- def __init__(self, spec):
- self.filter = flowfilter.parse(spec)
+ def __init__(self):
+ self.filter = None
+
+ def configure(self, updated):
+ self.filter = flowfilter.parse(ctx.options.flowfilter)
+
+ def load(self, l):
+ l.add_option(
+ "flowfilter", str, "", "Check that flow matches filter."
+ )
def response(self, flow):
if flowfilter.match(self.filter, flow):
@@ -17,4 +23,4 @@ class Filter:
print(flow)
-addons = [Filter(sys.argv[1])]
+addons = [Filter()]
diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py
index 13c90413..25461338 100644
--- a/mitmproxy/addonmanager.py
+++ b/mitmproxy/addonmanager.py
@@ -83,6 +83,8 @@ class Loader:
help: str,
choices: typing.Optional[typing.Sequence[str]] = None
) -> None:
+ if name in self.master.options:
+ ctx.log.warn("Over-riding existing option %s" % name)
self.master.options.add_option(
name,
typespec,
diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py
index 0497af85..99a8f6a4 100644
--- a/mitmproxy/addons/script.py
+++ b/mitmproxy/addons/script.py
@@ -47,7 +47,7 @@ class Script:
if time.time() - self.last_load > self.ReloadInterval:
mtime = os.stat(self.path).st_mtime
if mtime > self.last_mtime:
- ctx.log.info("Loading script: %s" % self.name)
+ ctx.log.info("Loading script: %s" % self.path)
if self.ns:
ctx.master.addons.remove(self.ns)
self.ns = load_script(ctx, self.path)
@@ -108,7 +108,6 @@ class ScriptLoader:
if s in current:
ordered.append(current[s])
else:
- ctx.log.info("Loading script: %s" % s)
sc = Script(s)
ordered.append(sc)
newscripts.append(sc)
diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py
index 5ab3496c..8369a36e 100644
--- a/mitmproxy/optmanager.py
+++ b/mitmproxy/optmanager.py
@@ -104,8 +104,6 @@ class OptManager:
help: str,
choices: typing.Optional[typing.Sequence[str]] = None
) -> None:
- if name in self._options:
- raise ValueError("Option %s already exists" % name)
self._options[name] = _Option(name, typespec, default, help, choices)
@contextlib.contextmanager
diff --git a/mitmproxy/test/taddons.py b/mitmproxy/test/taddons.py
index 39ebb2e6..ea9534af 100644
--- a/mitmproxy/test/taddons.py
+++ b/mitmproxy/test/taddons.py
@@ -120,3 +120,9 @@ class context:
self.configure(sc)
self.master.addons.invoke_addon(sc, "tick")
return sc.addons[0] if sc.addons else None
+
+ def invoke(self, addon, event, *args, **kwargs):
+ """
+ Recursively invoke an event on an addon and all its children.
+ """
+ return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py
index c0326739..b83a35d1 100644
--- a/mitmproxy/tools/main.py
+++ b/mitmproxy/tools/main.py
@@ -77,6 +77,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
server = process_options(parser, opts, args)
master = MasterKlass(opts, server)
master.addons.trigger("configure", opts.keys())
+ master.addons.trigger("tick")
remaining = opts.update_known(**unknown)
if remaining and opts.verbosity > 1:
print("Ignored options: %s" % remaining)
diff --git a/test/examples/_test_har_dump.py b/test/examples/_test_har_dump.py
deleted file mode 100644
index e5cfd2e1..00000000
--- a/test/examples/_test_har_dump.py
+++ /dev/null
@@ -1,114 +0,0 @@
-import json
-import shlex
-import pytest
-
-from mitmproxy import options
-from mitmproxy import proxy
-from mitmproxy import master
-from mitmproxy.addons import script
-
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
-from mitmproxy.net.http import cookies
-
-example_dir = tutils.test_data.push("../examples")
-
-
-class ScriptError(Exception):
- pass
-
-
-class RaiseMaster(master.Master):
- def add_log(self, e, level):
- if level in ("warn", "error"):
- raise ScriptError(e)
-
-
-def tscript(cmd, args=""):
- o = options.Options()
- cmd = example_dir.path(cmd) + " " + args
- m = RaiseMaster(o, proxy.DummyServer())
- sc = script.Script(cmd)
- m.addons.add(sc)
- return m, sc
-
-
-class TestHARDump:
-
- def flow(self, resp_content=b'message'):
- times = dict(
- timestamp_start=746203272,
- timestamp_end=746203272,
- )
-
- # Create a dummy flow for testing
- return tflow.tflow(
- req=tutils.treq(method=b'GET', **times),
- resp=tutils.tresp(content=resp_content, **times)
- )
-
- def test_no_file_arg(self):
- with pytest.raises(ScriptError):
- tscript("complex/har_dump.py")
-
- def test_simple(self, tmpdir):
- path = str(tmpdir.join("somefile"))
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.trigger("response", self.flow())
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
- assert len(har["log"]["entries"]) == 1
-
- def test_base64(self, tmpdir):
- path = str(tmpdir.join("somefile"))
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.trigger(
- "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)
- )
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
- assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
-
- def test_format_cookies(self):
- m, sc = tscript("complex/har_dump.py", "-")
- format_cookies = sc.ns.format_cookies
-
- CA = cookies.CookieAttrs
-
- f = format_cookies([("n", "v", CA([("k", "v")]))])[0]
- assert f['name'] == "n"
- assert f['value'] == "v"
- assert not f['httpOnly']
- assert not f['secure']
-
- f = format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0]
- assert f['httpOnly']
- assert f['secure']
-
- f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
- assert f['expires']
-
- def test_binary(self, tmpdir):
-
- f = self.flow()
- f.request.method = "POST"
- f.request.headers["content-type"] = "application/x-www-form-urlencoded"
- f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f"
- f.response.headers["random-junk"] = bytes(range(256))
- f.response.content = bytes(range(256))
-
- path = str(tmpdir.join("somefile"))
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.trigger("response", f)
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
- assert len(har["log"]["entries"]) == 1
diff --git a/test/examples/test_har_dump.py b/test/examples/test_har_dump.py
new file mode 100644
index 00000000..11cd5c29
--- /dev/null
+++ b/test/examples/test_har_dump.py
@@ -0,0 +1,86 @@
+import json
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+from mitmproxy.test import taddons
+from mitmproxy.net.http import cookies
+
+example_dir = tutils.test_data.push("../examples")
+
+
+class TestHARDump:
+ def flow(self, resp_content=b'message'):
+ times = dict(
+ timestamp_start=746203272,
+ timestamp_end=746203272,
+ )
+
+ # Create a dummy flow for testing
+ return tflow.tflow(
+ req=tutils.treq(method=b'GET', **times),
+ resp=tutils.tresp(content=resp_content, **times)
+ )
+
+ def test_simple(self, tmpdir):
+ with taddons.context() as tctx:
+ a = tctx.script(example_dir.path("complex/har_dump.py"))
+ path = str(tmpdir.join("somefile"))
+ tctx.configure(a, hardump=path)
+ tctx.invoke(a, "response", self.flow())
+ tctx.invoke(a, "done")
+ with open(path, "r") as inp:
+ har = json.load(inp)
+ assert len(har["log"]["entries"]) == 1
+
+ def test_base64(self, tmpdir):
+ with taddons.context() as tctx:
+ a = tctx.script(example_dir.path("complex/har_dump.py"))
+ path = str(tmpdir.join("somefile"))
+ tctx.configure(a, hardump=path)
+
+ tctx.invoke(
+ a, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)
+ )
+ tctx.invoke(a, "done")
+ with open(path, "r") as inp:
+ har = json.load(inp)
+ assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
+
+ def test_format_cookies(self):
+ with taddons.context() as tctx:
+ a = tctx.script(example_dir.path("complex/har_dump.py"))
+
+ CA = cookies.CookieAttrs
+
+ f = a.format_cookies([("n", "v", CA([("k", "v")]))])[0]
+ assert f['name'] == "n"
+ assert f['value'] == "v"
+ assert not f['httpOnly']
+ assert not f['secure']
+
+ f = a.format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0]
+ assert f['httpOnly']
+ assert f['secure']
+
+ f = a.format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
+ assert f['expires']
+
+ def test_binary(self, tmpdir):
+ with taddons.context() as tctx:
+ a = tctx.script(example_dir.path("complex/har_dump.py"))
+ path = str(tmpdir.join("somefile"))
+ tctx.configure(a, hardump=path)
+
+ f = self.flow()
+ f.request.method = "POST"
+ f.request.headers["content-type"] = "application/x-www-form-urlencoded"
+ f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f"
+ f.response.headers["random-junk"] = bytes(range(256))
+ f.response.content = bytes(range(256))
+
+ tctx.invoke(a, "response", f)
+ tctx.invoke(a, "done")
+
+ with open(path, "r") as inp:
+ har = json.load(inp)
+ assert len(har["log"]["entries"]) == 1
diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py
index cba40412..7b461580 100644
--- a/test/mitmproxy/test_addonmanager.py
+++ b/test/mitmproxy/test_addonmanager.py
@@ -76,6 +76,13 @@ def test_defaults():
assert addons.default_addons()
+def test_loader():
+ with taddons.context() as tctx:
+ l = addonmanager.Loader(tctx.master)
+ l.add_option("custom_option", bool, False, "help")
+ l.add_option("custom_option", bool, False, "help")
+
+
def test_simple():
with taddons.context() as tctx:
a = tctx.master.addons
diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py
index 31b6e52b..a685570f 100644
--- a/test/mitmproxy/test_optmanager.py
+++ b/test/mitmproxy/test_optmanager.py
@@ -38,12 +38,6 @@ class TM(optmanager.OptManager):
self.add_option("one", typing.Optional[str], None, "help")
-def test_add_option():
- o = TO()
- with pytest.raises(ValueError, match="already exists"):
- o.add_option("one", typing.Optional[int], None, "help")
-
-
def test_defaults():
o = TD2()
defaults = {