aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addonmanager.py9
-rw-r--r--mitmproxy/addons/__init__.py4
-rw-r--r--mitmproxy/addons/save.py (renamed from mitmproxy/addons/streamfile.py)52
-rw-r--r--mitmproxy/addons/view.py26
-rw-r--r--mitmproxy/command.py91
-rw-r--r--mitmproxy/exceptions.py6
-rw-r--r--mitmproxy/master.py2
-rw-r--r--mitmproxy/options.py6
-rw-r--r--mitmproxy/optmanager.py4
-rw-r--r--mitmproxy/test/taddons.py8
-rw-r--r--mitmproxy/tools/cmdline.py9
-rw-r--r--mitmproxy/tools/console/command.py27
-rw-r--r--mitmproxy/tools/console/flowlist.py10
-rw-r--r--mitmproxy/tools/console/master.py2
-rw-r--r--mitmproxy/tools/console/signals.py3
-rw-r--r--mitmproxy/tools/console/statusbar.py14
-rw-r--r--mitmproxy/tools/main.py11
-rw-r--r--mitmproxy/utils/typecheck.py44
-rw-r--r--test/mitmproxy/addons/test_save.py83
-rw-r--r--test/mitmproxy/addons/test_streamfile.py62
-rw-r--r--test/mitmproxy/addons/test_view.py50
-rw-r--r--test/mitmproxy/test_addonmanager.py9
-rw-r--r--test/mitmproxy/test_command.py74
-rw-r--r--test/mitmproxy/utils/test_typecheck.py66
24 files changed, 523 insertions, 149 deletions
diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py
index 25461338..ea23b6ff 100644
--- a/mitmproxy/addonmanager.py
+++ b/mitmproxy/addonmanager.py
@@ -93,6 +93,9 @@ class Loader:
choices
)
+ def add_command(self, path: str, func: typing.Callable) -> None:
+ self.master.commands.add(path, func)
+
def traverse(chain):
"""
@@ -142,7 +145,7 @@ class AddonManager:
for a in traverse([addon]):
name = _get_name(a)
if name in self.lookup:
- raise exceptions.AddonError(
+ raise exceptions.AddonManagerError(
"An addon called '%s' already exists." % name
)
l = Loader(self.master)
@@ -172,7 +175,7 @@ class AddonManager:
for a in traverse([addon]):
n = _get_name(a)
if n not in self.lookup:
- raise exceptions.AddonError("No such addon: %s" % n)
+ raise exceptions.AddonManagerError("No such addon: %s" % n)
self.chain = [i for i in self.chain if i is not a]
del self.lookup[_get_name(a)]
with self.master.handlecontext():
@@ -221,7 +224,7 @@ class AddonManager:
func = getattr(a, name, None)
if func:
if not callable(func):
- raise exceptions.AddonError(
+ raise exceptions.AddonManagerError(
"Addon handler %s not callable" % name
)
func(*args, **kwargs)
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py
index 7a45106c..e87f2cbd 100644
--- a/mitmproxy/addons/__init__.py
+++ b/mitmproxy/addons/__init__.py
@@ -14,7 +14,7 @@ from mitmproxy.addons import setheaders
from mitmproxy.addons import stickyauth
from mitmproxy.addons import stickycookie
from mitmproxy.addons import streambodies
-from mitmproxy.addons import streamfile
+from mitmproxy.addons import save
from mitmproxy.addons import upstream_auth
@@ -36,6 +36,6 @@ def default_addons():
stickyauth.StickyAuth(),
stickycookie.StickyCookie(),
streambodies.StreamBodies(),
- streamfile.StreamFile(),
+ save.Save(),
upstream_auth.UpstreamAuth(),
]
diff --git a/mitmproxy/addons/streamfile.py b/mitmproxy/addons/save.py
index fde5a1c5..37dc6021 100644
--- a/mitmproxy/addons/streamfile.py
+++ b/mitmproxy/addons/save.py
@@ -1,21 +1,31 @@
import os.path
+import typing
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
+from mitmproxy import flow
-class StreamFile:
+class Save:
def __init__(self):
self.stream = None
self.filt = None
self.active_flows = set() # type: Set[flow.Flow]
- def start_stream_to_path(self, path, mode, flt):
+ def open_file(self, path):
+ if path.startswith("+"):
+ path = path[1:]
+ mode = "ab"
+ else:
+ mode = "wb"
path = os.path.expanduser(path)
+ return open(path, mode)
+
+ def start_stream_to_path(self, path, flt):
try:
- f = open(path, mode)
+ f = self.open_file(path)
except IOError as v:
raise exceptions.OptionsError(str(v))
self.stream = io.FilteredFlowWriter(f, flt)
@@ -23,26 +33,32 @@ class StreamFile:
def configure(self, updated):
# We're already streaming - stop the previous stream and restart
- if "streamfile_filter" in updated:
- if ctx.options.streamfile_filter:
- self.filt = flowfilter.parse(ctx.options.streamfile_filter)
+ if "save_stream_filter" in updated:
+ if ctx.options.save_stream_filter:
+ self.filt = flowfilter.parse(ctx.options.save_stream_filter)
if not self.filt:
raise exceptions.OptionsError(
- "Invalid filter specification: %s" % ctx.options.streamfile_filter
+ "Invalid filter specification: %s" % ctx.options.save_stream_filter
)
else:
self.filt = None
- if "streamfile" in updated:
+ if "save_stream_file" in updated:
if self.stream:
self.done()
- if ctx.options.streamfile:
- if ctx.options.streamfile.startswith("+"):
- path = ctx.options.streamfile[1:]
- mode = "ab"
- else:
- path = ctx.options.streamfile
- mode = "wb"
- self.start_stream_to_path(path, mode, self.filt)
+ if ctx.options.save_stream_file:
+ self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
+
+ def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
+ try:
+ f = self.open_file(path)
+ except IOError as v:
+ raise exceptions.CommandError(v) from v
+ stream = io.FlowWriter(f)
+ for i in flows:
+ stream.add(i)
+
+ def load(self, l):
+ l.add_command("save.file", self.save)
def tcp_start(self, flow):
if self.stream:
@@ -64,8 +80,8 @@ class StreamFile:
def done(self):
if self.stream:
- for flow in self.active_flows:
- self.stream.add(flow)
+ for f in self.active_flows:
+ self.stream.add(f)
self.active_flows = set([])
self.stream.fo.close()
self.stream = None
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index 341958c2..63416b9f 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -111,10 +111,8 @@ class View(collections.Sequence):
self.default_order = OrderRequestStart(self)
self.orders = dict(
- time = OrderRequestStart(self),
- method = OrderRequestMethod(self),
- url = OrderRequestURL(self),
- size = OrderKeySize(self),
+ time = OrderRequestStart(self), method = OrderRequestMethod(self),
+ url = OrderRequestURL(self), size = OrderKeySize(self),
)
self.order_key = self.default_order
self.order_reversed = False
@@ -324,6 +322,26 @@ class View(collections.Sequence):
if "console_focus_follow" in updated:
self.focus_follow = ctx.options.console_focus_follow
+ def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
+ if spec == "@focus":
+ return [self.focus.flow] if self.focus.flow else []
+ elif spec == "@shown":
+ return [i for i in self]
+ elif spec == "@hidden":
+ return [i for i in self._store.values() if i not in self._view]
+ elif spec == "@marked":
+ return [i for i in self._store.values() if i.marked]
+ elif spec == "@unmarked":
+ return [i for i in self._store.values() if not i.marked]
+ else:
+ filt = flowfilter.parse(spec)
+ if not filt:
+ raise exceptions.CommandError("Invalid flow filter: %s" % spec)
+ return [i for i in self._store.values() if filt(i)]
+
+ def load(self, l):
+ l.add_command("console.resolve", self.resolve)
+
def request(self, f):
self.add(f)
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
new file mode 100644
index 00000000..acf938d5
--- /dev/null
+++ b/mitmproxy/command.py
@@ -0,0 +1,91 @@
+import inspect
+import typing
+import shlex
+from mitmproxy.utils import typecheck
+from mitmproxy import exceptions
+from mitmproxy import flow
+
+
+def typename(t: type, ret: bool) -> str:
+ """
+ Translates a type to an explanatory string. Ifl ret is True, we're
+ looking at a return type, else we're looking at a parameter type.
+ """
+ if t in (str, int, bool):
+ return t.__name__
+ if t == typing.Sequence[flow.Flow]:
+ return "[flow]" if ret else "flowspec"
+ else: # pragma: no cover
+ raise NotImplementedError(t)
+
+
+class Command:
+ def __init__(self, manager, path, func) -> None:
+ self.path = path
+ self.manager = manager
+ self.func = func
+ sig = inspect.signature(self.func)
+ self.paramtypes = [v.annotation for v in sig.parameters.values()]
+ self.returntype = sig.return_annotation
+
+ def signature_help(self) -> str:
+ params = " ".join([typename(i, False) for i in self.paramtypes])
+ ret = " -> " + typename(self.returntype, True) if self.returntype else ""
+ return "%s %s%s" % (self.path, params, ret)
+
+ def call(self, args: typing.Sequence[str]):
+ """
+ Call the command with a set of arguments. At this point, all argumets are strings.
+ """
+ if len(self.paramtypes) != len(args):
+ raise exceptions.CommandError("Usage: %s" % self.signature_help())
+
+ pargs = []
+ for i in range(len(args)):
+ pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+
+ with self.manager.master.handlecontext():
+ ret = self.func(*pargs)
+
+ if not typecheck.check_command_return_type(ret, self.returntype):
+ raise exceptions.CommandError("Command returned unexpected data")
+
+ return ret
+
+
+class CommandManager:
+ def __init__(self, master):
+ self.master = master
+ self.commands = {}
+
+ def add(self, path: str, func: typing.Callable):
+ self.commands[path] = Command(self, path, func)
+
+ def call_args(self, path, args):
+ """
+ Call a command using a list of string arguments. May raise CommandError.
+ """
+ if path not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % path)
+ return self.commands[path].call(args)
+
+ def call(self, cmdstr: str):
+ """
+ Call a command using a string. May raise CommandError.
+ """
+ parts = shlex.split(cmdstr)
+ if not len(parts) >= 1:
+ raise exceptions.CommandError("Invalid command: %s" % cmdstr)
+ return self.call_args(parts[0], parts[1:])
+
+
+def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
+ """
+ Convert a string to a argument to the appropriate type.
+ """
+ if argtype == str:
+ return spec
+ elif argtype == typing.Sequence[flow.Flow]:
+ return manager.call_args("console.resolve", [spec])
+ else:
+ raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py
index 9b6328ac..71517480 100644
--- a/mitmproxy/exceptions.py
+++ b/mitmproxy/exceptions.py
@@ -93,11 +93,15 @@ class SetServerNotAllowedException(MitmproxyException):
pass
+class CommandError(Exception):
+ pass
+
+
class OptionsError(MitmproxyException):
pass
-class AddonError(MitmproxyException):
+class AddonManagerError(MitmproxyException):
pass
diff --git a/mitmproxy/master.py b/mitmproxy/master.py
index 94900915..2a032c4a 100644
--- a/mitmproxy/master.py
+++ b/mitmproxy/master.py
@@ -8,6 +8,7 @@ from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import exceptions
from mitmproxy import connections
+from mitmproxy import command
from mitmproxy import http
from mitmproxy import log
from mitmproxy.proxy.protocol import http_replay
@@ -34,6 +35,7 @@ class Master:
"""
def __init__(self, opts, server):
self.options = opts or options.Options()
+ self.commands = command.CommandManager(self)
self.addons = addonmanager.AddonManager(self)
self.event_queue = queue.Queue()
self.should_exit = threading.Event()
diff --git a/mitmproxy/options.py b/mitmproxy/options.py
index 8f8c1484..e477bed5 100644
--- a/mitmproxy/options.py
+++ b/mitmproxy/options.py
@@ -159,11 +159,11 @@ class Options(optmanager.OptManager):
choices = [i.name.lower() for i in contentviews.views]
)
self.add_option(
- "streamfile", Optional[str], None,
- "Write flows to file. Prefix path with + to append."
+ "save_stream_file", Optional[str], None,
+ "Stream flows to file as they arrive. Prefix path with + to append."
)
self.add_option(
- "streamfile_filter", Optional[str], None,
+ "save_stream_filter", Optional[str], None,
"Filter which flows are written to file."
)
self.add_option(
diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py
index 8369a36e..cf6e21b0 100644
--- a/mitmproxy/optmanager.py
+++ b/mitmproxy/optmanager.py
@@ -31,7 +31,7 @@ class _Option:
help: str,
choices: typing.Optional[typing.Sequence[str]]
) -> None:
- typecheck.check_type(name, default, typespec)
+ typecheck.check_option_type(name, default, typespec)
self.name = name
self.typespec = typespec
self._default = default
@@ -54,7 +54,7 @@ class _Option:
return copy.deepcopy(v)
def set(self, value: typing.Any) -> None:
- typecheck.check_type(self.name, value, self.typespec)
+ typecheck.check_option_type(self.name, value, self.typespec)
self.value = value
def reset(self) -> None:
diff --git a/mitmproxy/test/taddons.py b/mitmproxy/test/taddons.py
index ea9534af..5680e847 100644
--- a/mitmproxy/test/taddons.py
+++ b/mitmproxy/test/taddons.py
@@ -6,6 +6,7 @@ import mitmproxy.options
from mitmproxy import proxy
from mitmproxy import addonmanager
from mitmproxy import eventsequence
+from mitmproxy import command
from mitmproxy.addons import script
@@ -126,3 +127,10 @@ class context:
Recursively invoke an event on an addon and all its children.
"""
return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
+
+ def command(self, func, *args):
+ """
+ Invoke a command function with a list of string arguments within a command context, mimicing the actual command environment.
+ """
+ cmd = command.Command(self.master.commands, "test.command", func)
+ return cmd.call(args)
diff --git a/mitmproxy/tools/cmdline.py b/mitmproxy/tools/cmdline.py
index fbdbce52..ca83d50e 100644
--- a/mitmproxy/tools/cmdline.py
+++ b/mitmproxy/tools/cmdline.py
@@ -26,6 +26,11 @@ def common_options(parser, opts):
help="Show all options and their default values",
)
parser.add_argument(
+ '--commands',
+ action='store_true',
+ help="Show all commands and their signatures",
+ )
+ parser.add_argument(
"--conf",
type=str, dest="conf", default=CONFIG_PATH,
metavar="PATH",
@@ -61,7 +66,7 @@ def common_options(parser, opts):
opts.make_parser(parser, "scripts", metavar="SCRIPT", short="s")
opts.make_parser(parser, "stickycookie", metavar="FILTER")
opts.make_parser(parser, "stickyauth", metavar="FILTER")
- opts.make_parser(parser, "streamfile", metavar="PATH", short="w")
+ opts.make_parser(parser, "save_stream_file", metavar="PATH", short="w")
opts.make_parser(parser, "anticomp")
# Proxy options
@@ -123,7 +128,7 @@ def mitmdump(opts):
nargs="...",
help="""
Filter expression, equivalent to setting both the view_filter
- and streamfile_filter options.
+ and save_stream_filter options.
"""
)
return parser
diff --git a/mitmproxy/tools/console/command.py b/mitmproxy/tools/console/command.py
new file mode 100644
index 00000000..4cb4fe6d
--- /dev/null
+++ b/mitmproxy/tools/console/command.py
@@ -0,0 +1,27 @@
+import urwid
+
+from mitmproxy import exceptions
+from mitmproxy.tools.console import signals
+
+
+class CommandEdit(urwid.Edit):
+ def __init__(self):
+ urwid.Edit.__init__(self, ":", "")
+
+ def keypress(self, size, key):
+ return urwid.Edit.keypress(self, size, key)
+
+
+class CommandExecutor:
+ def __init__(self, master):
+ self.master = master
+
+ def __call__(self, cmd):
+ if cmd.strip():
+ try:
+ ret = self.master.commands.call(cmd)
+ except exceptions.CommandError as v:
+ signals.status_message.send(message=str(v))
+ else:
+ if type(ret) == str:
+ signals.status_message.send(message=ret)
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index 044f8f05..00e5cf4e 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -353,7 +353,9 @@ class FlowListBox(urwid.ListBox):
def keypress(self, size, key):
key = common.shortcuts(key)
- if key == "A":
+ if key == ":":
+ signals.status_prompt_command.send()
+ elif key == "A":
for f in self.master.view:
if f.intercepted:
f.resume()
@@ -409,13 +411,13 @@ class FlowListBox(urwid.ListBox):
val = not self.master.options.console_order_reversed
self.master.options.console_order_reversed = val
elif key == "W":
- if self.master.options.streamfile:
- self.master.options.streamfile = None
+ if self.master.options.save_stream_file:
+ self.master.options.save_stream_file = None
else:
signals.status_prompt_path.send(
self,
prompt="Stream flows to",
- callback= lambda path: self.master.options.update(streamfile=path)
+ callback= lambda path: self.master.options.update(save_stream_file=path)
)
else:
return urwid.ListBox.keypress(self, size, key)
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index e7a2c6ae..8727d175 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -288,7 +288,7 @@ class ConsoleMaster(master.Master):
screen = self.ui,
handle_mouse = self.options.console_mouse,
)
- self.ab = statusbar.ActionBar()
+ self.ab = statusbar.ActionBar(self)
self.loop.set_alarm_in(0.01, self.ticker)
self.loop.set_alarm_in(
diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py
index 93f09577..91cb63b3 100644
--- a/mitmproxy/tools/console/signals.py
+++ b/mitmproxy/tools/console/signals.py
@@ -24,6 +24,9 @@ status_prompt_path = blinker.Signal()
# Prompt for a single keystroke
status_prompt_onekey = blinker.Signal()
+# Prompt for a command
+status_prompt_command = blinker.Signal()
+
# Call a callback in N seconds
call_in = blinker.Signal()
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index d3a3e1f2..1930fa2f 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -5,6 +5,7 @@ import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import pathedit
from mitmproxy.tools.console import signals
+from mitmproxy.tools.console import command
import mitmproxy.tools.console.master # noqa
@@ -32,13 +33,15 @@ class PromptStub:
class ActionBar(urwid.WidgetWrap):
- def __init__(self):
+ def __init__(self, master):
urwid.WidgetWrap.__init__(self, None)
+ self.master = master
self.clear()
signals.status_message.connect(self.sig_message)
signals.status_prompt.connect(self.sig_prompt)
signals.status_prompt_path.connect(self.sig_path_prompt)
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
+ signals.status_prompt_command.connect(self.sig_prompt_command)
self.last_path = ""
@@ -66,6 +69,11 @@ class ActionBar(urwid.WidgetWrap):
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
self.prompting = PromptStub(callback, args)
+ def sig_prompt_command(self, sender):
+ signals.focus.send(self, section="footer")
+ self._w = command.CommandEdit()
+ self.prompting = command.CommandExecutor(self.master)
+
def sig_path_prompt(self, sender, prompt, callback, args=()):
signals.focus.send(self, section="footer")
self._w = pathedit.PathEdit(
@@ -243,8 +251,8 @@ class StatusBar(urwid.WidgetWrap):
r.append(("heading_key", "s"))
r.append("cripts:%s]" % len(self.master.options.scripts))
- if self.master.options.streamfile:
- r.append("[W:%s]" % self.master.options.streamfile)
+ if self.master.options.save_stream_file:
+ r.append("[W:%s]" % self.master.options.save_stream_file)
return r
diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py
index b83a35d1..fefdca5c 100644
--- a/mitmproxy/tools/main.py
+++ b/mitmproxy/tools/main.py
@@ -39,7 +39,7 @@ def process_options(parser, opts, args):
if args.version:
print(debug.dump_system_info())
sys.exit(0)
- if args.quiet or args.options:
+ if args.quiet or args.options or args.commands:
args.verbosity = 0
args.flow_detail = 0
@@ -84,6 +84,13 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
if args.options:
print(optmanager.dump_defaults(opts))
sys.exit(0)
+ if args.commands:
+ cmds = []
+ for c in master.commands.commands.values():
+ cmds.append(c.signature_help())
+ for i in sorted(cmds):
+ print(i)
+ sys.exit(0)
opts.set(*args.setoptions)
if extra:
opts.update(**extra(args))
@@ -120,7 +127,7 @@ def mitmdump(args=None): # pragma: no cover
v = " ".join(args.filter_args)
return dict(
view_filter = v,
- streamfile_filter = v,
+ save_stream_filter = v,
)
return {}
diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py
index 628ea642..20791e17 100644
--- a/mitmproxy/utils/typecheck.py
+++ b/mitmproxy/utils/typecheck.py
@@ -1,20 +1,37 @@
import typing
-def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
+def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
- This function checks if the provided value is an instance of typeinfo
- and raises a TypeError otherwise.
+ Check if the provided value is an instance of typeinfo. Returns True if the
+ types match, False otherwise. This function supports only those types
+ required for command return values.
+ """
+ typename = str(typeinfo)
+ if typename.startswith("typing.Sequence"):
+ try:
+ T = typeinfo.__args__[0] # type: ignore
+ except AttributeError:
+ # Python 3.5.0
+ T = typeinfo.__parameters__[0] # type: ignore
+ if not isinstance(value, (tuple, list)):
+ return False
+ for v in value:
+ if not check_command_return_type(v, T):
+ return False
+ elif value is None and typeinfo is None:
+ return True
+ elif not isinstance(value, typeinfo):
+ return False
+ return True
- The following types from the typing package have specialized support:
- - Union
- - Tuple
- - IO
+def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
+ """
+ Check if the provided value is an instance of typeinfo and raises a
+ TypeError otherwise. This function supports only those types required for
+ options.
"""
- # If we realize that we need to extend this list substantially, it may make sense
- # to use typeguard for this, but right now it's not worth the hassle for 16 lines of code.
-
e = TypeError("Expected {} for {}, but got {}.".format(
typeinfo,
name,
@@ -32,7 +49,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
for T in types:
try:
- check_type(name, value, T)
+ check_option_type(name, value, T)
except TypeError:
pass
else:
@@ -50,7 +67,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
if len(types) != len(value):
raise e
for i, (x, T) in enumerate(zip(value, types)):
- check_type("{}[{}]".format(name, i), x, T)
+ check_option_type("{}[{}]".format(name, i), x, T)
return
elif typename.startswith("typing.Sequence"):
try:
@@ -58,11 +75,10 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
except AttributeError:
# Python 3.5.0
T = typeinfo.__parameters__[0] # type: ignore
-
if not isinstance(value, (tuple, list)):
raise e
for v in value:
- check_type(name, v, T)
+ check_option_type(name, v, T)
elif typename.startswith("typing.IO"):
if hasattr(value, "read"):
return
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py
new file mode 100644
index 00000000..85c2a398
--- /dev/null
+++ b/test/mitmproxy/addons/test_save.py
@@ -0,0 +1,83 @@
+import pytest
+
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+
+from mitmproxy import io
+from mitmproxy import exceptions
+from mitmproxy import options
+from mitmproxy.addons import save
+from mitmproxy.addons import view
+
+
+def test_configure(tmpdir):
+ sa = save.Save()
+ with taddons.context(options=options.Options()) as tctx:
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, save_stream_file=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(
+ sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
+ )
+ tctx.configure(sa, save_stream_filter="foo")
+ assert sa.filt
+ tctx.configure(sa, save_stream_filter=None)
+ assert not sa.filt
+
+
+def rd(p):
+ x = io.FlowReader(open(p, "rb"))
+ return list(x.stream())
+
+
+def test_tcp(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, save_stream_file=p)
+
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)
+
+
+def test_save_command(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], p)
+ assert len(rd(p)) == 1
+ sa.save([tflow.tflow(resp=True)], "+" + p)
+ assert len(rd(p)) == 2
+
+ with pytest.raises(exceptions.CommandError):
+ sa.save([tflow.tflow(resp=True)], str(tmpdir))
+
+ v = view.View()
+ tctx.master.addons.add(v)
+ tctx.master.addons.add(sa)
+ tctx.master.commands.call_args("save.file", ["@shown", p])
+
+
+def test_simple(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+
+ tctx.configure(sa, save_stream_file=p)
+
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)[0].response
+
+ tctx.configure(sa, save_stream_file="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
deleted file mode 100644
index bcb27c79..00000000
--- a/test/mitmproxy/addons/test_streamfile.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import pytest
-
-from mitmproxy.test import taddons
-from mitmproxy.test import tflow
-
-from mitmproxy import io
-from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy.addons import streamfile
-
-
-def test_configure(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context(options=options.Options()) as tctx:
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=str(tmpdir))
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(
- sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~"
- )
- tctx.configure(sa, streamfile_filter="foo")
- assert sa.filt
- tctx.configure(sa, streamfile_filter=None)
- assert not sa.filt
-
-
-def rd(p):
- x = io.FlowReader(open(p, "rb"))
- return list(x.stream())
-
-
-def test_tcp(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
-
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
-
-
-def test_simple(tmpdir):
- sa = streamfile.StreamFile()
- with taddons.context() as tctx:
- p = str(tmpdir.join("foo"))
-
- tctx.configure(sa, streamfile=p)
-
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
-
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 7fa3819e..05d4af30 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -5,6 +5,7 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import options
+from mitmproxy import exceptions
from mitmproxy.test import taddons
@@ -130,6 +131,55 @@ def test_filter():
assert len(v) == 4
+def test_load():
+ v = view.View()
+ with taddons.context(options=options.Options()) as tctx:
+ tctx.master.addons.add(v)
+
+
+def test_resolve():
+ v = view.View()
+ with taddons.context(options=options.Options()) as tctx:
+ assert tctx.command(v.resolve, "@focus") == []
+ assert tctx.command(v.resolve, "@shown") == []
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ assert tctx.command(v.resolve, "@unmarked") == []
+ assert tctx.command(v.resolve, "~m get") == []
+ v.request(tft(method="get"))
+ assert len(tctx.command(v.resolve, "~m get")) == 1
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 1
+ assert len(tctx.command(v.resolve, "@unmarked")) == 1
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+ v.request(tft(method="put"))
+ assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@shown")) == 2
+ assert tctx.command(v.resolve, "@hidden") == []
+ assert tctx.command(v.resolve, "@marked") == []
+
+ v.request(tft(method="get"))
+ v.request(tft(method="put"))
+
+ f = flowfilter.parse("~m get")
+ v.set_filter(f)
+ v[0].marked = True
+
+ def m(l):
+ return [i.request.method for i in l]
+
+ assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
+ assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
+ assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
+ assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
+
+ with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
+ tctx.command(v.resolve, "~")
+
+
def test_order():
v = view.View()
with taddons.context(options=options.Options()) as tctx:
diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py
index 7b461580..034182a6 100644
--- a/test/mitmproxy/test_addonmanager.py
+++ b/test/mitmproxy/test_addonmanager.py
@@ -61,9 +61,9 @@ def test_lifecycle():
a = addonmanager.AddonManager(m)
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.add(TAddon("one"))
- with pytest.raises(exceptions.AddonError):
+ with pytest.raises(exceptions.AddonManagerError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow()
@@ -82,6 +82,11 @@ def test_loader():
l.add_option("custom_option", bool, False, "help")
l.add_option("custom_option", bool, False, "help")
+ def cmd(a: str) -> str:
+ return "foo"
+
+ l.add_command("test.command", cmd)
+
def test_simple():
with taddons.context() as tctx:
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
new file mode 100644
index 00000000..92d8c77b
--- /dev/null
+++ b/test/mitmproxy/test_command.py
@@ -0,0 +1,74 @@
+import typing
+from mitmproxy import command
+from mitmproxy import flow
+from mitmproxy import master
+from mitmproxy import options
+from mitmproxy import proxy
+from mitmproxy import exceptions
+from mitmproxy.test import tflow
+from mitmproxy.test import taddons
+import pytest
+
+
+class TAddon:
+ def cmd1(self, foo: str) -> str:
+ return "ret " + foo
+
+ def cmd2(self, foo: str) -> str:
+ return 99
+
+
+class TestCommand:
+ def test_call(self):
+ o = options.Options()
+ m = master.Master(o, proxy.DummyServer(o))
+ cm = command.CommandManager(m)
+
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
+
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
+
+
+def test_simple():
+ o = options.Options()
+ m = master.Master(o, proxy.DummyServer(o))
+ c = command.CommandManager(m)
+ a = TAddon()
+ c.add("one.two", a.cmd1)
+ assert(c.call("one.two foo") == "ret foo")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.call("nonexistent")
+ with pytest.raises(exceptions.CommandError, match="Invalid"):
+ c.call("")
+ with pytest.raises(exceptions.CommandError, match="Usage"):
+ c.call("one.two too many args")
+
+
+def test_typename():
+ assert command.typename(str, True) == "str"
+ assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
+ assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
+
+
+class DummyConsole:
+ def load(self, l):
+ l.add_command("console.resolve", self.resolve)
+
+ def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
+ return [tflow.tflow(resp=True)]
+
+
+def test_parsearg():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
+ assert len(command.parsearg(
+ tctx.master.commands, "~b", typing.Sequence[flow.Flow]
+ )) == 1
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "foo", Exception)
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index fd0c6e0c..22bd7c34 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -16,72 +16,86 @@ class T(TBase):
super(T, self).__init__(42)
-def test_check_type():
- typecheck.check_type("foo", 42, int)
+def test_check_option_type():
+ typecheck.check_option_type("foo", 42, int)
with pytest.raises(TypeError):
- typecheck.check_type("foo", 42, str)
+ typecheck.check_option_type("foo", 42, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, str)
+ typecheck.check_option_type("foo", None, str)
with pytest.raises(TypeError):
- typecheck.check_type("foo", b"foo", str)
+ typecheck.check_option_type("foo", b"foo", str)
def test_check_union():
- typecheck.check_type("foo", 42, typing.Union[int, str])
- typecheck.check_type("foo", "42", typing.Union[int, str])
+ typecheck.check_option_type("foo", 42, typing.Union[int, str])
+ typecheck.check_option_type("foo", "42", typing.Union[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [], typing.Union[int, str])
+ typecheck.check_option_type("foo", [], typing.Union[int, str])
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
- typecheck.check_type("foo", 42, m)
+ typecheck.check_option_type("foo", 42, m)
def test_check_tuple():
- typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", None, typing.Tuple[int, str])
+ typecheck.check_option_type("foo", None, typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
+ typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str])
# Python 3.5 only defines __tuple_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Tuple"
m.__tuple_params__ = (int, str)
- typecheck.check_type("foo", (42, "42"), m)
+ typecheck.check_option_type("foo", (42, "42"), m)
def test_check_sequence():
- typecheck.check_type("foo", [10], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", ["foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", ["foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
+ typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int])
with pytest.raises(TypeError):
- typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
+ typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.Sequence[str])
+ typecheck.check_option_type("foo", "foo", typing.Sequence[str])
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_type("foo", [10], m)
+ typecheck.check_option_type("foo", [10], m)
def test_check_io():
- typecheck.check_type("foo", io.StringIO(), typing.IO[str])
+ typecheck.check_option_type("foo", io.StringIO(), typing.IO[str])
with pytest.raises(TypeError):
- typecheck.check_type("foo", "foo", typing.IO[str])
+ typecheck.check_option_type("foo", "foo", typing.IO[str])
def test_check_any():
- typecheck.check_type("foo", 42, typing.Any)
- typecheck.check_type("foo", object(), typing.Any)
- typecheck.check_type("foo", None, typing.Any)
+ typecheck.check_option_type("foo", 42, typing.Any)
+ typecheck.check_option_type("foo", object(), typing.Any)
+ typecheck.check_option_type("foo", None, typing.Any)
+
+
+def test_check_command_return_type():
+ assert(typecheck.check_command_return_type("foo", str))
+ assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
+ assert(typecheck.check_command_return_type(None, None))
+ assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
+
+ # Python 3.5 only defines __parameters__
+ m = mock.Mock()
+ m.__str__ = lambda self: "typing.Sequence"
+ m.__parameters__ = (int,)
+ typecheck.check_command_return_type([10], m)