diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2017-04-30 21:20:32 +1200 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2017-04-30 21:24:00 +1200 |
commit | 3cd93567f5be263f1b40b2545573d024f69d2bdd (patch) | |
tree | b2d8ab7aec197a56db69ef4e82d1451fd0d7063d | |
parent | bcbe87bb0986819c83c3e2efc683194bbf9c6c50 (diff) | |
download | mitmproxy-3cd93567f5be263f1b40b2545573d024f69d2bdd.tar.gz mitmproxy-3cd93567f5be263f1b40b2545573d024f69d2bdd.tar.bz2 mitmproxy-3cd93567f5be263f1b40b2545573d024f69d2bdd.zip |
commands: support *args for commands
Use this to simplify meta-commands in console, and to create a console_choose
command that prompts the user for a choice, and then executes a command with
variable substitution.
-rw-r--r-- | mitmproxy/command.py | 44 | ||||
-rw-r--r-- | mitmproxy/tools/console/flowlist.py | 5 | ||||
-rw-r--r-- | mitmproxy/tools/console/master.py | 38 | ||||
-rw-r--r-- | mitmproxy/utils/typecheck.py | 6 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 53 | ||||
-rw-r--r-- | test/mitmproxy/utils/test_typecheck.py | 23 |
6 files changed, 122 insertions, 47 deletions
diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 031d2cae..82b8fae4 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -1,3 +1,6 @@ +""" + This module manges and invokes typed commands. +""" import inspect import typing import shlex @@ -17,10 +20,10 @@ Cuts = typing.Sequence[ def typename(t: type, ret: bool) -> str: """ - Translates a type to an explanatory string. Ifl ret is True, we're + Translates a type to an explanatory string. If ret is True, we're looking at a return type, else we're looking at a parameter type. """ - if t in (str, int, bool): + if issubclass(t, (str, int, bool)): return t.__name__ elif t == typing.Sequence[flow.Flow]: return "[flow]" if ret else "flowspec" @@ -44,11 +47,20 @@ class Command: if func.__doc__: txt = func.__doc__.strip() self.help = "\n".join(textwrap.wrap(txt)) + + self.has_positional = False + for i in sig.parameters.values(): + # This is the kind for *args paramters + if i.kind == i.VAR_POSITIONAL: + self.has_positional = True self.paramtypes = [v.annotation for v in sig.parameters.values()] self.returntype = sig.return_annotation def paramnames(self) -> typing.Sequence[str]: - return [typename(i, False) for i in self.paramtypes] + v = [typename(i, False) for i in self.paramtypes] + if self.has_positional: + v[-1] = "*" + v[-1][1:-1] + return v def retname(self) -> str: return typename(self.returntype, True) if self.returntype else "" @@ -64,17 +76,31 @@ class Command: """ Call the command with a set of arguments. At this point, all argumets are strings. """ - if len(self.paramtypes) != len(args): + if not self.has_positional and (len(self.paramtypes) != len(args)): raise exceptions.CommandError("Usage: %s" % self.signature_help()) + remainder = [] # type: typing.Sequence[str] + if self.has_positional: + remainder = args[len(self.paramtypes) - 1:] + args = args[:len(self.paramtypes) - 1] + pargs = [] for i in range(len(args)): - pargs.append(parsearg(self.manager, args[i], self.paramtypes[i])) + if typecheck.check_command_type(args[i], self.paramtypes[i]): + pargs.append(args[i]) + else: + pargs.append(parsearg(self.manager, args[i], self.paramtypes[i])) + + if remainder: + if typecheck.check_command_type(remainder, self.paramtypes[-1]): + pargs.extend(remainder) + else: + raise exceptions.CommandError("Invalid value type.") with self.manager.master.handlecontext(): ret = self.func(*pargs) - if not typecheck.check_command_return_type(ret, self.returntype): + if not typecheck.check_command_type(ret, self.returntype): raise exceptions.CommandError("Command returned unexpected data") return ret @@ -126,7 +152,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if argtype == str: + if issubclass(argtype, str): return spec elif argtype == bool: if spec == "true": @@ -137,7 +163,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: raise exceptions.CommandError( "Booleans are 'true' or 'false', got %s" % spec ) - elif argtype == int: + elif issubclass(argtype, int): try: return int(spec) except ValueError as e: @@ -153,6 +179,8 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: "Command requires one flow, specification matched %s." % len(flows) ) return flows[0] + elif argtype == typing.Sequence[str]: + return [i.strip() for i in spec.split(",")] else: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index be3d2dae..7364524f 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -2,7 +2,6 @@ import urwid from mitmproxy.tools.console import common from mitmproxy.tools.console import signals -from mitmproxy.tools.console import master from mitmproxy.addons import view import mitmproxy.tools.console.master # noqa @@ -185,7 +184,9 @@ class FlowListWalker(urwid.ListWalker): class FlowListBox(urwid.ListBox): - def __init__(self, master: master.ConsoleMaster) -> None: + def __init__( + self, master: "mitmproxy.tools.console.master.ConsoleMaster" + ) -> None: self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster" super().__init__(FlowListWalker(master)) diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index b79125fb..9b651dcc 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -9,9 +9,11 @@ import subprocess import sys import tempfile import traceback +import typing import urwid +from mitmproxy import ctx from mitmproxy import addons from mitmproxy import command from mitmproxy import master @@ -84,12 +86,31 @@ class ConsoleAddon: self.master = master self.started = False + @command.command("console.choose") + def console_choose( + self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str] + ) -> None: + """ + Prompt the user to choose from a list of strings returned by a + command, then invoke another command with all occurances of {choice} + replaced by the choice the user made. + """ + choices = ctx.master.commands.call_args(choicecmd, []) + + def callback(opt): + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + self.master.commands.call(repl) + + self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback)) + ctx.log.info(choices) + @command.command("console.command") - def console_command(self, partial: str) -> None: + def console_command(self, *partial: typing.Sequence[str]) -> None: """ Prompt the user to edit a command with a (possilby empty) starting value. """ - signals.status_prompt_command.send(partial=partial) + signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore @command.command("console.view.commands") def view_commands(self) -> None: @@ -146,16 +167,21 @@ def default_keymap(km): km.add("O", "console.view.options") km.add("Q", "console.exit") km.add("q", "console.view.pop") - km.add("i", "console.command 'set intercept='") - km.add("W", "console.command 'set save_stream_file='") + km.add("i", "console.command set intercept=") + km.add("W", "console.command set save_stream_file=") km.add("A", "flow.resume @all", context="flowlist") km.add("a", "flow.resume @focus", context="flowlist") - km.add("b", "console.command 'cut.save s.content|@focus '", context="flowlist") + km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist") km.add("d", "view.remove @focus", context="flowlist") km.add("D", "view.duplicate @focus", context="flowlist") km.add("e", "set console_eventlog=toggle", context="flowlist") - km.add("E", "console.command 'export.file curl @focus '", context="flowlist") + km.add( + "E", + "console.choose Format export.formats " + "console.command export.file {choice} @focus ''", + context="flowlist" + ) km.add("f", "console.command 'set view_filter='", context="flowlist") km.add("F", "set console_focus_follow=toggle", context="flowlist") km.add("g", "view.go 0", context="flowlist") diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py index c97ff529..a5f27fee 100644 --- a/mitmproxy/utils/typecheck.py +++ b/mitmproxy/utils/typecheck.py @@ -1,7 +1,7 @@ import typing -def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool: +def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool: """ Check if the provided value is an instance of typeinfo. Returns True if the types match, False otherwise. This function supports only those types @@ -17,7 +17,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool: if not isinstance(value, (tuple, list)): return False for v in value: - if not check_command_return_type(v, T): + if not check_command_type(v, T): return False elif typename.startswith("typing.Union"): try: @@ -26,7 +26,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool: # Python 3.5.x types = typeinfo.__union_params__ # type: ignore for T in types: - checks = [check_command_return_type(value, T) for T in types] + checks = [check_command_type(value, T) for T in types] if not any(checks): return False elif value is None and typeinfo is None: diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index ac082153..958328b2 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -1,9 +1,6 @@ import typing from mitmproxy import command from mitmproxy import flow -from mitmproxy import master -from mitmproxy import options -from mitmproxy import proxy from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons @@ -19,24 +16,41 @@ class TAddon: def cmd2(self, foo: str) -> str: return 99 + def cmd3(self, foo: int) -> int: + return foo + def empty(self) -> None: pass + def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]: + return list(var) + class TestCommand: + def test_varargs(self): + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "varargs", a.varargs) + assert c.signature_help() == "varargs str *str -> [str]" + assert c.call(["one", "two", "three"]) == ["two", "three"] + with pytest.raises(exceptions.CommandError): + c.call(["one", "two", 3]) + def test_call(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer(o)) - cm = command.CommandManager(m) + with taddons.context() as tctx: + cm = command.CommandManager(tctx.master) + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" - a = TAddon() - c = command.Command(cm, "cmd.path", a.cmd1) - assert c.call(["foo"]) == "ret foo" - assert c.signature_help() == "cmd.path str -> str" + c = command.Command(cm, "cmd.two", a.cmd2) + with pytest.raises(exceptions.CommandError): + c.call(["foo"]) - c = command.Command(cm, "cmd.two", a.cmd2) - with pytest.raises(exceptions.CommandError): - c.call(["foo"]) + c = command.Command(cm, "cmd.three", a.cmd3) + assert c.call(["1"]) == 1 def test_simple(): @@ -74,14 +88,12 @@ def test_typename(): class DummyConsole: - def load(self, l): - l.add_command("view.resolve", self.resolve) - l.add_command("cut", self.cut) - + @command.command("view.resolve") def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: n = int(spec) return [tflow.tflow(resp=True)] * n + @command.command("cut") def cut(self, spec: str) -> command.Cuts: return [["test"]] @@ -115,6 +127,13 @@ def test_parsearg(): tctx.master.commands, "foo", command.Cuts ) == [["test"]] + assert command.parsearg( + tctx.master.commands, "foo", typing.Sequence[str] + ) == ["foo"] + assert command.parsearg( + tctx.master.commands, "foo, bar", typing.Sequence[str] + ) == ["foo", "bar"] + class TDec: @command.command("cmd1") diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 17f70d37..fe33070e 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -88,25 +88,26 @@ def test_check_any(): typecheck.check_option_type("foo", None, typing.Any) -def test_check_command_return_type(): - assert(typecheck.check_command_return_type("foo", str)) - assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str])) - assert(typecheck.check_command_return_type(None, None)) - assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int])) - assert(not typecheck.check_command_return_type("foo", typing.Sequence[int])) - assert(typecheck.check_command_return_type([["foo", b"bar"]], command.Cuts)) - assert(not typecheck.check_command_return_type(["foo", b"bar"], command.Cuts)) - assert(not typecheck.check_command_return_type([["foo", 22]], command.Cuts)) +def test_check_command_type(): + assert(typecheck.check_command_type("foo", str)) + assert(typecheck.check_command_type(["foo"], typing.Sequence[str])) + assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str])) + assert(typecheck.check_command_type(None, None)) + assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) + assert(not typecheck.check_command_type("foo", typing.Sequence[int])) + assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) + assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) + assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_command_return_type([10], m) + typecheck.check_command_type([10], m) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - assert not typecheck.check_command_return_type([22], m) + assert not typecheck.check_command_type([22], m) |