diff options
-rw-r--r-- | mitmproxy/command.py | 333 | ||||
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 101 | ||||
-rw-r--r-- | mitmproxy/tools/console/commands.py | 23 | ||||
-rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 6 | ||||
-rw-r--r-- | mitmproxy/types.py | 123 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 250 | ||||
-rw-r--r-- | test/mitmproxy/test_types.py | 16 |
7 files changed, 446 insertions, 406 deletions
diff --git a/mitmproxy/command.py b/mitmproxy/command.py index f2a87e1e..a64c7404 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -1,19 +1,20 @@ """ This module manages and invokes typed commands. """ +import functools import inspect +import sys +import textwrap import types import typing -import textwrap -import functools -import sys + import pyparsing -from mitmproxy import exceptions import mitmproxy.types +from mitmproxy import exceptions -def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: +def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None: sig = inspect.signature(f) try: sig.bind(*args, **kwargs) @@ -33,122 +34,135 @@ def typename(t: type) -> str: return to.display +def _empty_as_none(x: typing.Any) -> typing.Any: + if x == inspect.Signature.empty: + return None + return x + + +class CommandParameter(typing.NamedTuple): + display_name: str + type: typing.Type + + class Command: - returntype: typing.Optional[typing.Type] + name: str + manager: "CommandManager" + signature: inspect.Signature + help: typing.Optional[str] - def __init__(self, manager, path, func) -> None: - self.path = path + def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None: + self.name = name self.manager = manager self.func = func - sig = inspect.signature(self.func) - self.help = None + self.signature = inspect.signature(self.func) + if func.__doc__: txt = func.__doc__.strip() self.help = "\n".join(textwrap.wrap(txt)) - - self.has_positional = False - for i in sig.parameters.values(): - # This is the kind for *args parameters - if i.kind == i.VAR_POSITIONAL: - self.has_positional = True - self.paramtypes = [v.annotation for v in sig.parameters.values()] - if sig.return_annotation == inspect._empty: # type: ignore - self.returntype = None else: - self.returntype = sig.return_annotation + self.help = None + # This fails with a CommandException if types are invalid - self.signature_help() + for name, parameter in self.signature.parameters.items(): + t = parameter.annotation + if not mitmproxy.types.CommandTypes.get(parameter.annotation, None): + raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.") + if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None): + raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.") + + @property + def return_type(self) -> typing.Optional[typing.Type]: + return _empty_as_none(self.signature.return_annotation) + + @property + def parameters(self) -> typing.List[CommandParameter]: + """Returns a list of (display name, type) tuples.""" + ret = [] + for name, param in self.signature.parameters.items(): + if param.kind is param.VAR_POSITIONAL: + name = f"*{name}" + ret.append(CommandParameter(name, param.annotation)) + return ret - def paramnames(self) -> typing.Sequence[str]: - v = [typename(i) for i in self.paramtypes] - if self.has_positional: - v[-1] = "*" + v[-1] - return v + def signature_help(self) -> str: + params = " ".join(name for name, t in self.parameters) + if self.return_type: + ret = f" -> {typename(self.return_type)}" + else: + ret = "" + return f"{self.name} {params}{ret}" - def retname(self) -> str: - return typename(self.returntype) if self.returntype else "" + def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments: + try: + bound_arguments = self.signature.bind(*args) + except TypeError as v: + raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") - def signature_help(self) -> str: - params = " ".join(self.paramnames()) - ret = self.retname() - if ret: - ret = " -> " + ret - return "%s %s%s" % (self.path, params, ret) - - def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]: - - # Arguments that are just blank spaces aren't really arguments - # We need to get rid of those. If the user intended to pass a sequence - # of spaces, it would come between quotes - args = [a for a in args if a.strip() != ''] - verify_arg_signature(self.func, list(args), {}) - - remainder: typing.Sequence[str] = [] - if self.has_positional: - remainder = args[len(self.paramtypes) - 1:] - args = args[:len(self.paramtypes) - 1] - - pargs = [] - for arg, paramtype in zip(args, self.paramtypes): - pargs.append(parsearg(self.manager, arg, paramtype)) - pargs.extend(remainder) - return pargs + for name, value in bound_arguments.arguments.items(): + convert_to = self.signature.parameters[name].annotation + bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to) + + bound_arguments.apply_defaults() + + return bound_arguments def call(self, args: typing.Sequence[str]) -> typing.Any: """ - Call the command with a list of arguments. At this point, all - arguments are strings. + Call the command with a list of arguments. At this point, all + arguments are strings. """ - ret = self.func(*self.prepare_args(args)) - if ret is None and self.returntype is None: + bound_args = self.prepare_args(args) + ret = self.func(*bound_args.args, **bound_args.kwargs) + if ret is None and self.return_type is None: return - typ = mitmproxy.types.CommandTypes.get(self.returntype) + typ = mitmproxy.types.CommandTypes.get(self.return_type) + assert typ if not typ.is_valid(self.manager, typ, ret): raise exceptions.CommandError( - "%s returned unexpected data - expected %s" % ( - self.path, typ.display - ) + f"{self.name} returned unexpected data - expected {typ.display}" ) return ret -ParseResult = typing.NamedTuple( - "ParseResult", - [ - ("value", str), - ("type", typing.Type), - ("valid", bool), - ], -) +class ParseResult(typing.NamedTuple): + value: str + type: typing.Type + valid: bool -class CommandManager(mitmproxy.types._CommandBase): +class CommandManager: + commands: typing.Dict[str, Command] + def __init__(self, master): self.master = master - self.commands: typing.Dict[str, Command] = {} - - self.regex = pyparsing.QuotedString("\"", escChar='\\', unquoteResults=False) |\ - pyparsing.QuotedString("'", escChar='\\', unquoteResults=False) |\ - pyparsing.Combine(pyparsing.Literal('"') + pyparsing.Word(pyparsing.printables + " ") + pyparsing.StringEnd()) |\ - pyparsing.Word(pyparsing.printables) |\ - pyparsing.Word(" \r\n\t") - self.regex = self.regex.leaveWhitespace() + self.commands = {} + + self.expr_parser = pyparsing.ZeroOrMore( + pyparsing.QuotedString('"', escChar='\\', unquoteResults=False) + | pyparsing.QuotedString("'", escChar='\\', unquoteResults=False) + | pyparsing.Combine(pyparsing.Literal('"') + + pyparsing.Word(pyparsing.printables + " ") + + pyparsing.StringEnd()) + | pyparsing.Word(pyparsing.printables) + | pyparsing.Word(" \r\n\t") + ).leaveWhitespace() def collect_commands(self, addon): for i in dir(addon): if not i.startswith("__"): o = getattr(addon, i) try: - is_command = hasattr(o, "command_path") + is_command = hasattr(o, "command_name") except Exception: pass # hasattr may raise if o implements __getattr__. else: if is_command: try: - self.add(o.command_path, o) + self.add(o.command_name, o) except exceptions.CommandError as e: self.master.log.warn( - "Could not load command %s: %s" % (o.command_path, e) + "Could not load command %s: %s" % (o.command_name, e) ) def add(self, path: str, func: typing.Callable): @@ -156,105 +170,100 @@ class CommandManager(mitmproxy.types._CommandBase): @functools.lru_cache(maxsize=128) def parse_partial( - self, - cmdstr: str - ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]: + self, + cmdstr: str + ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]: """ - Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. + Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. """ - parts: typing.List[str] = [] - for t, start, end in self.regex.scanString(cmdstr): - parts.append(t[0]) - - parse: typing.List[ParseResult] = [] - params: typing.List[type] = [] - typ: typing.Type - cmd_found: bool = False - for i in range(len(parts)): - if not parts[i].isspace(): - if not cmd_found: - cmd_found = True - typ = mitmproxy.types.Cmd - if parts[i] in self.commands: - params.extend(self.commands[parts[i]].paramtypes) - elif params: - typ = params.pop(0) - if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: - if parts[i] in self.commands: - params[:] = self.commands[parts[i]].paramtypes + + parts: typing.List[str] = self.expr_parser.parseString(cmdstr) + + parsed: typing.List[ParseResult] = [] + next_params: typing.List[CommandParameter] = [ + CommandParameter("", mitmproxy.types.Cmd), + CommandParameter("", mitmproxy.types.CmdArgs), + ] + for part in parts: + if part.isspace(): + parsed.append( + ParseResult( + value=part, + type=mitmproxy.types.Space, + valid=True, + ) + ) + continue + + if next_params: + expected_type: typing.Type = next_params.pop(0).type else: - # If the token is just a bunch of spaces, then we don't - # want to count it against the arguments of the command - typ = mitmproxy.types.Unknown + expected_type = mitmproxy.types.Unknown - to = mitmproxy.types.CommandTypes.get(typ, None) + arg_is_known_command = ( + expected_type == mitmproxy.types.Cmd and part in self.commands + ) + arg_is_unknown_command = ( + expected_type == mitmproxy.types.Cmd and part not in self.commands + ) + command_args_following = ( + next_params and next_params[0].type == mitmproxy.types.CmdArgs + ) + if arg_is_known_command and command_args_following: + next_params = self.commands[part].parameters + next_params[1:] + if arg_is_unknown_command and command_args_following: + next_params.pop(0) + + to = mitmproxy.types.CommandTypes.get(expected_type, None) valid = False if to: try: - to.parse(self, typ, parts[i]) + to.parse(self, expected_type, part) except exceptions.TypeError: valid = False else: valid = True - parse.append( + parsed.append( ParseResult( - value=parts[i], - type=typ, + value=part, + type=expected_type, valid=valid, ) ) - remhelp: typing.List[str] = [] - for x in params: - remt = mitmproxy.types.CommandTypes.get(x, None) - remhelp.append(remt.display) - - return parse, remhelp + return parsed, next_params - def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any: + def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any: """ - Call a command with native arguments. May raise CommandError. + Call a command with native arguments. May raise CommandError. """ - if path not in self.commands: - raise exceptions.CommandError("Unknown command: %s" % path) - return self.commands[path].func(*args) + if command_name not in self.commands: + raise exceptions.CommandError("Unknown command: %s" % command_name) + return self.commands[command_name].func(*args) - def _call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: + def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any: """ - Call a command using a list of string arguments. May raise CommandError. + Call a command using a list of string arguments. May raise CommandError. """ - if path not in self.commands: - raise exceptions.CommandError("Unknown command: %s" % path) + if command_name not in self.commands: + raise exceptions.CommandError("Unknown command: %s" % command_name) - return self.commands[path].call(args) + return self.commands[command_name].call(args) - def execute(self, cmdstr: str): + def execute(self, cmdstr: str) -> typing.Any: """ - Execute a command string. May raise CommandError. + Execute a command string. May raise CommandError. """ parts, _ = self.parse_partial(cmdstr) - params = [] - for p in parts: - v = p.value.strip() - if v != '': - if ((v.startswith("'") and v.endswith("'")) or - (v.startswith("\"") and v.endswith("\""))) and \ - len(v.split(' ')) == 1: - # If this parameter is between quotes but has no spaces in - # it, then it is safe to remove the quotes to pass it down - # This allows any commands that take a simple spaceless - # string as a parameter to work. For example - # view.flows.create get "http://www.example.com" won't work - # if the quotes are there as it won't see the param as a URL - v = v[1:-1] - - params.append(v) - - if len(parts) == 0: - raise exceptions.CommandError("Invalid command: %s" % cmdstr) - - return self._call_strings(params[0], params[1:]) + if not parts: + raise exceptions.CommandError(f"Invalid command: {cmdstr!r}") + command_name, *args = [ + unquote(part.value) + for part in parts + if part.type != mitmproxy.types.Space + ] + return self._call_strings(command_name, args) def dump(self, out=sys.stdout) -> None: cmds = list(self.commands.values()) @@ -266,27 +275,37 @@ class CommandManager(mitmproxy.types._CommandBase): print(file=out) +def unquote(x: str) -> str: + if x.startswith("'") and x.endswith("'"): + return x[1:-1] + if x.startswith('"') and x.endswith('"'): + return x[1:-1] + return x + + def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ t = mitmproxy.types.CommandTypes.get(argtype, None) if not t: - raise exceptions.CommandError("Unsupported argument type: %s" % argtype) + raise exceptions.CommandError(f"Unsupported argument type: {argtype}") try: - return t.parse(manager, argtype, spec) # type: ignore + return t.parse(manager, argtype, spec) except exceptions.TypeError as e: raise exceptions.CommandError from e -def command(path): +def command(name: typing.Optional[str]): def decorator(function): @functools.wraps(function) def wrapper(*args, **kwargs): verify_arg_signature(function, args, kwargs) return function(*args, **kwargs) - wrapper.__dict__["command_path"] = path + + wrapper.__dict__["command_name"] = name or function.__name__ return wrapper + return decorator @@ -296,8 +315,10 @@ def argument(name, type): specific types such as mitmproxy.types.Choice, which we cannot annotate directly as mypy does not like that. """ + def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ f.__annotations__[name] = type return f + return decorator diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index bff58605..ba3e601e 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,52 +1,49 @@ import abc +import collections import copy import typing -import collections import urwid from urwid.text_layout import calc_coords +import mitmproxy.command import mitmproxy.flow import mitmproxy.master -import mitmproxy.command import mitmproxy.types -class Completer: # pragma: no cover +class Completer: @abc.abstractmethod - def cycle(self) -> str: - pass + def cycle(self, forward: bool) -> str: + raise NotImplementedError() class ListCompleter(Completer): def __init__( - self, - start: str, - options: typing.Sequence[str], + self, + start: str, + options: typing.Sequence[str], ) -> None: self.start = start - self.options: typing.Sequence[str] = [] + self.options: typing.List[str] = [] for o in options: if o.startswith(start): self.options.append(o) self.options.sort() self.offset = 0 - def cycle(self) -> str: + def cycle(self, forward: bool) -> str: if not self.options: return self.start ret = self.options[self.offset] - self.offset = (self.offset + 1) % len(self.options) + delta = 1 if forward else -1 + self.offset = (self.offset + delta) % len(self.options) return ret -CompletionState = typing.NamedTuple( - "CompletionState", - [ - ("completer", Completer), - ("parse", typing.Sequence[mitmproxy.command.ParseResult]) - ] -) +class CompletionState(typing.NamedTuple): + completer: Completer + parsed: typing.Sequence[mitmproxy.command.ParseResult] class CommandBuffer: @@ -70,30 +67,13 @@ class CommandBuffer: else: self._cursor = x - def parse_quoted(self, txt): - parts, remhelp = self.master.commands.parse_partial(txt) - for i, p in enumerate(parts): - parts[i] = mitmproxy.command.ParseResult( - value = p.value, - type = p.type, - valid = p.valid - ) - return parts, remhelp - def render(self): - """ - This function is somewhat tricky - in order to make the cursor - position valid, we have to make sure there is a - character-for-character offset match in the rendered output, up - to the cursor. Beyond that, we can add stuff. - """ - parts, remhelp = self.parse_quoted(self.text) + parts, remaining = self.master.commands.parse_partial(self.text) ret = [] - if parts == []: + if not parts: # Means we just received the leader, so we need to give a blank # text to the widget to render or it crashes ret.append(("text", "")) - ret.append(("text", " ")) else: for p in parts: if p.valid: @@ -104,10 +84,11 @@ class CommandBuffer: elif p.value: ret.append(("commander_invalid", p.value)) - if remhelp: - ret.append(("text", " ")) - for v in remhelp: - ret.append(("commander_hint", "%s " % v)) + if remaining: + if parts[-1].type != mitmproxy.types.Space: + ret.append(("text", " ")) + for param in remaining: + ret.append(("commander_hint", f"{param.display_name} ")) return ret @@ -117,23 +98,31 @@ class CommandBuffer: def right(self) -> None: self.cursor = self.cursor + 1 - def cycle_completion(self) -> None: + def cycle_completion(self, forward: bool) -> None: if not self.completion: - parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor]) - last = parts[-1] - ct = mitmproxy.types.CommandTypes.get(last.type, None) + parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor]) + if parts and parts[-1].type != mitmproxy.types.Space: + type_to_complete = parts[-1].type + cycle_prefix = parts[-1].value + parsed = parts[:-1] + elif remaining: + type_to_complete = remaining[0].type + cycle_prefix = "" + parsed = parts + else: + return + ct = mitmproxy.types.CommandTypes.get(type_to_complete, None) if ct: self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - ct.completion(self.master.commands, last.type, parts[-1].value) + completer=ListCompleter( + cycle_prefix, + ct.completion(self.master.commands, type_to_complete, cycle_prefix) ), - parse = parts, + parsed=parsed, ) if self.completion: - nxt = self.completion.completer.cycle() - buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt - buf = buf.strip() + nxt = self.completion.completer.cycle(forward) + buf = "".join([i.value for i in self.completion.parsed]) + nxt self.text = buf self.cursor = len(self.text) @@ -159,7 +148,7 @@ class CommandBuffer: class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None: + def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None: self.saved_commands: collections.deque = collections.deque( [CommandBuffer(master, "")], maxlen=size @@ -182,7 +171,7 @@ class CommandHistory: return self.saved_commands[self.index] return None - def add_command(self, command: CommandBuffer, execution: bool=False) -> None: + def add_command(self, command: CommandBuffer, execution: bool = False) -> None: if self.index == self.last_index or execution: last_item = self.saved_commands[-1] last_item_empty = not last_item.text @@ -219,8 +208,10 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf = self.history.get_prev() or self.cbuf elif key == "down": self.cbuf = self.history.get_next() or self.cbuf + elif key == "shift tab": + self.cbuf.cycle_completion(False) elif key == "tab": - self.cbuf.cycle_completion() + self.cbuf.cycle_completion(True) elif len(key) == 1: self.cbuf.insert(key) self.update() diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 0f35742b..d35a6b8a 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -1,6 +1,8 @@ import urwid import blinker import textwrap + +from mitmproxy import command from mitmproxy.tools.console import layoutwidget from mitmproxy.tools.console import signals @@ -10,7 +12,7 @@ command_focus_change = blinker.Signal() class CommandItem(urwid.WidgetWrap): - def __init__(self, walker, cmd, focused): + def __init__(self, walker, cmd: command.Command, focused: bool): self.walker, self.cmd, self.focused = walker, cmd, focused super().__init__(None) self._w = self.get_widget() @@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap): def get_widget(self): parts = [ ("focus", ">> " if self.focused else " "), - ("title", self.cmd.path), - ("text", " "), - ("text", " ".join(self.cmd.paramnames())), + ("title", self.cmd.name) ] - if self.cmd.returntype: - parts.append([ + if self.cmd.parameters: + parts += [ + ("text", " "), + ("text", " ".join(name for name, t in self.cmd.parameters)), + ] + if self.cmd.return_type: + parts += [ ("title", " -> "), - ("text", self.cmd.retname()), - ]) + ("text", command.typename(self.cmd.return_type)), + ] return urwid.AttrMap( urwid.Padding(urwid.Text(parts)), @@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox): def keypress(self, size, key): if key == "m_select": foc, idx = self.get_focus() - signals.status_prompt_command.send(partial=foc.cmd.path + " ") + signals.status_prompt_command.send(partial=foc.cmd.name + " ") elif key == "m_start": self.set_focus(0) self.walker._modified() diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 9e0533a4..b5263f6f 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -238,7 +238,7 @@ class ConsoleAddon: prompt: str, choices: typing.Sequence[str], cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -264,7 +264,7 @@ class ConsoleAddon: prompt: str, choicecmd: mitmproxy.types.Cmd, subcmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -573,7 +573,7 @@ class ConsoleAddon: contexts: typing.Sequence[str], key: str, cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + *args: mitmproxy.types.CmdArgs ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/types.py b/mitmproxy/types.py index b48aef84..1f1c503b 100644 --- a/mitmproxy/types.py +++ b/mitmproxy/types.py @@ -5,6 +5,9 @@ import typing from mitmproxy import exceptions from mitmproxy import flow +if typing.TYPE_CHECKING: + from mitmproxy.command import CommandManager + class Path(str): pass @@ -14,7 +17,7 @@ class Cmd(str): pass -class Arg(str): +class CmdArgs(str): pass @@ -22,6 +25,10 @@ class Unknown(str): pass +class Space(str): + pass + + class CutSpec(typing.Sequence[str]): pass @@ -40,27 +47,11 @@ class Choice: return False -# One of the many charming things about mypy is that introducing type -# annotations can cause circular dependencies where there were none before. -# Rather than putting types and the CommandManger in the same file, we introduce -# a stub type with the signature we use. -class _CommandBase: - commands: typing.MutableMapping[str, typing.Any] = {} - - def _call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: - raise NotImplementedError - - def execute(self, cmd: str) -> typing.Any: - raise NotImplementedError - - class _BaseType: typ: typing.Type = object display: str = "" - def completion( - self, manager: _CommandBase, t: typing.Any, s: str - ) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]: """ Returns a list of completion strings for a given prefix. The strings returned don't necessarily need to be suffixes of the prefix, since @@ -68,9 +59,7 @@ class _BaseType: """ raise NotImplementedError - def parse( - self, manager: _CommandBase, typ: typing.Any, s: str - ) -> typing.Any: + def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any: """ Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string. @@ -78,7 +67,7 @@ class _BaseType: """ raise NotImplementedError - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: """ Check if data is valid for this type. """ @@ -89,10 +78,10 @@ class _BoolType(_BaseType): typ = bool display = "bool" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return ["false", "true"] - def parse(self, manager: _CommandBase, t: type, s: str) -> bool: + def parse(self, manager: "CommandManager", t: type, s: str) -> bool: if s == "true": return True elif s == "false": @@ -102,7 +91,7 @@ class _BoolType(_BaseType): "Booleans are 'true' or 'false', got %s" % s ) - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return val in [True, False] @@ -110,13 +99,13 @@ class _StrType(_BaseType): typ = str display = "str" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) @@ -124,13 +113,13 @@ class _UnknownType(_BaseType): typ = Unknown display = "unknown" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return False @@ -138,16 +127,16 @@ class _IntType(_BaseType): typ = int display = "int" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> int: + def parse(self, manager: "CommandManager", t: type, s: str) -> int: try: return int(s) except ValueError as e: raise exceptions.TypeError from e - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, int) @@ -155,7 +144,7 @@ class _PathType(_BaseType): typ = Path display = "path" - def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]: if not start: start = "./" path = os.path.expanduser(start) @@ -177,10 +166,10 @@ class _PathType(_BaseType): ret.sort() return ret - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return os.path.expanduser(s) - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) @@ -188,43 +177,43 @@ class _CmdType(_BaseType): typ = Cmd display = "cmd" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return list(manager.commands.keys()) - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: if s not in manager.commands: raise exceptions.TypeError("Unknown command: %s" % s) return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return val in manager.commands class _ArgType(_BaseType): - typ = Arg + typ = CmdArgs display = "arg" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: - return s + def parse(self, manager: "CommandManager", t: type, s: str) -> str: + raise exceptions.TypeError("Arguments for unknown command.") - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) class _StrSeqType(_BaseType): typ = typing.Sequence[str] - display = "[str]" + display = "str[]" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [x.strip() for x in s.split(",")] - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: if isinstance(val, str) or isinstance(val, bytes): return False try: @@ -238,7 +227,7 @@ class _StrSeqType(_BaseType): class _CutSpecType(_BaseType): typ = CutSpec - display = "[cut]" + display = "cut[]" valid_prefixes = [ "request.method", "request.scheme", @@ -277,7 +266,7 @@ class _CutSpecType(_BaseType): "server_conn.tls_established", ] - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: spec = s.split(",") opts = [] for pref in self.valid_prefixes: @@ -285,11 +274,11 @@ class _CutSpecType(_BaseType): opts.append(",".join(spec)) return opts - def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec: + def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec: parts: typing.Any = s.split(",") return parts - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: if not isinstance(val, str): return False parts = [x.strip() for x in val.split(",")] @@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType): "~c", ] - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return self.valid_prefixes @@ -335,7 +324,7 @@ class _FlowType(_BaseFlowType): typ = flow.Flow display = "flow" - def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow: + def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow: try: flows = manager.execute("view.flows.resolve %s" % (s)) except exceptions.CommandError as e: @@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType): ) return flows[0] - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, flow.Flow) class _FlowsType(_BaseFlowType): typ = typing.Sequence[flow.Flow] - display = "[flow]" + display = "flow[]" - def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]: + def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]: try: return manager.execute("view.flows.resolve %s" % (s)) except exceptions.CommandError as e: raise exceptions.TypeError from e - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: try: for v in val: if not isinstance(v, flow.Flow): @@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType): class _DataType(_BaseType): typ = Data - display = "[data]" + display = "data[][]" def completion( - self, manager: _CommandBase, t: type, s: str + self, manager: "CommandManager", t: type, s: str ) -> typing.Sequence[str]: # pragma: no cover raise exceptions.TypeError("data cannot be passed as argument") def parse( - self, manager: _CommandBase, t: type, s: str + self, manager: "CommandManager", t: type, s: str ) -> typing.Any: # pragma: no cover raise exceptions.TypeError("data cannot be passed as argument") - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: # FIXME: validate that all rows have equal length, and all columns have equal types try: for row in val: @@ -400,16 +389,16 @@ class _ChoiceType(_BaseType): typ = Choice display = "choice" - def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]: return manager.execute(t.options_command) - def parse(self, manager: _CommandBase, t: Choice, s: str) -> str: + def parse(self, manager: "CommandManager", t: Choice, s: str) -> str: opts = manager.execute(t.options_command) if s not in opts: raise exceptions.TypeError("Invalid choice.") return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: try: opts = manager.execute(typ.options_command) except exceptions.CommandError: @@ -423,7 +412,7 @@ class TypeManager: for t in types: self.typemap[t.typ] = t() - def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType: + def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]: if type(t) in self.typemap: return self.typemap[type(t)] return self.typemap.get(t, default) diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index eb3857bf..2a1dfd08 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -1,13 +1,15 @@ -import typing import inspect +import io +import typing + +import pytest + +import mitmproxy.types from mitmproxy import command -from mitmproxy import flow from mitmproxy import exceptions -from mitmproxy.test import tflow +from mitmproxy import flow from mitmproxy.test import taddons -import mitmproxy.types -import io -import pytest +from mitmproxy.test import tflow class TAddon: @@ -29,7 +31,7 @@ class TAddon: return "ok" @command.command("subcommand") - def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: + def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str: return "ok" @command.command("empty") @@ -83,14 +85,14 @@ class TestCommand: with pytest.raises(exceptions.CommandError): command.Command(cm, "invalidret", a.invalidret) with pytest.raises(exceptions.CommandError): - command.Command(cm, "invalidarg", a.invalidarg) + assert command.Command(cm, "invalidarg", a.invalidarg) def test_varargs(self): with taddons.context() as tctx: cm = command.CommandManager(tctx.master) a = TAddon() c = command.Command(cm, "varargs", a.varargs) - assert c.signature_help() == "varargs str *str -> [str]" + assert c.signature_help() == "varargs one *var -> str[]" assert c.call(["one", "two", "three"]) == ["two", "three"] def test_call(self): @@ -99,7 +101,7 @@ class TestCommand: a = TAddon() c = command.Command(cm, "cmd.path", a.cmd1) assert c.call(["foo"]) == "ret foo" - assert c.signature_help() == "cmd.path str -> str" + assert c.signature_help() == "cmd.path foo -> str" c = command.Command(cm, "cmd.two", a.cmd2) with pytest.raises(exceptions.CommandError): @@ -113,239 +115,272 @@ class TestCommand: [ "foo bar", [ - command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "bar", type = mitmproxy.types.Unknown, valid = False) + command.ParseResult(value="foo", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="bar", type=mitmproxy.types.Unknown, valid=False) ], [], ], [ "cmd1 'bar", [ - command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "'bar", type = str, valid = True) + command.ParseResult(value="cmd1", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'bar", type=str, valid=True) ], [], ], [ "a", - [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)], + [command.ParseResult(value="a", type=mitmproxy.types.Cmd, valid=False)], [], ], [ "", [], - [] + [ + command.CommandParameter("", mitmproxy.types.Cmd), + command.CommandParameter("", mitmproxy.types.CmdArgs) + ] ], [ "cmd3 1", [ - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "1", type = int, valid = True), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="1", type=int, valid=True), ], [] ], [ "cmd3 ", [ - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ['int'] + [command.CommandParameter('foo', int)] ], [ "subcommand ", [ - command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True,), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True, ), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + ], + [ + command.CommandParameter('cmd', mitmproxy.types.Cmd), + command.CommandParameter('*args', mitmproxy.types.CmdArgs), ], - ["cmd", "arg"], ], [ "subcommand cmd3 ", [ - command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["int"] + [command.CommandParameter('foo', int)] ], [ "cmd4", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), ], - ["int", "str", "path"] + [ + command.CommandParameter('a', int), + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "cmd4 ", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["int", "str", "path"] + [ + command.CommandParameter('a', int), + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "cmd4 1", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "1", type = int, valid = True), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="1", type=int, valid=True), ], - ["str", "path"] + [ + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "flow", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), ], - ["flow", "str"] + [ + command.CommandParameter('f', flow.Flow), + command.CommandParameter('s', str), + ] ], [ "flow ", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["flow", "str"] + [ + command.CommandParameter('f', flow.Flow), + command.CommandParameter('s', str), + ] ], [ "flow x", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "x", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="x", type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ "flow x ", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "x", type = flow.Flow, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="x", type=flow.Flow, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ "flow \"one two", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "\"one two", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="\"one two", type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ "flow \"three four\"", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = '"three four"', type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"three four"', type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ "spaces ' '", [ - command.ParseResult(value = "spaces", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "' '", type = mitmproxy.types.Unknown, valid = False) + command.ParseResult(value="spaces", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="' '", type=mitmproxy.types.Unknown, valid=False) ], [], ], [ 'spaces2 " "', [ - command.ParseResult(value = "spaces2", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = '" "', type = mitmproxy.types.Unknown, valid = False) + command.ParseResult(value="spaces2", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='" "', type=mitmproxy.types.Unknown, valid=False) ], [], ], [ '"abc"', [ - command.ParseResult(value = '"abc"', type = mitmproxy.types.Cmd, valid = False), + command.ParseResult(value='"abc"', type=mitmproxy.types.Cmd, valid=False), ], [], ], [ "'def'", [ - command.ParseResult(value = "'def'", type = mitmproxy.types.Cmd, valid = False), + command.ParseResult(value="'def'", type=mitmproxy.types.Cmd, valid=False), ], [], ], [ "cmd10 'a' \"b\" c", [ - command.ParseResult(value = "cmd10", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "'a'", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = '"b"', type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "c", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd10", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'a'", type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"b"', type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="c", type=mitmproxy.types.Unknown, valid=False), ], [], ], [ "cmd11 'a \"b\" c'", [ - command.ParseResult(value = "cmd11", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "'a \"b\" c'", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd11", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'a \"b\" c'", type=mitmproxy.types.Unknown, valid=False), ], [], ], [ 'cmd12 "a \'b\' c"', [ - command.ParseResult(value = "cmd12", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = '"a \'b\' c"', type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd12", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"a \'b\' c"', type=mitmproxy.types.Unknown, valid=False), ], [], ], [ r'cmd13 "a \"b\" c"', [ - command.ParseResult(value = "cmd13", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = r'"a \"b\" c"', type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd13", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value=r'"a \"b\" c"', type=mitmproxy.types.Unknown, valid=False), ], [], ], [ r"cmd14 'a \'b\' c'", [ - command.ParseResult(value = "cmd14", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = r"'a \'b\' c'", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value="cmd14", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value=r"'a \'b\' c'", type=mitmproxy.types.Unknown, valid=False), ], [], ], [ " spaces_at_the_begining_are_not_stripped", [ - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "spaces_at_the_begining_are_not_stripped", type = mitmproxy.types.Cmd, valid = False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, + valid=False), ], [], ], [ " spaces_at_the_begining_are_not_stripped neither_at_the_end ", [ - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "spaces_at_the_begining_are_not_stripped", type = mitmproxy.types.Cmd, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = "neither_at_the_end", type = mitmproxy.types.Unknown, valid = False), - command.ParseResult(value = " ", type = mitmproxy.types.Unknown, valid = False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, + valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="neither_at_the_end", type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], [], ], @@ -356,8 +391,7 @@ class TestCommand: tctx.master.addons.add(TAddon()) for s, expected, expectedremain in tests: current, remain = tctx.master.commands.parse_partial(s) - assert current == expected - assert expectedremain == remain + assert (s, current, expectedremain) == (s, expected, remain) def test_simple(): @@ -365,11 +399,11 @@ def test_simple(): c = command.CommandManager(tctx.master) a = TAddon() c.add("one.two", a.cmd1) - assert(c.commands["one.two"].help == "cmd1 help") - assert(c.execute("one.two foo") == "ret foo") - assert(c.execute("one.two \"foo\"") == "ret foo") - assert(c.execute("one.two \"foo bar\"") == "ret \"foo bar\"") - assert(c.call("one.two", "foo") == "ret foo") + assert (c.commands["one.two"].help == "cmd1 help") + assert (c.execute("one.two foo") == "ret foo") + assert (c.execute("one.two \"foo\"") == "ret foo") + assert (c.execute("one.two \"foo bar\"") == "ret foo bar") + assert (c.call("one.two", "foo") == "ret foo") with pytest.raises(exceptions.CommandError, match="Unknown"): c.execute("nonexistent") with pytest.raises(exceptions.CommandError, match="Invalid"): @@ -397,13 +431,13 @@ def test_simple(): def test_typename(): assert command.typename(str) == "str" - assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow]) == "flow[]" - assert command.typename(mitmproxy.types.Data) == "[data]" - assert command.typename(mitmproxy.types.CutSpec) == "[cut]" + assert command.typename(mitmproxy.types.Data) == "data[][]" + assert command.typename(mitmproxy.types.CutSpec) == "cut[]" assert command.typename(flow.Flow) == "flow" - assert command.typename(typing.Sequence[str]) == "[str]" + assert command.typename(typing.Sequence[str]) == "str[]" assert command.typename(mitmproxy.types.Choice("foo")) == "choice" assert command.typename(mitmproxy.types.Path) == "path" diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py index 571985fb..c8f7afde 100644 --- a/test/mitmproxy/test_types.py +++ b/test/mitmproxy/test_types.py @@ -2,7 +2,6 @@ import pytest import os import typing import contextlib -from unittest import mock import mitmproxy.exceptions import mitmproxy.types @@ -64,13 +63,14 @@ def test_int(): b.parse(tctx.master.commands, int, "foo") -def test_path(tdata): +def test_path(tdata, monkeypatch): with taddons.context() as tctx: b = mitmproxy.types._PathType() assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" - with mock.patch.dict("os.environ", {"HOME": "/home/test"}): - assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm" + monkeypatch.setenv("HOME", "/home/test") + monkeypatch.setenv("USERPROFILE", "/home/test") + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm" assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "~/mitm") is True assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False @@ -127,10 +127,10 @@ def test_cutspec(): def test_arg(): with taddons.context() as tctx: b = mitmproxy.types._ArgType() - assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] - assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" - assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True - assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False + assert b.completion(tctx.master.commands, mitmproxy.types.CmdArgs, "") == [] + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.CmdArgs, "foo") + assert b.is_valid(tctx.master.commands, mitmproxy.types.CmdArgs, 1) is False def test_strseq(): |