diff options
24 files changed, 523 insertions, 149 deletions
diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py index 25461338..ea23b6ff 100644 --- a/mitmproxy/addonmanager.py +++ b/mitmproxy/addonmanager.py @@ -93,6 +93,9 @@ class Loader: choices ) + def add_command(self, path: str, func: typing.Callable) -> None: + self.master.commands.add(path, func) + def traverse(chain): """ @@ -142,7 +145,7 @@ class AddonManager: for a in traverse([addon]): name = _get_name(a) if name in self.lookup: - raise exceptions.AddonError( + raise exceptions.AddonManagerError( "An addon called '%s' already exists." % name ) l = Loader(self.master) @@ -172,7 +175,7 @@ class AddonManager: for a in traverse([addon]): n = _get_name(a) if n not in self.lookup: - raise exceptions.AddonError("No such addon: %s" % n) + raise exceptions.AddonManagerError("No such addon: %s" % n) self.chain = [i for i in self.chain if i is not a] del self.lookup[_get_name(a)] with self.master.handlecontext(): @@ -221,7 +224,7 @@ class AddonManager: func = getattr(a, name, None) if func: if not callable(func): - raise exceptions.AddonError( + raise exceptions.AddonManagerError( "Addon handler %s not callable" % name ) func(*args, **kwargs) diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py index 7a45106c..e87f2cbd 100644 --- a/mitmproxy/addons/__init__.py +++ b/mitmproxy/addons/__init__.py @@ -14,7 +14,7 @@ from mitmproxy.addons import setheaders from mitmproxy.addons import stickyauth from mitmproxy.addons import stickycookie from mitmproxy.addons import streambodies -from mitmproxy.addons import streamfile +from mitmproxy.addons import save from mitmproxy.addons import upstream_auth @@ -36,6 +36,6 @@ def default_addons(): stickyauth.StickyAuth(), stickycookie.StickyCookie(), streambodies.StreamBodies(), - streamfile.StreamFile(), + save.Save(), upstream_auth.UpstreamAuth(), ] diff --git a/mitmproxy/addons/streamfile.py b/mitmproxy/addons/save.py index fde5a1c5..37dc6021 100644 --- a/mitmproxy/addons/streamfile.py +++ b/mitmproxy/addons/save.py @@ -1,21 +1,31 @@ import os.path +import typing from mitmproxy import exceptions from mitmproxy import flowfilter from mitmproxy import io from mitmproxy import ctx +from mitmproxy import flow -class StreamFile: +class Save: def __init__(self): self.stream = None self.filt = None self.active_flows = set() # type: Set[flow.Flow] - def start_stream_to_path(self, path, mode, flt): + def open_file(self, path): + if path.startswith("+"): + path = path[1:] + mode = "ab" + else: + mode = "wb" path = os.path.expanduser(path) + return open(path, mode) + + def start_stream_to_path(self, path, flt): try: - f = open(path, mode) + f = self.open_file(path) except IOError as v: raise exceptions.OptionsError(str(v)) self.stream = io.FilteredFlowWriter(f, flt) @@ -23,26 +33,32 @@ class StreamFile: def configure(self, updated): # We're already streaming - stop the previous stream and restart - if "streamfile_filter" in updated: - if ctx.options.streamfile_filter: - self.filt = flowfilter.parse(ctx.options.streamfile_filter) + if "save_stream_filter" in updated: + if ctx.options.save_stream_filter: + self.filt = flowfilter.parse(ctx.options.save_stream_filter) if not self.filt: raise exceptions.OptionsError( - "Invalid filter specification: %s" % ctx.options.streamfile_filter + "Invalid filter specification: %s" % ctx.options.save_stream_filter ) else: self.filt = None - if "streamfile" in updated: + if "save_stream_file" in updated: if self.stream: self.done() - if ctx.options.streamfile: - if ctx.options.streamfile.startswith("+"): - path = ctx.options.streamfile[1:] - mode = "ab" - else: - path = ctx.options.streamfile - mode = "wb" - self.start_stream_to_path(path, mode, self.filt) + if ctx.options.save_stream_file: + self.start_stream_to_path(ctx.options.save_stream_file, self.filt) + + def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None: + try: + f = self.open_file(path) + except IOError as v: + raise exceptions.CommandError(v) from v + stream = io.FlowWriter(f) + for i in flows: + stream.add(i) + + def load(self, l): + l.add_command("save.file", self.save) def tcp_start(self, flow): if self.stream: @@ -64,8 +80,8 @@ class StreamFile: def done(self): if self.stream: - for flow in self.active_flows: - self.stream.add(flow) + for f in self.active_flows: + self.stream.add(f) self.active_flows = set([]) self.stream.fo.close() self.stream = None diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index 341958c2..63416b9f 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -111,10 +111,8 @@ class View(collections.Sequence): self.default_order = OrderRequestStart(self) self.orders = dict( - time = OrderRequestStart(self), - method = OrderRequestMethod(self), - url = OrderRequestURL(self), - size = OrderKeySize(self), + time = OrderRequestStart(self), method = OrderRequestMethod(self), + url = OrderRequestURL(self), size = OrderKeySize(self), ) self.order_key = self.default_order self.order_reversed = False @@ -324,6 +322,26 @@ class View(collections.Sequence): if "console_focus_follow" in updated: self.focus_follow = ctx.options.console_focus_follow + def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]: + if spec == "@focus": + return [self.focus.flow] if self.focus.flow else [] + elif spec == "@shown": + return [i for i in self] + elif spec == "@hidden": + return [i for i in self._store.values() if i not in self._view] + elif spec == "@marked": + return [i for i in self._store.values() if i.marked] + elif spec == "@unmarked": + return [i for i in self._store.values() if not i.marked] + else: + filt = flowfilter.parse(spec) + if not filt: + raise exceptions.CommandError("Invalid flow filter: %s" % spec) + return [i for i in self._store.values() if filt(i)] + + def load(self, l): + l.add_command("console.resolve", self.resolve) + def request(self, f): self.add(f) diff --git a/mitmproxy/command.py b/mitmproxy/command.py new file mode 100644 index 00000000..acf938d5 --- /dev/null +++ b/mitmproxy/command.py @@ -0,0 +1,91 @@ +import inspect +import typing +import shlex +from mitmproxy.utils import typecheck +from mitmproxy import exceptions +from mitmproxy import flow + + +def typename(t: type, ret: bool) -> str: + """ + Translates a type to an explanatory string. Ifl ret is True, we're + looking at a return type, else we're looking at a parameter type. + """ + if t in (str, int, bool): + return t.__name__ + if t == typing.Sequence[flow.Flow]: + return "[flow]" if ret else "flowspec" + else: # pragma: no cover + raise NotImplementedError(t) + + +class Command: + def __init__(self, manager, path, func) -> None: + self.path = path + self.manager = manager + self.func = func + sig = inspect.signature(self.func) + self.paramtypes = [v.annotation for v in sig.parameters.values()] + self.returntype = sig.return_annotation + + def signature_help(self) -> str: + params = " ".join([typename(i, False) for i in self.paramtypes]) + ret = " -> " + typename(self.returntype, True) if self.returntype else "" + return "%s %s%s" % (self.path, params, ret) + + def call(self, args: typing.Sequence[str]): + """ + Call the command with a set of arguments. At this point, all argumets are strings. + """ + if len(self.paramtypes) != len(args): + raise exceptions.CommandError("Usage: %s" % self.signature_help()) + + pargs = [] + for i in range(len(args)): + pargs.append(parsearg(self.manager, args[i], self.paramtypes[i])) + + with self.manager.master.handlecontext(): + ret = self.func(*pargs) + + if not typecheck.check_command_return_type(ret, self.returntype): + raise exceptions.CommandError("Command returned unexpected data") + + return ret + + +class CommandManager: + def __init__(self, master): + self.master = master + self.commands = {} + + def add(self, path: str, func: typing.Callable): + self.commands[path] = Command(self, path, func) + + def call_args(self, path, args): + """ + Call a command using a list of string arguments. May raise CommandError. + """ + if path not in self.commands: + raise exceptions.CommandError("Unknown command: %s" % path) + return self.commands[path].call(args) + + def call(self, cmdstr: str): + """ + Call a command using a string. May raise CommandError. + """ + parts = shlex.split(cmdstr) + if not len(parts) >= 1: + raise exceptions.CommandError("Invalid command: %s" % cmdstr) + return self.call_args(parts[0], parts[1:]) + + +def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: + """ + Convert a string to a argument to the appropriate type. + """ + if argtype == str: + return spec + elif argtype == typing.Sequence[flow.Flow]: + return manager.call_args("console.resolve", [spec]) + else: + raise exceptions.CommandError("Unsupported argument type: %s" % argtype) diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py index 9b6328ac..71517480 100644 --- a/mitmproxy/exceptions.py +++ b/mitmproxy/exceptions.py @@ -93,11 +93,15 @@ class SetServerNotAllowedException(MitmproxyException): pass +class CommandError(Exception): + pass + + class OptionsError(MitmproxyException): pass -class AddonError(MitmproxyException): +class AddonManagerError(MitmproxyException): pass diff --git a/mitmproxy/master.py b/mitmproxy/master.py index 94900915..2a032c4a 100644 --- a/mitmproxy/master.py +++ b/mitmproxy/master.py @@ -8,6 +8,7 @@ from mitmproxy import controller from mitmproxy import eventsequence from mitmproxy import exceptions from mitmproxy import connections +from mitmproxy import command from mitmproxy import http from mitmproxy import log from mitmproxy.proxy.protocol import http_replay @@ -34,6 +35,7 @@ class Master: """ def __init__(self, opts, server): self.options = opts or options.Options() + self.commands = command.CommandManager(self) self.addons = addonmanager.AddonManager(self) self.event_queue = queue.Queue() self.should_exit = threading.Event() diff --git a/mitmproxy/options.py b/mitmproxy/options.py index 8f8c1484..e477bed5 100644 --- a/mitmproxy/options.py +++ b/mitmproxy/options.py @@ -159,11 +159,11 @@ class Options(optmanager.OptManager): choices = [i.name.lower() for i in contentviews.views] ) self.add_option( - "streamfile", Optional[str], None, - "Write flows to file. Prefix path with + to append." + "save_stream_file", Optional[str], None, + "Stream flows to file as they arrive. Prefix path with + to append." ) self.add_option( - "streamfile_filter", Optional[str], None, + "save_stream_filter", Optional[str], None, "Filter which flows are written to file." ) self.add_option( diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py index 8369a36e..cf6e21b0 100644 --- a/mitmproxy/optmanager.py +++ b/mitmproxy/optmanager.py @@ -31,7 +31,7 @@ class _Option: help: str, choices: typing.Optional[typing.Sequence[str]] ) -> None: - typecheck.check_type(name, default, typespec) + typecheck.check_option_type(name, default, typespec) self.name = name self.typespec = typespec self._default = default @@ -54,7 +54,7 @@ class _Option: return copy.deepcopy(v) def set(self, value: typing.Any) -> None: - typecheck.check_type(self.name, value, self.typespec) + typecheck.check_option_type(self.name, value, self.typespec) self.value = value def reset(self) -> None: diff --git a/mitmproxy/test/taddons.py b/mitmproxy/test/taddons.py index ea9534af..5680e847 100644 --- a/mitmproxy/test/taddons.py +++ b/mitmproxy/test/taddons.py @@ -6,6 +6,7 @@ import mitmproxy.options from mitmproxy import proxy from mitmproxy import addonmanager from mitmproxy import eventsequence +from mitmproxy import command from mitmproxy.addons import script @@ -126,3 +127,10 @@ class context: Recursively invoke an event on an addon and all its children. """ return self.master.addons.invoke_addon(addon, event, *args, **kwargs) + + def command(self, func, *args): + """ + Invoke a command function with a list of string arguments within a command context, mimicing the actual command environment. + """ + cmd = command.Command(self.master.commands, "test.command", func) + return cmd.call(args) diff --git a/mitmproxy/tools/cmdline.py b/mitmproxy/tools/cmdline.py index fbdbce52..ca83d50e 100644 --- a/mitmproxy/tools/cmdline.py +++ b/mitmproxy/tools/cmdline.py @@ -26,6 +26,11 @@ def common_options(parser, opts): help="Show all options and their default values", ) parser.add_argument( + '--commands', + action='store_true', + help="Show all commands and their signatures", + ) + parser.add_argument( "--conf", type=str, dest="conf", default=CONFIG_PATH, metavar="PATH", @@ -61,7 +66,7 @@ def common_options(parser, opts): opts.make_parser(parser, "scripts", metavar="SCRIPT", short="s") opts.make_parser(parser, "stickycookie", metavar="FILTER") opts.make_parser(parser, "stickyauth", metavar="FILTER") - opts.make_parser(parser, "streamfile", metavar="PATH", short="w") + opts.make_parser(parser, "save_stream_file", metavar="PATH", short="w") opts.make_parser(parser, "anticomp") # Proxy options @@ -123,7 +128,7 @@ def mitmdump(opts): nargs="...", help=""" Filter expression, equivalent to setting both the view_filter - and streamfile_filter options. + and save_stream_filter options. """ ) return parser diff --git a/mitmproxy/tools/console/command.py b/mitmproxy/tools/console/command.py new file mode 100644 index 00000000..4cb4fe6d --- /dev/null +++ b/mitmproxy/tools/console/command.py @@ -0,0 +1,27 @@ +import urwid + +from mitmproxy import exceptions +from mitmproxy.tools.console import signals + + +class CommandEdit(urwid.Edit): + def __init__(self): + urwid.Edit.__init__(self, ":", "") + + def keypress(self, size, key): + return urwid.Edit.keypress(self, size, key) + + +class CommandExecutor: + def __init__(self, master): + self.master = master + + def __call__(self, cmd): + if cmd.strip(): + try: + ret = self.master.commands.call(cmd) + except exceptions.CommandError as v: + signals.status_message.send(message=str(v)) + else: + if type(ret) == str: + signals.status_message.send(message=ret) diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index 044f8f05..00e5cf4e 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -353,7 +353,9 @@ class FlowListBox(urwid.ListBox): def keypress(self, size, key): key = common.shortcuts(key) - if key == "A": + if key == ":": + signals.status_prompt_command.send() + elif key == "A": for f in self.master.view: if f.intercepted: f.resume() @@ -409,13 +411,13 @@ class FlowListBox(urwid.ListBox): val = not self.master.options.console_order_reversed self.master.options.console_order_reversed = val elif key == "W": - if self.master.options.streamfile: - self.master.options.streamfile = None + if self.master.options.save_stream_file: + self.master.options.save_stream_file = None else: signals.status_prompt_path.send( self, prompt="Stream flows to", - callback= lambda path: self.master.options.update(streamfile=path) + callback= lambda path: self.master.options.update(save_stream_file=path) ) else: return urwid.ListBox.keypress(self, size, key) diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index e7a2c6ae..8727d175 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -288,7 +288,7 @@ class ConsoleMaster(master.Master): screen = self.ui, handle_mouse = self.options.console_mouse, ) - self.ab = statusbar.ActionBar() + self.ab = statusbar.ActionBar(self) self.loop.set_alarm_in(0.01, self.ticker) self.loop.set_alarm_in( diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py index 93f09577..91cb63b3 100644 --- a/mitmproxy/tools/console/signals.py +++ b/mitmproxy/tools/console/signals.py @@ -24,6 +24,9 @@ status_prompt_path = blinker.Signal() # Prompt for a single keystroke status_prompt_onekey = blinker.Signal() +# Prompt for a command +status_prompt_command = blinker.Signal() + # Call a callback in N seconds call_in = blinker.Signal() diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index d3a3e1f2..1930fa2f 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -5,6 +5,7 @@ import urwid from mitmproxy.tools.console import common from mitmproxy.tools.console import pathedit from mitmproxy.tools.console import signals +from mitmproxy.tools.console import command import mitmproxy.tools.console.master # noqa @@ -32,13 +33,15 @@ class PromptStub: class ActionBar(urwid.WidgetWrap): - def __init__(self): + def __init__(self, master): urwid.WidgetWrap.__init__(self, None) + self.master = master self.clear() signals.status_message.connect(self.sig_message) signals.status_prompt.connect(self.sig_prompt) signals.status_prompt_path.connect(self.sig_path_prompt) signals.status_prompt_onekey.connect(self.sig_prompt_onekey) + signals.status_prompt_command.connect(self.sig_prompt_command) self.last_path = "" @@ -66,6 +69,11 @@ class ActionBar(urwid.WidgetWrap): self._w = urwid.Edit(self.prep_prompt(prompt), text or "") self.prompting = PromptStub(callback, args) + def sig_prompt_command(self, sender): + signals.focus.send(self, section="footer") + self._w = command.CommandEdit() + self.prompting = command.CommandExecutor(self.master) + def sig_path_prompt(self, sender, prompt, callback, args=()): signals.focus.send(self, section="footer") self._w = pathedit.PathEdit( @@ -243,8 +251,8 @@ class StatusBar(urwid.WidgetWrap): r.append(("heading_key", "s")) r.append("cripts:%s]" % len(self.master.options.scripts)) - if self.master.options.streamfile: - r.append("[W:%s]" % self.master.options.streamfile) + if self.master.options.save_stream_file: + r.append("[W:%s]" % self.master.options.save_stream_file) return r diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py index b83a35d1..fefdca5c 100644 --- a/mitmproxy/tools/main.py +++ b/mitmproxy/tools/main.py @@ -39,7 +39,7 @@ def process_options(parser, opts, args): if args.version: print(debug.dump_system_info()) sys.exit(0) - if args.quiet or args.options: + if args.quiet or args.options or args.commands: args.verbosity = 0 args.flow_detail = 0 @@ -84,6 +84,13 @@ def run(MasterKlass, args, extra=None): # pragma: no cover if args.options: print(optmanager.dump_defaults(opts)) sys.exit(0) + if args.commands: + cmds = [] + for c in master.commands.commands.values(): + cmds.append(c.signature_help()) + for i in sorted(cmds): + print(i) + sys.exit(0) opts.set(*args.setoptions) if extra: opts.update(**extra(args)) @@ -120,7 +127,7 @@ def mitmdump(args=None): # pragma: no cover v = " ".join(args.filter_args) return dict( view_filter = v, - streamfile_filter = v, + save_stream_filter = v, ) return {} diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py index 628ea642..20791e17 100644 --- a/mitmproxy/utils/typecheck.py +++ b/mitmproxy/utils/typecheck.py @@ -1,20 +1,37 @@ import typing -def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: +def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool: """ - This function checks if the provided value is an instance of typeinfo - and raises a TypeError otherwise. + Check if the provided value is an instance of typeinfo. Returns True if the + types match, False otherwise. This function supports only those types + required for command return values. + """ + typename = str(typeinfo) + if typename.startswith("typing.Sequence"): + try: + T = typeinfo.__args__[0] # type: ignore + except AttributeError: + # Python 3.5.0 + T = typeinfo.__parameters__[0] # type: ignore + if not isinstance(value, (tuple, list)): + return False + for v in value: + if not check_command_return_type(v, T): + return False + elif value is None and typeinfo is None: + return True + elif not isinstance(value, typeinfo): + return False + return True - The following types from the typing package have specialized support: - - Union - - Tuple - - IO +def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: + """ + Check if the provided value is an instance of typeinfo and raises a + TypeError otherwise. This function supports only those types required for + options. """ - # If we realize that we need to extend this list substantially, it may make sense - # to use typeguard for this, but right now it's not worth the hassle for 16 lines of code. - e = TypeError("Expected {} for {}, but got {}.".format( typeinfo, name, @@ -32,7 +49,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: for T in types: try: - check_type(name, value, T) + check_option_type(name, value, T) except TypeError: pass else: @@ -50,7 +67,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: if len(types) != len(value): raise e for i, (x, T) in enumerate(zip(value, types)): - check_type("{}[{}]".format(name, i), x, T) + check_option_type("{}[{}]".format(name, i), x, T) return elif typename.startswith("typing.Sequence"): try: @@ -58,11 +75,10 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: except AttributeError: # Python 3.5.0 T = typeinfo.__parameters__[0] # type: ignore - if not isinstance(value, (tuple, list)): raise e for v in value: - check_type(name, v, T) + check_option_type(name, v, T) elif typename.startswith("typing.IO"): if hasattr(value, "read"): return diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py new file mode 100644 index 00000000..85c2a398 --- /dev/null +++ b/test/mitmproxy/addons/test_save.py @@ -0,0 +1,83 @@ +import pytest + +from mitmproxy.test import taddons +from mitmproxy.test import tflow + +from mitmproxy import io +from mitmproxy import exceptions +from mitmproxy import options +from mitmproxy.addons import save +from mitmproxy.addons import view + + +def test_configure(tmpdir): + sa = save.Save() + with taddons.context(options=options.Options()) as tctx: + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, save_stream_file=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure( + sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~" + ) + tctx.configure(sa, save_stream_filter="foo") + assert sa.filt + tctx.configure(sa, save_stream_filter=None) + assert not sa.filt + + +def rd(p): + x = io.FlowReader(open(p, "rb")) + return list(x.stream()) + + +def test_tcp(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + tctx.configure(sa, save_stream_file=p) + + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, save_stream_file=None) + assert rd(p) + + +def test_save_command(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], p) + assert len(rd(p)) == 1 + sa.save([tflow.tflow(resp=True)], "+" + p) + assert len(rd(p)) == 2 + + with pytest.raises(exceptions.CommandError): + sa.save([tflow.tflow(resp=True)], str(tmpdir)) + + v = view.View() + tctx.master.addons.add(v) + tctx.master.addons.add(sa) + tctx.master.commands.call_args("save.file", ["@shown", p]) + + +def test_simple(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + + tctx.configure(sa, save_stream_file=p) + + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, save_stream_file=None) + assert rd(p)[0].response + + tctx.configure(sa, save_stream_file="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, save_stream_file=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py deleted file mode 100644 index bcb27c79..00000000 --- a/test/mitmproxy/addons/test_streamfile.py +++ /dev/null @@ -1,62 +0,0 @@ -import pytest - -from mitmproxy.test import taddons -from mitmproxy.test import tflow - -from mitmproxy import io -from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy.addons import streamfile - - -def test_configure(tmpdir): - sa = streamfile.StreamFile() - with taddons.context(options=options.Options()) as tctx: - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=str(tmpdir)) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure( - sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~" - ) - tctx.configure(sa, streamfile_filter="foo") - assert sa.filt - tctx.configure(sa, streamfile_filter=None) - assert not sa.filt - - -def rd(p): - x = io.FlowReader(open(p, "rb")) - return list(x.stream()) - - -def test_tcp(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) - - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) - - -def test_simple(tmpdir): - sa = streamfile.StreamFile() - with taddons.context() as tctx: - p = str(tmpdir.join("foo")) - - tctx.configure(sa, streamfile=p) - - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response - - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 7fa3819e..05d4af30 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -5,6 +5,7 @@ from mitmproxy.test import tflow from mitmproxy.addons import view from mitmproxy import flowfilter from mitmproxy import options +from mitmproxy import exceptions from mitmproxy.test import taddons @@ -130,6 +131,55 @@ def test_filter(): assert len(v) == 4 +def test_load(): + v = view.View() + with taddons.context(options=options.Options()) as tctx: + tctx.master.addons.add(v) + + +def test_resolve(): + v = view.View() + with taddons.context(options=options.Options()) as tctx: + assert tctx.command(v.resolve, "@focus") == [] + assert tctx.command(v.resolve, "@shown") == [] + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + assert tctx.command(v.resolve, "@unmarked") == [] + assert tctx.command(v.resolve, "~m get") == [] + v.request(tft(method="get")) + assert len(tctx.command(v.resolve, "~m get")) == 1 + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 1 + assert len(tctx.command(v.resolve, "@unmarked")) == 1 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + v.request(tft(method="put")) + assert len(tctx.command(v.resolve, "@focus")) == 1 + assert len(tctx.command(v.resolve, "@shown")) == 2 + assert tctx.command(v.resolve, "@hidden") == [] + assert tctx.command(v.resolve, "@marked") == [] + + v.request(tft(method="get")) + v.request(tft(method="put")) + + f = flowfilter.parse("~m get") + v.set_filter(f) + v[0].marked = True + + def m(l): + return [i.request.method for i in l] + + assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"] + assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] + assert m(tctx.command(v.resolve, "@marked")) == ["GET"] + assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] + + with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): + tctx.command(v.resolve, "~") + + def test_order(): v = view.View() with taddons.context(options=options.Options()) as tctx: diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index 7b461580..034182a6 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -61,9 +61,9 @@ def test_lifecycle(): a = addonmanager.AddonManager(m) a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.add(TAddon("one")) - with pytest.raises(exceptions.AddonError): + with pytest.raises(exceptions.AddonManagerError): a.remove(TAddon("nonexistent")) f = tflow.tflow() @@ -82,6 +82,11 @@ def test_loader(): l.add_option("custom_option", bool, False, "help") l.add_option("custom_option", bool, False, "help") + def cmd(a: str) -> str: + return "foo" + + l.add_command("test.command", cmd) + def test_simple(): with taddons.context() as tctx: diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py new file mode 100644 index 00000000..92d8c77b --- /dev/null +++ b/test/mitmproxy/test_command.py @@ -0,0 +1,74 @@ +import typing +from mitmproxy import command +from mitmproxy import flow +from mitmproxy import master +from mitmproxy import options +from mitmproxy import proxy +from mitmproxy import exceptions +from mitmproxy.test import tflow +from mitmproxy.test import taddons +import pytest + + +class TAddon: + def cmd1(self, foo: str) -> str: + return "ret " + foo + + def cmd2(self, foo: str) -> str: + return 99 + + +class TestCommand: + def test_call(self): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + cm = command.CommandManager(m) + + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" + + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) + + +def test_simple(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + c = command.CommandManager(m) + a = TAddon() + c.add("one.two", a.cmd1) + assert(c.call("one.two foo") == "ret foo") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.call("nonexistent") + with pytest.raises(exceptions.CommandError, match="Invalid"): + c.call("") + with pytest.raises(exceptions.CommandError, match="Usage"): + c.call("one.two too many args") + + +def test_typename(): + assert command.typename(str, True) == "str" + assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" + + +class DummyConsole: + def load(self, l): + l.add_command("console.resolve", self.resolve) + + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + return [tflow.tflow(resp=True)] + + +def test_parsearg(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + assert command.parsearg(tctx.master.commands, "foo", str) == "foo" + assert len(command.parsearg( + tctx.master.commands, "~b", typing.Sequence[flow.Flow] + )) == 1 + with pytest.raises(exceptions.CommandError): + command.parsearg(tctx.master.commands, "foo", Exception) diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index fd0c6e0c..22bd7c34 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -16,72 +16,86 @@ class T(TBase): super(T, self).__init__(42) -def test_check_type(): - typecheck.check_type("foo", 42, int) +def test_check_option_type(): + typecheck.check_option_type("foo", 42, int) with pytest.raises(TypeError): - typecheck.check_type("foo", 42, str) + typecheck.check_option_type("foo", 42, str) with pytest.raises(TypeError): - typecheck.check_type("foo", None, str) + typecheck.check_option_type("foo", None, str) with pytest.raises(TypeError): - typecheck.check_type("foo", b"foo", str) + typecheck.check_option_type("foo", b"foo", str) def test_check_union(): - typecheck.check_type("foo", 42, typing.Union[int, str]) - typecheck.check_type("foo", "42", typing.Union[int, str]) + typecheck.check_option_type("foo", 42, typing.Union[int, str]) + typecheck.check_option_type("foo", "42", typing.Union[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", [], typing.Union[int, str]) + typecheck.check_option_type("foo", [], typing.Union[int, str]) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - typecheck.check_type("foo", 42, m) + typecheck.check_option_type("foo", 42, m) def test_check_tuple(): - typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", None, typing.Tuple[int, str]) + typecheck.check_option_type("foo", None, typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (42, 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str]) # Python 3.5 only defines __tuple_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Tuple" m.__tuple_params__ = (int, str) - typecheck.check_type("foo", (42, "42"), m) + typecheck.check_option_type("foo", (42, "42"), m) def test_check_sequence(): - typecheck.check_type("foo", [10], typing.Sequence[int]) + typecheck.check_option_type("foo", [10], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", ["foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", ["foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [10, "foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [b"foo"], typing.Sequence[str]) + typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.Sequence[str]) + typecheck.check_option_type("foo", "foo", typing.Sequence[str]) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_type("foo", [10], m) + typecheck.check_option_type("foo", [10], m) def test_check_io(): - typecheck.check_type("foo", io.StringIO(), typing.IO[str]) + typecheck.check_option_type("foo", io.StringIO(), typing.IO[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.IO[str]) + typecheck.check_option_type("foo", "foo", typing.IO[str]) def test_check_any(): - typecheck.check_type("foo", 42, typing.Any) - typecheck.check_type("foo", object(), typing.Any) - typecheck.check_type("foo", None, typing.Any) + typecheck.check_option_type("foo", 42, typing.Any) + typecheck.check_option_type("foo", object(), typing.Any) + typecheck.check_option_type("foo", None, typing.Any) + + +def test_check_command_return_type(): + assert(typecheck.check_command_return_type("foo", str)) + assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str])) + assert(typecheck.check_command_return_type(None, None)) + assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) + + # Python 3.5 only defines __parameters__ + m = mock.Mock() + m.__str__ = lambda self: "typing.Sequence" + m.__parameters__ = (int,) + typecheck.check_command_return_type([10], m) |