diff options
39 files changed, 1208 insertions, 514 deletions
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py index fcc3209b..bed06e82 100644 --- a/mitmproxy/addons/clientplayback.py +++ b/mitmproxy/addons/clientplayback.py @@ -3,6 +3,7 @@ from mitmproxy import ctx from mitmproxy import io from mitmproxy import flow from mitmproxy import command +import mitmproxy.types import typing @@ -37,7 +38,7 @@ class ClientPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.client.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 4191d490..2b0b2f14 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -6,6 +6,7 @@ from mitmproxy import command from mitmproxy import flow from mitmproxy import optmanager from mitmproxy.net.http import status_codes +import mitmproxy.types class Core: @@ -96,7 +97,7 @@ class Core: ] @command.command("flow.set") - @command.argument("spec", type=command.Choice("flow.set.options")) + @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) def flow_set( self, flows: typing.Sequence[flow.Flow], @@ -187,7 +188,7 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - @command.argument("enc", type=command.Choice("flow.encode.options")) + @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) def encode( self, flows: typing.Sequence[flow.Flow], @@ -216,7 +217,7 @@ class Core: return ["gzip", "deflate", "br"] @command.command("options.load") - def options_load(self, path: command.Path) -> None: + def options_load(self, path: mitmproxy.types.Path) -> None: """ Load options from a file. """ @@ -228,7 +229,7 @@ class Core: ) from e @command.command("options.save") - def options_save(self, path: command.Path) -> None: + def options_save(self, path: mitmproxy.types.Path) -> None: """ Save options to a file. """ diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index efc9e5df..f4b560e8 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -7,6 +7,7 @@ from mitmproxy import flow from mitmproxy import ctx from mitmproxy import certs from mitmproxy.utils import strutils +import mitmproxy.types import pyperclip @@ -35,6 +36,8 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]: if spec == "host" and is_addr(current): return str(current[0]) elif spec.startswith("header["): + if not current: + return "" return current.headers.get(headername(spec), "") elif isinstance(part, bytes): return part @@ -51,8 +54,8 @@ class Cut: def cut( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut] - ) -> command.Cuts: + cuts: mitmproxy.types.CutSpec, + ) -> mitmproxy.types.Data: """ Cut data from a set of flows. Cut specifications are attribute paths from the base of the flow object, with a few conveniences - "port" @@ -62,17 +65,17 @@ class Cut: or "false", "bytes" are preserved, and all other values are converted to strings. """ - ret = [] + ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]] for f in flows: ret.append([extract(c, f) for c in cuts]) - return ret + return ret # type: ignore @command.command("cut.save") def save( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], - path: command.Path + cuts: mitmproxy.types.CutSpec, + path: mitmproxy.types.Path ) -> None: """ Save cuts to file. If there are multiple flows or cuts, the format @@ -84,7 +87,7 @@ class Cut: append = False if path.startswith("+"): append = True - path = command.Path(path[1:]) + path = mitmproxy.types.Path(path[1:]) if len(cuts) == 1 and len(flows) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: @@ -110,7 +113,7 @@ class Cut: def clip( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], + cuts: mitmproxy.types.CutSpec, ) -> None: """ Send cuts to the clipboard. If there are multiple flows or cuts, the diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 5388a0e8..0169f5b1 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -5,6 +5,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.utils import strutils from mitmproxy.net.http.http1 import assemble +import mitmproxy.types import pyperclip @@ -49,7 +50,7 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None: + def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: """ Export a flow to path. """ diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 40cd6f82..44afef68 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -7,6 +7,7 @@ from mitmproxy import flowfilter from mitmproxy import io from mitmproxy import ctx from mitmproxy import flow +import mitmproxy.types class Save: @@ -50,7 +51,7 @@ class Save: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) @command.command("save.file") - def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None: + def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None: """ Save flows to a file. If the path starts with a +, flows are appended to the file, otherwise it is over-written. @@ -74,6 +75,15 @@ class Save: self.stream.add(flow) self.active_flows.discard(flow) + def websocket_start(self, flow): + if self.stream: + self.active_flows.add(flow) + + def websocket_end(self, flow): + if self.stream: + self.stream.add(flow) + self.active_flows.discard(flow) + def response(self, flow): if self.stream: self.stream.add(flow) diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 46968a8d..20fcfc2a 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -9,6 +9,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy import io from mitmproxy import command +import mitmproxy.types class ServerPlayback: @@ -31,7 +32,7 @@ class ServerPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.server.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index e45f2baf..3a15fd3e 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -351,7 +351,7 @@ class View(collections.Sequence): ctx.master.addons.trigger("update", updated) @command.command("view.load") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: """ Load flows into the view, without processing them with addons. """ diff --git a/mitmproxy/command.py b/mitmproxy/command.py index c86d9792..e1e56d3a 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -10,9 +10,16 @@ import textwrap import functools import sys -from mitmproxy.utils import typecheck from mitmproxy import exceptions -from mitmproxy import flow +import mitmproxy.types + + +def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: + sig = inspect.signature(f) + try: + sig.bind(*args, **kwargs) + except TypeError as v: + raise exceptions.CommandError("command argument mismatch: %s" % v.args[0]) def lexer(s): @@ -24,113 +31,14 @@ def lexer(s): return lex -# This is an awkward location for these values, but it's better than having -# the console core import and depend on an addon. FIXME: Add a way for -# addons to add custom types and manage their completion and validation. -valid_flow_prefixes = [ - "@all", - "@focus", - "@shown", - "@hidden", - "@marked", - "@unmarked", - "~q", - "~s", - "~a", - "~hq", - "~hs", - "~b", - "~bq", - "~bs", - "~t", - "~d", - "~m", - "~u", - "~c", -] - - -Cuts = typing.Sequence[ - typing.Sequence[typing.Union[str, bytes]] -] - - -class Cut(str): - # This is an awkward location for these values, but it's better than having - # the console core import and depend on an addon. FIXME: Add a way for - # addons to add custom types and manage their completion and validation. - valid_prefixes = [ - "request.method", - "request.scheme", - "request.host", - "request.http_version", - "request.port", - "request.path", - "request.url", - "request.text", - "request.content", - "request.raw_content", - "request.timestamp_start", - "request.timestamp_end", - "request.header[", - - "response.status_code", - "response.reason", - "response.text", - "response.content", - "response.timestamp_start", - "response.timestamp_end", - "response.raw_content", - "response.header[", - - "client_conn.address.port", - "client_conn.address.host", - "client_conn.tls_version", - "client_conn.sni", - "client_conn.ssl_established", - - "server_conn.address.port", - "server_conn.address.host", - "server_conn.ip_address.host", - "server_conn.tls_version", - "server_conn.sni", - "server_conn.ssl_established", - ] - - -class Path(str): - pass - - -class Cmd(str): - pass - - -class Arg(str): - pass - - def typename(t: type) -> str: """ - Translates a type to an explanatory string. If ret is True, we're - looking at a return type, else we're looking at a parameter type. + Translates a type to an explanatory string. """ - if isinstance(t, Choice): - return "choice" - elif t == typing.Sequence[flow.Flow]: - return "[flow]" - elif t == typing.Sequence[str]: - return "[str]" - elif t == typing.Sequence[Cut]: - return "[cut]" - elif t == Cuts: - return "[cuts]" - elif t == flow.Flow: - return "flow" - elif issubclass(t, (str, int, bool)): - return t.__name__.lower() - else: # pragma: no cover + to = mitmproxy.types.CommandTypes.get(t, None) + if not to: raise NotImplementedError(t) + return to.display class Command: @@ -168,13 +76,12 @@ class Command: ret = " -> " + ret return "%s %s%s" % (self.path, params, ret) - def call(self, args: typing.Sequence[str]): + def call(self, args: typing.Sequence[str]) -> typing.Any: """ Call the command with a list of arguments. At this point, all arguments are strings. """ - if not self.has_positional and (len(self.paramtypes) != len(args)): - raise exceptions.CommandError("Usage: %s" % self.signature_help()) + verify_arg_signature(self.func, list(args), {}) remainder = [] # type: typing.Sequence[str] if self.has_positional: @@ -183,37 +90,35 @@ class Command: pargs = [] for arg, paramtype in zip(args, self.paramtypes): - if typecheck.check_command_type(arg, paramtype): - pargs.append(arg) - else: - pargs.append(parsearg(self.manager, arg, paramtype)) - - if remainder: - chk = typecheck.check_command_type( - remainder, - typing.Sequence[self.paramtypes[-1]] # type: ignore - ) - if chk: - pargs.extend(remainder) - else: - raise exceptions.CommandError("Invalid value type: %s - expected %s" % (remainder, self.paramtypes[-1])) + pargs.append(parsearg(self.manager, arg, paramtype)) + pargs.extend(remainder) with self.manager.master.handlecontext(): ret = self.func(*pargs) - if not typecheck.check_command_type(ret, self.returntype): - raise exceptions.CommandError("Command returned unexpected data") - + if ret is None and self.returntype is None: + return + typ = mitmproxy.types.CommandTypes.get(self.returntype) + if not typ.is_valid(self.manager, typ, ret): + raise exceptions.CommandError( + "%s returned unexpected data - expected %s" % ( + self.path, typ.display + ) + ) return ret ParseResult = typing.NamedTuple( "ParseResult", - [("value", str), ("type", typing.Type)], + [ + ("value", str), + ("type", typing.Type), + ("valid", bool), + ], ) -class CommandManager: +class CommandManager(mitmproxy.types._CommandBase): def __init__(self, master): self.master = master self.commands = {} @@ -228,9 +133,12 @@ class CommandManager: def add(self, path: str, func: typing.Callable): self.commands[path] = Command(self, path, func) - def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]: + def parse_partial( + self, + cmdstr: str + ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]: """ - Parse a possibly partial command. Return a sequence of (part, type) tuples. + Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. """ buf = io.StringIO(cmdstr) parts = [] # type: typing.List[str] @@ -255,21 +163,43 @@ class CommandManager: typ = None # type: typing.Type for i in range(len(parts)): if i == 0: - typ = Cmd + typ = mitmproxy.types.Cmd if parts[i] in self.commands: params.extend(self.commands[parts[i]].paramtypes) elif params: typ = params.pop(0) - # FIXME: Do we need to check that Arg is positional? - if typ == Cmd and params and params[0] == Arg: + if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: if parts[i] in self.commands: params[:] = self.commands[parts[i]].paramtypes else: - typ = str - parse.append(ParseResult(value=parts[i], type=typ)) - return parse + typ = mitmproxy.types.Unknown + + to = mitmproxy.types.CommandTypes.get(typ, None) + valid = False + if to: + try: + to.parse(self, typ, parts[i]) + except exceptions.TypeError: + valid = False + else: + valid = True + + parse.append( + ParseResult( + value=parts[i], + type=typ, + valid=valid, + ) + ) - def call_args(self, path, args): + remhelp = [] # type: typing.List[str] + for x in params: + remt = mitmproxy.types.CommandTypes.get(x, None) + remhelp.append(remt.display) + + return parse, remhelp + + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: """ Call a command using a list of string arguments. May raise CommandError. """ @@ -300,53 +230,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if isinstance(argtype, Choice): - cmd = argtype.options_command - opts = manager.call(cmd) - if spec not in opts: - raise exceptions.CommandError( - "Invalid choice: see %s for options" % cmd - ) - return spec - elif issubclass(argtype, str): - return spec - elif argtype == bool: - if spec == "true": - return True - elif spec == "false": - return False - else: - raise exceptions.CommandError( - "Booleans are 'true' or 'false', got %s" % spec - ) - elif issubclass(argtype, int): - try: - return int(spec) - except ValueError as e: - raise exceptions.CommandError("Expected an integer, got %s." % spec) - elif argtype == typing.Sequence[flow.Flow]: - return manager.call_args("view.resolve", [spec]) - elif argtype == Cuts: - return manager.call_args("cut", [spec]) - elif argtype == flow.Flow: - flows = manager.call_args("view.resolve", [spec]) - if len(flows) != 1: - raise exceptions.CommandError( - "Command requires one flow, specification matched %s." % len(flows) - ) - return flows[0] - elif argtype in (typing.Sequence[str], typing.Sequence[Cut]): - return [i.strip() for i in spec.split(",")] - else: + t = mitmproxy.types.CommandTypes.get(argtype, None) + if not t: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) - - -def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: - sig = inspect.signature(f) try: - sig.bind(*args, **kwargs) - except TypeError as v: - raise exceptions.CommandError("Argument mismatch: %s" % v.args[0]) + return t.parse(manager, argtype, spec) # type: ignore + except exceptions.TypeError as e: + raise exceptions.CommandError from e def command(path): @@ -360,21 +250,11 @@ def command(path): return decorator -class Choice: - def __init__(self, options_command): - self.options_command = options_command - - def __instancecheck__(self, instance): - # return false here so that arguments are piped through parsearg, - # which does extended validation. - return False - - def argument(name, type): """ - Set the type of a command argument at runtime. - This is useful for more specific types such as command.Choice, which we cannot annotate - directly as mypy does not like that. + Set the type of a command argument at runtime. This is useful for more + specific types such as mitmproxy.types.Choice, which we cannot annotate + directly as mypy does not like that. """ def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ diff --git a/mitmproxy/contrib/wsproto/__init__.py b/mitmproxy/contrib/wsproto/__init__.py new file mode 100644 index 00000000..d0592bc5 --- /dev/null +++ b/mitmproxy/contrib/wsproto/__init__.py @@ -0,0 +1,13 @@ +from . import compat +from . import connection +from . import events +from . import extensions +from . import frame_protocol + +__all__ = [ + 'compat', + 'connection', + 'events', + 'extensions', + 'frame_protocol', +] diff --git a/mitmproxy/contrib/wsproto/extensions.py b/mitmproxy/contrib/wsproto/extensions.py index f7cf4fb6..0e0d2018 100644 --- a/mitmproxy/contrib/wsproto/extensions.py +++ b/mitmproxy/contrib/wsproto/extensions.py @@ -1,3 +1,5 @@ +# type: ignore + # -*- coding: utf-8 -*- """ wsproto/extensions diff --git a/mitmproxy/contrib/wsproto/frame_protocol.py b/mitmproxy/contrib/wsproto/frame_protocol.py index b95dceec..30f146c6 100644 --- a/mitmproxy/contrib/wsproto/frame_protocol.py +++ b/mitmproxy/contrib/wsproto/frame_protocol.py @@ -1,3 +1,5 @@ +# type: ignore + # -*- coding: utf-8 -*- """ wsproto/frame_protocol diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py index 71517480..d568898b 100644 --- a/mitmproxy/exceptions.py +++ b/mitmproxy/exceptions.py @@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException): pass +class TypeError(MitmproxyException): + pass + + """ Net-layer exceptions """ diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index 23e47e2b..d1fd8299 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -322,8 +322,10 @@ class FDomain(_Rex): flags = re.IGNORECASE is_binary = False - @only(http.HTTPFlow) + @only(http.HTTPFlow, websocket.WebSocketFlow) def __call__(self, f): + if isinstance(f, websocket.WebSocketFlow): + f = f.handshake_flow return bool( self.re.search(f.request.host) or self.re.search(f.request.pretty_host) @@ -342,9 +344,11 @@ class FUrl(_Rex): toks = toks[1:] return klass(*toks) - @only(http.HTTPFlow) + @only(http.HTTPFlow, websocket.WebSocketFlow) def __call__(self, f): - if not f.request: + if isinstance(f, websocket.WebSocketFlow): + f = f.handshake_flow + if not f or not f.request: return False return self.re.search(f.request.pretty_url) diff --git a/mitmproxy/master.py b/mitmproxy/master.py index 5997ff6d..de3b24e1 100644 --- a/mitmproxy/master.py +++ b/mitmproxy/master.py @@ -9,6 +9,7 @@ from mitmproxy import eventsequence from mitmproxy import exceptions from mitmproxy import command from mitmproxy import http +from mitmproxy import websocket from mitmproxy import log from mitmproxy.net import server_spec from mitmproxy.proxy.protocol import http_replay @@ -41,6 +42,7 @@ class Master: self.should_exit = threading.Event() self._server = None self.first_tick = True + self.waiting_flows = [] @property def server(self): @@ -117,15 +119,33 @@ class Master: self.should_exit.set() self.addons.trigger("done") + def _change_reverse_host(self, f): + """ + When we load flows in reverse proxy mode, we adjust the target host to + the reverse proxy destination for all flows we load. This makes it very + easy to replay saved flows against a different host. + """ + if self.options.mode.startswith("reverse:"): + _, upstream_spec = server_spec.parse_with_mode(self.options.mode) + f.request.host, f.request.port = upstream_spec.address + f.request.scheme = upstream_spec.scheme + def load_flow(self, f): """ - Loads a flow + Loads a flow and links websocket & handshake flows """ + if isinstance(f, http.HTTPFlow): - if self.options.mode.startswith("reverse:"): - _, upstream_spec = server_spec.parse_with_mode(self.options.mode) - f.request.host, f.request.port = upstream_spec.address - f.request.scheme = upstream_spec.scheme + self._change_reverse_host(f) + if 'websocket' in f.metadata: + self.waiting_flows.append(f) + + if isinstance(f, websocket.WebSocketFlow): + hf = [hf for hf in self.waiting_flows if hf.id == f.metadata['websocket_handshake']][0] + f.handshake_flow = hf + self.waiting_flows.remove(hf) + self._change_reverse_host(f.handshake_flow) + f.reply = controller.DummyReply() for e, o in eventsequence.iterate(f): self.addons.handle_lifecycle(e, o) diff --git a/mitmproxy/proxy/protocol/http.py b/mitmproxy/proxy/protocol/http.py index 57ac0f16..076ffa62 100644 --- a/mitmproxy/proxy/protocol/http.py +++ b/mitmproxy/proxy/protocol/http.py @@ -321,6 +321,7 @@ class HttpLayer(base.Layer): try: if websockets.check_handshake(request.headers) and websockets.check_client_version(request.headers): + f.metadata['websocket'] = True # We only support RFC6455 with WebSocket version 13 # allow inline scripts to manipulate the client handshake self.channel.ask("websocket_handshake", f) diff --git a/mitmproxy/proxy/protocol/websocket.py b/mitmproxy/proxy/protocol/websocket.py index 34dcba06..1bd5284d 100644 --- a/mitmproxy/proxy/protocol/websocket.py +++ b/mitmproxy/proxy/protocol/websocket.py @@ -1,10 +1,11 @@ import socket from OpenSSL import SSL + +from mitmproxy.contrib import wsproto from mitmproxy.contrib.wsproto import events from mitmproxy.contrib.wsproto.connection import ConnectionType, WSConnection from mitmproxy.contrib.wsproto.extensions import PerMessageDeflate -from mitmproxy.contrib.wsproto.frame_protocol import Opcode from mitmproxy import exceptions from mitmproxy import flow @@ -93,11 +94,14 @@ class WebSocketLayer(base.Layer): if event.message_finished: original_chunk_sizes = [len(f) for f in fb] - message_type = Opcode.TEXT if isinstance(event, events.TextReceived) else Opcode.BINARY - if message_type == Opcode.TEXT: + + if isinstance(event, events.TextReceived): + message_type = wsproto.frame_protocol.Opcode.TEXT payload = ''.join(fb) else: + message_type = wsproto.frame_protocol.Opcode.BINARY payload = b''.join(fb) + fb.clear() websocket_message = WebSocketMessage(message_type, not is_server, payload) diff --git a/mitmproxy/test/tflow.py b/mitmproxy/test/tflow.py index c3dab30c..91747866 100644 --- a/mitmproxy/test/tflow.py +++ b/mitmproxy/test/tflow.py @@ -44,7 +44,7 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None, "GET", "http", "example.com", - "80", + 80, "/ws", "HTTP/1.1", headers=net_http.Headers( @@ -75,7 +75,9 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None, handshake_flow.response = resp f = websocket.WebSocketFlow(client_conn, server_conn, handshake_flow) - handshake_flow.metadata['websocket_flow'] = f + f.metadata['websocket_handshake'] = handshake_flow.id + handshake_flow.metadata['websocket_flow'] = f.id + handshake_flow.metadata['websocket'] = True if messages is True: messages = [ diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index ef32b953..30e8b13b 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,6 +1,4 @@ import abc -import glob -import os import typing import urwid @@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords import mitmproxy.flow import mitmproxy.master import mitmproxy.command +import mitmproxy.types class Completer: # pragma: no cover @@ -39,30 +38,6 @@ class ListCompleter(Completer): return ret -# Generates the completion options for a specific starting input -def pathOptions(start: str) -> typing.Sequence[str]: - if not start: - start = "./" - path = os.path.expanduser(start) - ret = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = start - else: - files = glob.glob(path + "*") - prefix = os.path.dirname(start) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) - if os.path.isdir(f): - display += "/" - ret.append(display) - if not ret: - ret = [start] - ret.sort() - return ret - - CompletionState = typing.NamedTuple( "CompletionState", [ @@ -75,9 +50,9 @@ CompletionState = typing.NamedTuple( class CommandBuffer(): def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None: self.master = master - self.buf = start + self.text = self.flatten(start) # Cursor is always within the range [0:len(buffer)]. - self._cursor = len(self.buf) + self._cursor = len(self.text) self.completion = None # type: CompletionState @property @@ -88,13 +63,40 @@ class CommandBuffer(): def cursor(self, x) -> None: if x < 0: self._cursor = 0 - elif x > len(self.buf): - self._cursor = len(self.buf) + elif x > len(self.text): + self._cursor = len(self.text) else: self._cursor = x def render(self): - return self.buf + """ + This function is somewhat tricky - in order to make the cursor + position valid, we have to make sure there is a + character-for-character offset match in the rendered output, up + to the cursor. Beyond that, we can add stuff. + """ + parts, remhelp = self.master.commands.parse_partial(self.text) + ret = [] + for p in parts: + if p.valid: + if p.type == mitmproxy.types.Cmd: + ret.append(("commander_command", p.value)) + else: + ret.append(("text", p.value)) + elif p.value: + ret.append(("commander_invalid", p.value)) + else: + ret.append(("text", "")) + ret.append(("text", " ")) + if remhelp: + ret.append(("text", " ")) + for v in remhelp: + ret.append(("commander_hint", "%s " % v)) + return ret + + def flatten(self, txt): + parts, _ = self.master.commands.parse_partial(txt) + return " ".join([x.value for x in parts]) def left(self) -> None: self.cursor = self.cursor - 1 @@ -104,50 +106,14 @@ class CommandBuffer(): def cycle_completion(self) -> None: if not self.completion: - parts = self.master.commands.parse_partial(self.buf[:self.cursor]) + parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor]) last = parts[-1] - if last.type == mitmproxy.command.Cmd: - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - self.master.commands.commands.keys(), - ), - parse = parts, - ) - if last.type == typing.Sequence[mitmproxy.command.Cut]: - spec = parts[-1].value.split(",") - opts = [] - for pref in mitmproxy.command.Cut.valid_prefixes: - spec[-1] = pref - opts.append(",".join(spec)) - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - opts, - ), - parse = parts, - ) - elif isinstance(last.type, mitmproxy.command.Choice): + ct = mitmproxy.types.CommandTypes.get(last.type, None) + if ct: self.completion = CompletionState( completer = ListCompleter( parts[-1].value, - self.master.commands.call(last.type.options_command), - ), - parse = parts, - ) - elif last.type == mitmproxy.command.Path: - self.completion = CompletionState( - completer = ListCompleter( - "", - pathOptions(parts[1].value) - ), - parse = parts, - ) - elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow): - self.completion = CompletionState( - completer = ListCompleter( - "", - mitmproxy.command.valid_flow_prefixes, + ct.completion(self.master.commands, last.type, parts[-1].value) ), parse = parts, ) @@ -155,13 +121,13 @@ class CommandBuffer(): nxt = self.completion.completer.cycle() buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt buf = buf.strip() - self.buf = buf - self.cursor = len(self.buf) + self.text = self.flatten(buf) + self.cursor = len(self.text) def backspace(self) -> None: if self.cursor == 0: return - self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:] + self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:]) self.cursor = self.cursor - 1 self.completion = None @@ -169,7 +135,7 @@ class CommandBuffer(): """ Inserts text at the cursor. """ - self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:] + self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:]) self.cursor += 1 self.completion = None @@ -213,4 +179,4 @@ class CommandEdit(urwid.WidgetWrap): return x, y def get_value(self): - return self.cbuf.buf + return self.cbuf.text diff --git a/mitmproxy/tools/console/commandexecutor.py b/mitmproxy/tools/console/commandexecutor.py index 829daee1..26f92238 100644 --- a/mitmproxy/tools/console/commandexecutor.py +++ b/mitmproxy/tools/console/commandexecutor.py @@ -23,6 +23,10 @@ class CommandExecutor: signals.status_message.send( message="Command returned %s flows" % len(ret) ) + elif type(ret) == flow.Flow: + signals.status_message.send( + message="Command returned 1 flow" + ) else: self.master.overlay( overlay.DataViewerOverlay( diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 20efcee3..1183ee9d 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -124,7 +124,7 @@ class CommandHelp(urwid.Frame): class Commands(urwid.Pile, layoutwidget.LayoutWidget): - title = "Commands" + title = "Command Reference" keyctx = "commands" def __init__(self, master): diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 453e9e1c..20d54bc6 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -7,6 +7,8 @@ from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import contentviews from mitmproxy.utils import strutils +import mitmproxy.types + from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals @@ -102,7 +104,7 @@ class ConsoleAddon: @command.command("console.layout.options") def layout_options(self) -> typing.Sequence[str]: """ - Returns the available options for the consoler_layout option. + Returns the available options for the console_layout option. """ return ["single", "vertical", "horizontal"] @@ -218,8 +220,8 @@ class ConsoleAddon: self, prompt: str, choices: typing.Sequence[str], - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -241,7 +243,11 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg + self, + prompt: str, + choicecmd: mitmproxy.types.Cmd, + subcmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -252,10 +258,10 @@ class ConsoleAddon: def callback(opt): # We're now outside of the call context... - repl = " ".join(cmd) + repl = " ".join(args) repl = repl.replace("{choice}", opt) try: - self.master.commands.call(repl) + self.master.commands.call(subcmd + " " + repl) except exceptions.CommandError as e: signals.status_message.send(message=str(e)) @@ -316,6 +322,7 @@ class ConsoleAddon: signals.pop_view_state.send(self) @command.command("console.bodyview") + @command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options")) def bodyview(self, f: flow.Flow, part: str) -> None: """ Spawn an external viewer for a flow request or response body based @@ -332,6 +339,13 @@ class ConsoleAddon: raise exceptions.CommandError("No content to view.") self.master.spawn_external_viewer(content, t) + @command.command("console.bodyview.options") + def bodyview_options(self) -> typing.Sequence[str]: + """ + Possible parts for console.bodyview. + """ + return ["request", "response"] + @command.command("console.edit.focus.options") def edit_focus_options(self) -> typing.Sequence[str]: """ @@ -346,17 +360,24 @@ class ConsoleAddon: "reason", "request-headers", "response-headers", + "request-body", + "response-body", "status_code", "set-cookies", "url", ] @command.command("console.edit.focus") - @command.argument("part", type=command.Choice("console.edit.focus.options")) + @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) def edit_focus(self, part: str) -> None: """ Edit a component of the currently focused flow. """ + flow = self.master.view.focus.flow + # This shouldn't be necessary once this command is "console.edit @focus", + # but for now it is. + if not flow: + raise exceptions.CommandError("No flow selected.") if part == "cookies": self.master.switch_view("edit_focus_cookies") elif part == "form": @@ -369,6 +390,21 @@ class ConsoleAddon: self.master.switch_view("edit_focus_request_headers") elif part == "response-headers": self.master.switch_view("edit_focus_response_headers") + elif part in ("request-body", "response-body"): + if part == "request-body": + message = flow.request + else: + message = flow.response + if not message: + raise exceptions.CommandError("Flow has no {}.".format(part.split("-")[0])) + c = self.master.spawn_editor(message.get_content(strict=False) or b"") + # Fix an issue caused by some editors when editing a + # request/response body. Many editors make it hard to save a + # file without a terminating newline on the last line. When + # editing message bodies, this can cause problems. For now, I + # just strip the newlines off the end of the body when we return + # from an editor. + message.content = c.rstrip(b"\n") elif part == "set-cookies": self.master.switch_view("edit_focus_setcookies") elif part in ["url", "method", "status_code", "reason"]: @@ -404,14 +440,14 @@ class ConsoleAddon: self._grideditor().cmd_delete() @command.command("console.grideditor.load") - def grideditor_load(self, path: command.Path) -> None: + def grideditor_load(self, path: mitmproxy.types.Path) -> None: """ Read a file into the currrent cell. """ self._grideditor().cmd_read_file(path) @command.command("console.grideditor.load_escaped") - def grideditor_load_escaped(self, path: command.Path) -> None: + def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None: """ Read a file containing a Python-style escaped string into the currrent cell. @@ -419,7 +455,7 @@ class ConsoleAddon: self._grideditor().cmd_read_file_escaped(path) @command.command("console.grideditor.save") - def grideditor_save(self, path: command.Path) -> None: + def grideditor_save(self, path: mitmproxy.types.Path) -> None: """ Save data to file as a CSV. """ @@ -440,7 +476,7 @@ class ConsoleAddon: self._grideditor().cmd_spawn_editor() @command.command("console.flowview.mode.set") - @command.argument("mode", type=command.Choice("console.flowview.mode.options")) + @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options")) def flowview_mode_set(self, mode: str) -> None: """ Set the display mode for the current flow view. @@ -498,8 +534,8 @@ class ConsoleAddon: self, contexts: typing.Sequence[str], key: str, - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index 0fdec10c..f8a3df2d 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -41,7 +41,7 @@ def map(km): "e", """ console.choose.cmd Format export.formats - console.command export.file {choice} @focus '' + console.command export.file {choice} @focus """, ["flowlist", "flowview"], "Export this flow to file" diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py index b268906c..fbb569a4 100644 --- a/mitmproxy/tools/console/keymap.py +++ b/mitmproxy/tools/console/keymap.py @@ -17,6 +17,13 @@ Contexts = { } +navkeys = [ + "m_start", "m_end", "m_next", "m_select", + "up", "down", "page_up", "page_down", + "left", "right" +] + + class Binding: def __init__(self, key, command, contexts, help): self.key, self.command, self.contexts = key, command, sorted(contexts) @@ -122,3 +129,13 @@ class Keymap: if b: return self.executor(b.command) return key + + def handle_only(self, context: str, key: str) -> typing.Optional[str]: + """ + Like handle, but ignores global bindings. Returns the key if it has + not been handled, or None. + """ + b = self.get(context, key) + if b: + return self.executor(b.command) + return key diff --git a/mitmproxy/tools/console/options.py b/mitmproxy/tools/console/options.py index 4d55aeec..54772cf0 100644 --- a/mitmproxy/tools/console/options.py +++ b/mitmproxy/tools/console/options.py @@ -117,6 +117,7 @@ class OptionListWalker(urwid.ListWalker): def stop_editing(self): self.editing = False self.focus_obj = self._get(self.index, False) + self.set_focus(self.index) self._modified() def get_edit_text(self): diff --git a/mitmproxy/tools/console/overlay.py b/mitmproxy/tools/console/overlay.py index f97f23f9..55acbfdd 100644 --- a/mitmproxy/tools/console/overlay.py +++ b/mitmproxy/tools/console/overlay.py @@ -5,6 +5,7 @@ import urwid from mitmproxy.tools.console import signals from mitmproxy.tools.console import grideditor from mitmproxy.tools.console import layoutwidget +from mitmproxy.tools.console import keymap class SimpleOverlay(urwid.Overlay, layoutwidget.LayoutWidget): @@ -114,13 +115,21 @@ class Chooser(urwid.WidgetWrap, layoutwidget.LayoutWidget): return True def keypress(self, size, key): - key = self.master.keymap.handle("chooser", key) + key = self.master.keymap.handle_only("chooser", key) if key == "m_select": self.callback(self.choices[self.walker.index]) signals.pop_view_state.send(self) + return elif key == "esc": signals.pop_view_state.send(self) - return super().keypress(size, key) + return + + binding = self.master.keymap.get("global", key) + # This is extremely awkward. We need a better way to match nav keys only. + if binding and binding.command.startswith("console.nav"): + self.master.keymap.handle("global", key) + elif key in keymap.navkeys: + return super().keypress(size, key) class OptionsOverlay(urwid.WidgetWrap, layoutwidget.LayoutWidget): diff --git a/mitmproxy/tools/console/palettes.py b/mitmproxy/tools/console/palettes.py index 7fbdcfd8..465fd574 100644 --- a/mitmproxy/tools/console/palettes.py +++ b/mitmproxy/tools/console/palettes.py @@ -32,6 +32,9 @@ class Palette: # Grid Editor 'focusfield', 'focusfield_error', 'field_error', 'editfield', + + # Commander + 'commander_command', 'commander_invalid', 'commander_hint' ] high = None # type: typing.Mapping[str, typing.Sequence[str]] @@ -117,6 +120,11 @@ class LowDark(Palette): focusfield_error = ('dark red', 'light gray'), field_error = ('dark red', 'default'), editfield = ('white', 'default'), + + + commander_command = ('white,bold', 'default'), + commander_invalid = ('light red', 'default'), + commander_hint = ('dark gray', 'default'), ) @@ -183,6 +191,10 @@ class LowLight(Palette): focusfield_error = ('dark red', 'light gray'), field_error = ('dark red', 'black'), editfield = ('black', 'default'), + + commander_command = ('dark magenta', 'default'), + commander_invalid = ('light red', 'default'), + commander_hint = ('light gray', 'default'), ) @@ -267,6 +279,10 @@ class SolarizedLight(LowLight): focusfield_error = (sol_red, sol_base2), field_error = (sol_red, 'default'), editfield = (sol_base01, 'default'), + + commander_command = (sol_cyan, 'default'), + commander_invalid = (sol_orange, 'default'), + commander_hint = (sol_base1, 'default'), ) @@ -317,6 +333,10 @@ class SolarizedDark(LowDark): focusfield_error = (sol_red, sol_base02), field_error = (sol_red, 'default'), editfield = (sol_base1, 'default'), + + commander_command = (sol_blue, 'default'), + commander_invalid = (sol_orange, 'default'), + commander_hint = (sol_base00, 'default'), ) diff --git a/mitmproxy/types.py b/mitmproxy/types.py new file mode 100644 index 00000000..8ae8b309 --- /dev/null +++ b/mitmproxy/types.py @@ -0,0 +1,445 @@ +import os +import glob +import typing + +from mitmproxy import exceptions +from mitmproxy import flow + + +class Path(str): + pass + + +class Cmd(str): + pass + + +class Arg(str): + pass + + +class Unknown(str): + pass + + +class CutSpec(typing.Sequence[str]): + pass + + +class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]): + pass + + +class Choice: + def __init__(self, options_command): + self.options_command = options_command + + def __instancecheck__(self, instance): # pragma: no cover + # return false here so that arguments are piped through parsearg, + # which does extended validation. + return False + + +# One of the many charming things about mypy is that introducing type +# annotations can cause circular dependencies where there were none before. +# Rather than putting types and the CommandManger in the same file, we introduce +# a stub type with the signature we use. +class _CommandBase: + commands = {} # type: typing.MutableMapping[str, typing.Any] + + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: + raise NotImplementedError + + def call(self, cmd: str) -> typing.Any: + raise NotImplementedError + + +class _BaseType: + typ = object # type: typing.Type + display = "" # type: str + + def completion( + self, manager: _CommandBase, t: typing.Any, s: str + ) -> typing.Sequence[str]: + """ + Returns a list of completion strings for a given prefix. The strings + returned don't necessarily need to be suffixes of the prefix, since + completers will do prefix filtering themselves.. + """ + raise NotImplementedError + + def parse( + self, manager: _CommandBase, typ: typing.Any, s: str + ) -> typing.Any: + """ + Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string. + + Raises exceptions.TypeError if the value is invalid. + """ + raise NotImplementedError + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + """ + Check if data is valid for this type. + """ + raise NotImplementedError + + +class _BoolType(_BaseType): + typ = bool + display = "bool" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return ["false", "true"] + + def parse(self, manager: _CommandBase, t: type, s: str) -> bool: + if s == "true": + return True + elif s == "false": + return False + else: + raise exceptions.TypeError( + "Booleans are 'true' or 'false', got %s" % s + ) + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return val in [True, False] + + +class _StrType(_BaseType): + typ = str + display = "str" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandBase, t: type, s: str) -> str: + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return isinstance(val, str) + + +class _UnknownType(_BaseType): + typ = Unknown + display = "unknown" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandBase, t: type, s: str) -> str: + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return False + + +class _IntType(_BaseType): + typ = int + display = "int" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandBase, t: type, s: str) -> int: + try: + return int(s) + except ValueError as e: + raise exceptions.TypeError from e + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return isinstance(val, int) + + +class _PathType(_BaseType): + typ = Path + display = "path" + + def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]: + if not start: + start = "./" + path = os.path.expanduser(start) + ret = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = start + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(start) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) + if os.path.isdir(f): + display += "/" + ret.append(display) + if not ret: + ret = [start] + ret.sort() + return ret + + def parse(self, manager: _CommandBase, t: type, s: str) -> str: + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return isinstance(val, str) + + +class _CmdType(_BaseType): + typ = Cmd + display = "cmd" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return list(manager.commands.keys()) + + def parse(self, manager: _CommandBase, t: type, s: str) -> str: + if s not in manager.commands: + raise exceptions.TypeError("Unknown command: %s" % s) + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return val in manager.commands + + +class _ArgType(_BaseType): + typ = Arg + display = "arg" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandBase, t: type, s: str) -> str: + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return isinstance(val, str) + + +class _StrSeqType(_BaseType): + typ = typing.Sequence[str] + display = "[str]" + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return [x.strip() for x in s.split(",")] + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + if isinstance(val, str) or isinstance(val, bytes): + return False + try: + for v in val: + if not isinstance(v, str): + return False + except TypeError: + return False + return True + + +class _CutSpecType(_BaseType): + typ = CutSpec + display = "[cut]" + valid_prefixes = [ + "request.method", + "request.scheme", + "request.host", + "request.http_version", + "request.port", + "request.path", + "request.url", + "request.text", + "request.content", + "request.raw_content", + "request.timestamp_start", + "request.timestamp_end", + "request.header[", + + "response.status_code", + "response.reason", + "response.text", + "response.content", + "response.timestamp_start", + "response.timestamp_end", + "response.raw_content", + "response.header[", + + "client_conn.address.port", + "client_conn.address.host", + "client_conn.tls_version", + "client_conn.sni", + "client_conn.ssl_established", + + "server_conn.address.port", + "server_conn.address.host", + "server_conn.ip_address.host", + "server_conn.tls_version", + "server_conn.sni", + "server_conn.ssl_established", + ] + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + spec = s.split(",") + opts = [] + for pref in self.valid_prefixes: + spec[-1] = pref + opts.append(",".join(spec)) + return opts + + def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec: + parts = s.split(",") # type: typing.Any + return parts + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + if not isinstance(val, str): + return False + parts = [x.strip() for x in val.split(",")] + for p in parts: + for pref in self.valid_prefixes: + if p.startswith(pref): + break + else: + return False + return True + + +class _BaseFlowType(_BaseType): + viewmarkers = [ + "@all", + "@focus", + "@shown", + "@hidden", + "@marked", + "@unmarked", + ] + valid_prefixes = viewmarkers + [ + "~q", + "~s", + "~a", + "~hq", + "~hs", + "~b", + "~bq", + "~bs", + "~t", + "~d", + "~m", + "~u", + "~c", + ] + + def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + return self.valid_prefixes + + +class _FlowType(_BaseFlowType): + typ = flow.Flow + display = "flow" + + def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow: + try: + flows = manager.call_args("view.resolve", [s]) + except exceptions.CommandError as e: + raise exceptions.TypeError from e + if len(flows) != 1: + raise exceptions.TypeError( + "Command requires one flow, specification matched %s." % len(flows) + ) + return flows[0] + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + return isinstance(val, flow.Flow) + + +class _FlowsType(_BaseFlowType): + typ = typing.Sequence[flow.Flow] + display = "[flow]" + + def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]: + try: + return manager.call_args("view.resolve", [s]) + except exceptions.CommandError as e: + raise exceptions.TypeError from e + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + try: + for v in val: + if not isinstance(v, flow.Flow): + return False + except TypeError: + return False + return True + + +class _DataType(_BaseType): + typ = Data + display = "[data]" + + def completion( + self, manager: _CommandBase, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + def parse( + self, manager: _CommandBase, t: type, s: str + ) -> typing.Any: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + # FIXME: validate that all rows have equal length, and all columns have equal types + try: + for row in val: + for cell in row: + if not (isinstance(cell, str) or isinstance(cell, bytes)): + return False + except TypeError: + return False + return True + + +class _ChoiceType(_BaseType): + typ = Choice + display = "choice" + + def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]: + return manager.call(t.options_command) + + def parse(self, manager: _CommandBase, t: Choice, s: str) -> str: + opts = manager.call(t.options_command) + if s not in opts: + raise exceptions.TypeError("Invalid choice.") + return s + + def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + try: + opts = manager.call(typ.options_command) + except exceptions.CommandError: + return False + return val in opts + + +class TypeManager: + def __init__(self, *types): + self.typemap = {} + for t in types: + self.typemap[t.typ] = t() + + def get(self, t: type, default=None) -> _BaseType: + if type(t) in self.typemap: + return self.typemap[type(t)] + return self.typemap.get(t, default) + + +CommandTypes = TypeManager( + _ArgType, + _BoolType, + _ChoiceType, + _CmdType, + _CutSpecType, + _DataType, + _FlowType, + _FlowsType, + _IntType, + _PathType, + _StrType, + _StrSeqType, +) diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py index 87a0e804..1070fad0 100644 --- a/mitmproxy/utils/typecheck.py +++ b/mitmproxy/utils/typecheck.py @@ -1,41 +1,6 @@ import typing -def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool: - """ - Check if the provided value is an instance of typeinfo. Returns True if the - types match, False otherwise. This function supports only those types - required for command return values. - """ - typename = str(typeinfo) - if typename.startswith("typing.Sequence"): - try: - T = typeinfo.__args__[0] # type: ignore - except AttributeError: - # Python 3.5.0 - T = typeinfo.__parameters__[0] # type: ignore - if not isinstance(value, (tuple, list)): - return False - for v in value: - if not check_command_type(v, T): - return False - elif typename.startswith("typing.Union"): - try: - types = typeinfo.__args__ # type: ignore - except AttributeError: - # Python 3.5.x - types = typeinfo.__union_params__ # type: ignore - for T in types: - checks = [check_command_type(value, T) for T in types] - if not any(checks): - return False - elif value is None and typeinfo is None: - return True - elif not isinstance(value, typeinfo): - return False - return True - - def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: """ Check if the provided value is an instance of typeinfo and raises a diff --git a/mitmproxy/websocket.py b/mitmproxy/websocket.py index 6c1e7000..8efd4117 100644 --- a/mitmproxy/websocket.py +++ b/mitmproxy/websocket.py @@ -1,6 +1,8 @@ import time from typing import List, Optional +from mitmproxy.contrib import wsproto + from mitmproxy import flow from mitmproxy.net import websockets from mitmproxy.coretypes import serializable @@ -11,7 +13,7 @@ class WebSocketMessage(serializable.Serializable): def __init__( self, type: int, from_client: bool, content: bytes, timestamp: Optional[int]=None ) -> None: - self.type = type + self.type = wsproto.frame_protocol.Opcode(type) # type: ignore self.from_client = from_client self.content = content self.timestamp = timestamp or int(time.time()) # type: int @@ -21,13 +23,14 @@ class WebSocketMessage(serializable.Serializable): return cls(*state) def get_state(self): - return self.type, self.from_client, self.content, self.timestamp + return int(self.type), self.from_client, self.content, self.timestamp def set_state(self, state): self.type, self.from_client, self.content, self.timestamp = state + self.type = wsproto.frame_protocol.Opcode(self.type) # replace enum with bare int def __repr__(self): - if self.type == websockets.OPCODE.TEXT: + if self.type == wsproto.frame_protocol.Opcode.TEXT: return "text message: {}".format(repr(self.content)) else: return "binary message: {}".format(strutils.bytes_to_escaped_str(self.content)) @@ -42,7 +45,7 @@ class WebSocketFlow(flow.Flow): super().__init__("websocket", client_conn, server_conn, live) self.messages = [] # type: List[WebSocketMessage] self.close_sender = 'client' - self.close_code = '(status code missing)' + self.close_code = wsproto.frame_protocol.CloseReason.NORMAL_CLOSURE self.close_message = '(message missing)' self.close_reason = 'unknown status code' self.stream = False @@ -69,7 +72,7 @@ class WebSocketFlow(flow.Flow): _stateobject_attributes.update(dict( messages=List[WebSocketMessage], close_sender=str, - close_code=str, + close_code=int, close_message=str, close_reason=str, client_key=str, @@ -83,6 +86,11 @@ class WebSocketFlow(flow.Flow): # dumping the handshake_flow will include the WebSocketFlow too. )) + def get_state(self): + d = super().get_state() + d['close_code'] = int(d['close_code']) # replace enum with bare int + return d + @classmethod def from_state(cls, state): f = cls(None, None, None) @@ -19,6 +19,9 @@ exclude_lines = pragma: no cover raise NotImplementedError() +[mypy-mitmproxy.contrib.*] +ignore_errors = True + [tool:full_coverage] exclude = mitmproxy/proxy/protocol/base.py diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py index 0a523fff..71e699db 100644 --- a/test/mitmproxy/addons/test_cut.py +++ b/test/mitmproxy/addons/test_cut.py @@ -135,6 +135,11 @@ def test_cut(): with pytest.raises(exceptions.CommandError): assert c.cut(tflows, ["__dict__"]) == [[""]] + with taddons.context(): + tflows = [tflow.tflow(resp=False)] + assert c.cut(tflows, ["response.reason"]) == [[""]] + assert c.cut(tflows, ["response.header[key]"]) == [[""]] + c = cut.Cut() with taddons.context(): tflows = [tflow.ttcpflow()] diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py index a4e425cd..2dee708f 100644 --- a/test/mitmproxy/addons/test_save.py +++ b/test/mitmproxy/addons/test_save.py @@ -44,6 +44,19 @@ def test_tcp(tmpdir): assert rd(p) +def test_websocket(tmpdir): + sa = save.Save() + with taddons.context() as tctx: + p = str(tmpdir.join("foo")) + tctx.configure(sa, save_stream_file=p) + + f = tflow.twebsocketflow() + sa.websocket_start(f) + sa.websocket_end(f) + tctx.configure(sa, save_stream_file=None) + assert rd(p) + + def test_save_command(tmpdir): sa = save.Save() with taddons.context() as tctx: diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 50ad3d55..c777192d 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -4,11 +4,10 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons +import mitmproxy.types import io import pytest -from mitmproxy.utils import typecheck - class TAddon: @command.command("cmd1") @@ -24,8 +23,12 @@ class TAddon: def cmd3(self, foo: int) -> int: return foo + @command.command("cmd4") + def cmd4(self, a: int, b: str, c: mitmproxy.types.Path) -> str: + return "ok" + @command.command("subcommand") - def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str: + def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: return "ok" @command.command("empty") @@ -39,12 +42,16 @@ class TAddon: def choices(self) -> typing.Sequence[str]: return ["one", "two", "three"] - @command.argument("arg", type=command.Choice("choices")) + @command.argument("arg", type=mitmproxy.types.Choice("choices")) def choose(self, arg: str) -> typing.Sequence[str]: return ["one", "two", "three"] @command.command("path") - def path(self, arg: command.Path) -> None: + def path(self, arg: mitmproxy.types.Path) -> None: + pass + + @command.command("flow") + def flow(self, f: flow.Flow, s: str) -> None: pass @@ -79,53 +86,138 @@ class TestCommand: [ "foo bar", [ - command.ParseResult(value = "foo", type = command.Cmd), - command.ParseResult(value = "bar", type = str) + command.ParseResult( + value = "foo", type = mitmproxy.types.Cmd, valid = False + ), + command.ParseResult( + value = "bar", type = mitmproxy.types.Unknown, valid = False + ) ], + [], ], [ - "foo 'bar", + "cmd1 'bar", [ - command.ParseResult(value = "foo", type = command.Cmd), - command.ParseResult(value = "'bar", type = str) - ] + command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "'bar", type = str, valid = True) + ], + [], + ], + [ + "a", + [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)], + [], + ], + [ + "", + [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)], + [] ], - ["a", [command.ParseResult(value = "a", type = command.Cmd)]], - ["", [command.ParseResult(value = "", type = command.Cmd)]], [ "cmd3 1", [ - command.ParseResult(value = "cmd3", type = command.Cmd), - command.ParseResult(value = "1", type = int), - ] + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "1", type = int, valid = True), + ], + [] ], [ "cmd3 ", [ - command.ParseResult(value = "cmd3", type = command.Cmd), - command.ParseResult(value = "", type = int), - ] + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "", type = int, valid = False), + ], + [] ], [ "subcommand ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "", type = command.Cmd), - ] + command.ParseResult( + value = "subcommand", type = mitmproxy.types.Cmd, valid = True, + ), + command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False), + ], + ["arg"], ], [ "subcommand cmd3 ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "cmd3", type = command.Cmd), - command.ParseResult(value = "", type = int), - ] + command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "", type = int, valid = False), + ], + [] + ], + [ + "cmd4", + [ + command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + ], + ["int", "str", "path"] + ], + [ + "cmd4 ", + [ + command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "", type = int, valid = False), + ], + ["str", "path"] + ], + [ + "cmd4 1", + [ + command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "1", type = int, valid = True), + ], + ["str", "path"] + ], + [ + "cmd4 1", + [ + command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "1", type = int, valid = True), + ], + ["str", "path"] + ], + [ + "flow", + [ + command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + ], + ["flow", "str"] + ], + [ + "flow ", + [ + command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "", type = flow.Flow, valid = False), + ], + ["str"] + ], + [ + "flow x", + [ + command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "x", type = flow.Flow, valid = False), + ], + ["str"] + ], + [ + "flow x ", + [ + command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value = "x", type = flow.Flow, valid = False), + command.ParseResult(value = "", type = str, valid = True), + ], + [] ], ] with taddons.context() as tctx: tctx.master.addons.add(TAddon()) - for s, expected in tests: - assert tctx.master.commands.parse_partial(s) == expected + for s, expected, expectedremain in tests: + current, remain = tctx.master.commands.parse_partial(s) + assert current == expected + assert expectedremain == remain def test_simple(): @@ -139,7 +231,7 @@ def test_simple(): c.call("nonexistent") with pytest.raises(exceptions.CommandError, match="Invalid"): c.call("") - with pytest.raises(exceptions.CommandError, match="Usage"): + with pytest.raises(exceptions.CommandError, match="argument mismatch"): c.call("one.two too many args") c.add("empty", a.empty) @@ -154,15 +246,15 @@ def test_typename(): assert command.typename(str) == "str" assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" - assert command.typename(command.Cuts) == "[cuts]" - assert command.typename(typing.Sequence[command.Cut]) == "[cut]" + assert command.typename(mitmproxy.types.Data) == "[data]" + assert command.typename(mitmproxy.types.CutSpec) == "[cut]" assert command.typename(flow.Flow) == "flow" assert command.typename(typing.Sequence[str]) == "[str]" - assert command.typename(command.Choice("foo")) == "choice" - assert command.typename(command.Path) == "path" - assert command.typename(command.Cmd) == "cmd" + assert command.typename(mitmproxy.types.Choice("foo")) == "choice" + assert command.typename(mitmproxy.types.Path) == "path" + assert command.typename(mitmproxy.types.Cmd) == "cmd" class DummyConsole: @@ -172,7 +264,7 @@ class DummyConsole: return [tflow.tflow(resp=True)] * n @command.command("cut") - def cut(self, spec: str) -> command.Cuts: + def cut(self, spec: str) -> mitmproxy.types.Data: return [["test"]] @@ -180,55 +272,11 @@ def test_parsearg(): with taddons.context() as tctx: tctx.master.addons.add(DummyConsole()) assert command.parsearg(tctx.master.commands, "foo", str) == "foo" - - assert command.parsearg(tctx.master.commands, "1", int) == 1 + with pytest.raises(exceptions.CommandError, match="Unsupported"): + command.parsearg(tctx.master.commands, "foo", type) with pytest.raises(exceptions.CommandError): command.parsearg(tctx.master.commands, "foo", int) - assert command.parsearg(tctx.master.commands, "true", bool) is True - assert command.parsearg(tctx.master.commands, "false", bool) is False - with pytest.raises(exceptions.CommandError): - command.parsearg(tctx.master.commands, "flobble", bool) - - assert len(command.parsearg( - tctx.master.commands, "2", typing.Sequence[flow.Flow] - )) == 2 - assert command.parsearg(tctx.master.commands, "1", flow.Flow) - with pytest.raises(exceptions.CommandError): - command.parsearg(tctx.master.commands, "2", flow.Flow) - with pytest.raises(exceptions.CommandError): - command.parsearg(tctx.master.commands, "0", flow.Flow) - with pytest.raises(exceptions.CommandError): - command.parsearg(tctx.master.commands, "foo", Exception) - - assert command.parsearg( - tctx.master.commands, "foo", command.Cuts - ) == [["test"]] - - assert command.parsearg( - tctx.master.commands, "foo", typing.Sequence[str] - ) == ["foo"] - assert command.parsearg( - tctx.master.commands, "foo, bar", typing.Sequence[str] - ) == ["foo", "bar"] - - a = TAddon() - tctx.master.commands.add("choices", a.choices) - assert command.parsearg( - tctx.master.commands, "one", command.Choice("choices"), - ) == "one" - with pytest.raises(exceptions.CommandError): - assert command.parsearg( - tctx.master.commands, "invalid", command.Choice("choices"), - ) - - assert command.parsearg( - tctx.master.commands, "foo", command.Path - ) == "foo" - assert command.parsearg( - tctx.master.commands, "foo", command.Cmd - ) == "foo" - class TDec: @command.command("cmd1") @@ -265,12 +313,3 @@ def test_verify_arg_signature(): command.verify_arg_signature(lambda: None, [1, 2], {}) print('hello there') command.verify_arg_signature(lambda a, b: None, [1, 2], {}) - - -def test_choice(): - """ - basic typechecking for choices should fail as we cannot verify if strings are a valid choice - at this point. - """ - c = command.Choice("foo") - assert not typecheck.check_command_type("foo", c) diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index fcc766b5..8cc11a16 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -97,7 +97,7 @@ class TestSerialize: class TestFlowMaster: - def test_load_flow_reverse(self): + def test_load_http_flow_reverse(self): s = tservers.TestState() opts = options.Options( mode="reverse:https://use-this-domain" @@ -108,6 +108,20 @@ class TestFlowMaster: fm.load_flow(f) assert s.flows[0].request.host == "use-this-domain" + def test_load_websocket_flow(self): + s = tservers.TestState() + opts = options.Options( + mode="reverse:https://use-this-domain" + ) + fm = master.Master(opts) + fm.addons.add(s) + f = tflow.twebsocketflow() + fm.load_flow(f.handshake_flow) + fm.load_flow(f) + assert s.flows[0].request.host == "use-this-domain" + assert s.flows[1].handshake_flow == f.handshake_flow + assert len(s.flows[1].messages) == len(f.messages) + def test_replay(self): opts = options.Options() fm = master.Master(opts) diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py index c411258a..4eb37d81 100644 --- a/test/mitmproxy/test_flowfilter.py +++ b/test/mitmproxy/test_flowfilter.py @@ -420,6 +420,20 @@ class TestMatchingWebSocketFlow: e = self.err() assert self.q("~e", e) + def test_domain(self): + q = self.flow() + assert self.q("~d example.com", q) + assert not self.q("~d none", q) + + def test_url(self): + q = self.flow() + assert self.q("~u example.com", q) + assert self.q("~u example.com/ws", q) + assert not self.q("~u moo/path", q) + + q.handshake_flow = None + assert not self.q("~u example.com", q) + def test_body(self): f = self.flow() diff --git a/test/mitmproxy/test_typemanager.py b/test/mitmproxy/test_typemanager.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/mitmproxy/test_typemanager.py diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py new file mode 100644 index 00000000..72492fa9 --- /dev/null +++ b/test/mitmproxy/test_types.py @@ -0,0 +1,237 @@ +import pytest +import os +import typing +import contextlib + +from mitmproxy.test import tutils +import mitmproxy.exceptions +import mitmproxy.types +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy import command +from mitmproxy import flow + +from . import test_command + + +@contextlib.contextmanager +def chdir(path: str): + old_dir = os.getcwd() + os.chdir(path) + yield + os.chdir(old_dir) + + +def test_bool(): + with taddons.context() as tctx: + b = mitmproxy.types._BoolType() + assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"] + assert b.parse(tctx.master.commands, bool, "true") is True + assert b.parse(tctx.master.commands, bool, "false") is False + assert b.is_valid(tctx.master.commands, bool, True) is True + assert b.is_valid(tctx.master.commands, bool, "foo") is False + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, bool, "foo") + + +def test_str(): + with taddons.context() as tctx: + b = mitmproxy.types._StrType() + assert b.is_valid(tctx.master.commands, str, "foo") is True + assert b.is_valid(tctx.master.commands, str, 1) is False + assert b.completion(tctx.master.commands, str, "") == [] + assert b.parse(tctx.master.commands, str, "foo") == "foo" + + +def test_unknown(): + with taddons.context() as tctx: + b = mitmproxy.types._UnknownType() + assert b.is_valid(tctx.master.commands, mitmproxy.types.Unknown, "foo") is False + assert b.is_valid(tctx.master.commands, mitmproxy.types.Unknown, 1) is False + assert b.completion(tctx.master.commands, mitmproxy.types.Unknown, "") == [] + assert b.parse(tctx.master.commands, mitmproxy.types.Unknown, "foo") == "foo" + + +def test_int(): + with taddons.context() as tctx: + b = mitmproxy.types._IntType() + assert b.is_valid(tctx.master.commands, int, "foo") is False + assert b.is_valid(tctx.master.commands, int, 1) is True + assert b.completion(tctx.master.commands, int, "b") == [] + assert b.parse(tctx.master.commands, int, "1") == 1 + assert b.parse(tctx.master.commands, int, "999") == 999 + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, int, "foo") + + +def test_path(): + with taddons.context() as tctx: + b = mitmproxy.types._PathType() + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" + assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True + assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False + + def normPathOpts(prefix, match): + ret = [] + for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match): + s = s[len(prefix):] + s = s.replace(os.sep, "/") + ret.append(s) + return ret + + cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) + assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] + assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] + with chdir(cd): + assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] + assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] + assert b.completion( + tctx.master.commands, mitmproxy.types.Path, "nonexistent" + ) == ["nonexistent"] + + +def test_cmd(): + with taddons.context() as tctx: + tctx.master.addons.add(test_command.TAddon()) + b = mitmproxy.types._CmdType() + assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "foo") is False + assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") is True + assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") == "cmd1" + with pytest.raises(mitmproxy.exceptions.TypeError): + assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") + assert len( + b.completion(tctx.master.commands, mitmproxy.types.Cmd, "") + ) == len(tctx.master.commands.commands.keys()) + + +def test_cutspec(): + with taddons.context() as tctx: + b = mitmproxy.types._CutSpecType() + b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"] + assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, 1) is False + assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "foo") is False + assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "request.path") is True + + assert b.completion( + tctx.master.commands, mitmproxy.types.CutSpec, "request.p" + ) == b.valid_prefixes + ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f") + assert ret[0].startswith("request.port,") + assert len(ret) == len(b.valid_prefixes) + + +def test_arg(): + with taddons.context() as tctx: + b = mitmproxy.types._ArgType() + assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] + assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" + assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True + assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False + + +def test_strseq(): + with taddons.context() as tctx: + b = mitmproxy.types._StrSeqType() + assert b.completion(tctx.master.commands, typing.Sequence[str], "") == [] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"] + assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["foo"]) is True + assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["a", "b", 3]) is False + assert b.is_valid(tctx.master.commands, typing.Sequence[str], 1) is False + assert b.is_valid(tctx.master.commands, typing.Sequence[str], "foo") is False + + +class DummyConsole: + @command.command("view.resolve") + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + if spec == "err": + raise mitmproxy.exceptions.CommandError() + n = int(spec) + return [tflow.tflow(resp=True)] * n + + @command.command("cut") + def cut(self, spec: str) -> mitmproxy.types.Data: + return [["test"]] + + @command.command("options") + def options(self) -> typing.Sequence[str]: + return ["one", "two", "three"] + + +def test_flow(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types._FlowType() + assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes) + assert b.parse(tctx.master.commands, flow.Flow, "1") + assert b.is_valid(tctx.master.commands, flow.Flow, tflow.tflow()) is True + assert b.is_valid(tctx.master.commands, flow.Flow, "xx") is False + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, flow.Flow, "0") + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, flow.Flow, "2") + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, flow.Flow, "err") + + +def test_flows(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types._FlowsType() + assert len( + b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "") + ) == len(b.valid_prefixes) + assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], [tflow.tflow()]) is True + assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], "xx") is False + assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], 0) is False + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2 + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "err") + + +def test_data(): + with taddons.context() as tctx: + b = mitmproxy.types._DataType() + assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, 0) is False + assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, []) is True + assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [["x"]]) is True + assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[b"x"]]) is True + assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[1]]) is False + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + + +def test_choice(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types._ChoiceType() + assert b.is_valid( + tctx.master.commands, + mitmproxy.types.Choice("options"), + "one", + ) is True + assert b.is_valid( + tctx.master.commands, + mitmproxy.types.Choice("options"), + "invalid", + ) is False + assert b.is_valid( + tctx.master.commands, + mitmproxy.types.Choice("nonexistent"), + "invalid", + ) is False + comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "") + assert comp == ["one", "two", "three"] + assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one" + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid") + + +def test_typemanager(): + assert mitmproxy.types.CommandTypes.get(bool, None) + assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None) diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 823af06d..2a96995d 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,36 +1,6 @@ -import os -import contextlib from mitmproxy.tools.console.commander import commander from mitmproxy.test import taddons -from mitmproxy.test import tutils - - -@contextlib.contextmanager -def chdir(path: str): - old_dir = os.getcwd() - os.chdir(path) - yield - os.chdir(old_dir) - - -def normPathOpts(prefix, match): - ret = [] - for s in commander.pathOptions(match): - s = s[len(prefix):] - s = s.replace(os.sep, "/") - ret.append(s) - return ret - - -def test_pathOptions(): - cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) - assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] - assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] - with chdir(cd): - assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] - assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] - assert commander.pathOptions("nonexistent") == ["nonexistent"] class TestListCompleter: @@ -72,16 +42,16 @@ class TestCommandBuffer: with taddons.context() as tctx: for start, output in tests: cb = commander.CommandBuffer(tctx.master) - cb.buf, cb.cursor = start[0], start[1] + cb.text, cb.cursor = start[0], start[1] cb.backspace() - assert cb.buf == output[0] + assert cb.text == output[0] assert cb.cursor == output[1] def test_left(self): cursors = [3, 2, 1, 0, 0] with taddons.context() as tctx: cb = commander.CommandBuffer(tctx.master) - cb.buf, cb.cursor = "abcd", 4 + cb.text, cb.cursor = "abcd", 4 for c in cursors: cb.left() assert cb.cursor == c @@ -90,7 +60,7 @@ class TestCommandBuffer: cursors = [1, 2, 3, 4, 4] with taddons.context() as tctx: cb = commander.CommandBuffer(tctx.master) - cb.buf, cb.cursor = "abcd", 0 + cb.text, cb.cursor = "abcd", 0 for c in cursors: cb.right() assert cb.cursor == c @@ -104,20 +74,25 @@ class TestCommandBuffer: with taddons.context() as tctx: for start, output in tests: cb = commander.CommandBuffer(tctx.master) - cb.buf, cb.cursor = start[0], start[1] + cb.text, cb.cursor = start[0], start[1] cb.insert("x") - assert cb.buf == output[0] + assert cb.text == output[0] assert cb.cursor == output[1] def test_cycle_completion(self): with taddons.context() as tctx: cb = commander.CommandBuffer(tctx.master) - cb.buf = "foo bar" - cb.cursor = len(cb.buf) + cb.text = "foo bar" + cb.cursor = len(cb.text) cb.cycle_completion() def test_render(self): with taddons.context() as tctx: cb = commander.CommandBuffer(tctx.master) - cb.buf = "foo" - assert cb.render() == "foo" + cb.text = "foo" + assert cb.render() + + def test_flatten(self): + with taddons.context() as tctx: + cb = commander.CommandBuffer(tctx.master) + assert cb.flatten("foo bar") == "foo bar" diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 66b1884e..5295fff5 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,7 +4,6 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck -from mitmproxy import command class TBase: @@ -88,31 +87,6 @@ def test_check_any(): typecheck.check_option_type("foo", None, typing.Any) -def test_check_command_type(): - assert(typecheck.check_command_type("foo", str)) - assert(typecheck.check_command_type(["foo"], typing.Sequence[str])) - assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str])) - assert(typecheck.check_command_type(None, None)) - assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) - assert(not typecheck.check_command_type("foo", typing.Sequence[int])) - assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) - assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) - assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) - - # Python 3.5 only defines __parameters__ - m = mock.Mock() - m.__str__ = lambda self: "typing.Sequence" - m.__parameters__ = (int,) - - typecheck.check_command_type([10], m) - - # Python 3.5 only defines __union_params__ - m = mock.Mock() - m.__str__ = lambda self: "typing.Union" - m.__union_params__ = (int,) - assert not typecheck.check_command_type([22], m) - - def test_typesec_to_str(): assert(typecheck.typespec_to_str(str)) == "str" assert(typecheck.typespec_to_str(typing.Sequence[str])) == "sequence of str" |