aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/clientplayback.py3
-rw-r--r--mitmproxy/addons/core.py9
-rw-r--r--mitmproxy/addons/cut.py19
-rw-r--r--mitmproxy/addons/export.py3
-rw-r--r--mitmproxy/addons/save.py12
-rw-r--r--mitmproxy/addons/serverplayback.py3
-rw-r--r--mitmproxy/addons/view.py2
-rw-r--r--mitmproxy/command.py268
-rw-r--r--mitmproxy/contrib/wsproto/__init__.py13
-rw-r--r--mitmproxy/contrib/wsproto/extensions.py2
-rw-r--r--mitmproxy/contrib/wsproto/frame_protocol.py2
-rw-r--r--mitmproxy/exceptions.py4
-rw-r--r--mitmproxy/flowfilter.py10
-rw-r--r--mitmproxy/master.py30
-rw-r--r--mitmproxy/proxy/protocol/http.py1
-rw-r--r--mitmproxy/proxy/protocol/websocket.py10
-rw-r--r--mitmproxy/test/tflow.py6
-rw-r--r--mitmproxy/tools/console/commander/commander.py118
-rw-r--r--mitmproxy/tools/console/commandexecutor.py4
-rw-r--r--mitmproxy/tools/console/commands.py2
-rw-r--r--mitmproxy/tools/console/consoleaddons.py62
-rw-r--r--mitmproxy/tools/console/defaultkeys.py2
-rw-r--r--mitmproxy/tools/console/keymap.py17
-rw-r--r--mitmproxy/tools/console/options.py1
-rw-r--r--mitmproxy/tools/console/overlay.py13
-rw-r--r--mitmproxy/tools/console/palettes.py20
-rw-r--r--mitmproxy/types.py445
-rw-r--r--mitmproxy/utils/typecheck.py35
-rw-r--r--mitmproxy/websocket.py18
-rw-r--r--setup.cfg3
-rw-r--r--test/mitmproxy/addons/test_cut.py5
-rw-r--r--test/mitmproxy/addons/test_save.py13
-rw-r--r--test/mitmproxy/test_command.py219
-rw-r--r--test/mitmproxy/test_flow.py16
-rw-r--r--test/mitmproxy/test_flowfilter.py14
-rw-r--r--test/mitmproxy/test_typemanager.py0
-rw-r--r--test/mitmproxy/test_types.py237
-rw-r--r--test/mitmproxy/tools/console/test_commander.py55
-rw-r--r--test/mitmproxy/utils/test_typecheck.py26
39 files changed, 1208 insertions, 514 deletions
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py
index fcc3209b..bed06e82 100644
--- a/mitmproxy/addons/clientplayback.py
+++ b/mitmproxy/addons/clientplayback.py
@@ -3,6 +3,7 @@ from mitmproxy import ctx
from mitmproxy import io
from mitmproxy import flow
from mitmproxy import command
+import mitmproxy.types
import typing
@@ -37,7 +38,7 @@ class ClientPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.client.file")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 4191d490..2b0b2f14 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -6,6 +6,7 @@ from mitmproxy import command
from mitmproxy import flow
from mitmproxy import optmanager
from mitmproxy.net.http import status_codes
+import mitmproxy.types
class Core:
@@ -96,7 +97,7 @@ class Core:
]
@command.command("flow.set")
- @command.argument("spec", type=command.Choice("flow.set.options"))
+ @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
@@ -187,7 +188,7 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
- @command.argument("enc", type=command.Choice("flow.encode.options"))
+ @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
@@ -216,7 +217,7 @@ class Core:
return ["gzip", "deflate", "br"]
@command.command("options.load")
- def options_load(self, path: command.Path) -> None:
+ def options_load(self, path: mitmproxy.types.Path) -> None:
"""
Load options from a file.
"""
@@ -228,7 +229,7 @@ class Core:
) from e
@command.command("options.save")
- def options_save(self, path: command.Path) -> None:
+ def options_save(self, path: mitmproxy.types.Path) -> None:
"""
Save options to a file.
"""
diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py
index efc9e5df..f4b560e8 100644
--- a/mitmproxy/addons/cut.py
+++ b/mitmproxy/addons/cut.py
@@ -7,6 +7,7 @@ from mitmproxy import flow
from mitmproxy import ctx
from mitmproxy import certs
from mitmproxy.utils import strutils
+import mitmproxy.types
import pyperclip
@@ -35,6 +36,8 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
if spec == "host" and is_addr(current):
return str(current[0])
elif spec.startswith("header["):
+ if not current:
+ return ""
return current.headers.get(headername(spec), "")
elif isinstance(part, bytes):
return part
@@ -51,8 +54,8 @@ class Cut:
def cut(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut]
- ) -> command.Cuts:
+ cuts: mitmproxy.types.CutSpec,
+ ) -> mitmproxy.types.Data:
"""
Cut data from a set of flows. Cut specifications are attribute paths
from the base of the flow object, with a few conveniences - "port"
@@ -62,17 +65,17 @@ class Cut:
or "false", "bytes" are preserved, and all other values are
converted to strings.
"""
- ret = []
+ ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]]
for f in flows:
ret.append([extract(c, f) for c in cuts])
- return ret
+ return ret # type: ignore
@command.command("cut.save")
def save(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut],
- path: command.Path
+ cuts: mitmproxy.types.CutSpec,
+ path: mitmproxy.types.Path
) -> None:
"""
Save cuts to file. If there are multiple flows or cuts, the format
@@ -84,7 +87,7 @@ class Cut:
append = False
if path.startswith("+"):
append = True
- path = command.Path(path[1:])
+ path = mitmproxy.types.Path(path[1:])
if len(cuts) == 1 and len(flows) == 1:
with open(path, "ab" if append else "wb") as fp:
if fp.tell() > 0:
@@ -110,7 +113,7 @@ class Cut:
def clip(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut],
+ cuts: mitmproxy.types.CutSpec,
) -> None:
"""
Send cuts to the clipboard. If there are multiple flows or cuts, the
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index 5388a0e8..0169f5b1 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -5,6 +5,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.utils import strutils
from mitmproxy.net.http.http1 import assemble
+import mitmproxy.types
import pyperclip
@@ -49,7 +50,7 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
- def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None:
+ def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py
index 40cd6f82..44afef68 100644
--- a/mitmproxy/addons/save.py
+++ b/mitmproxy/addons/save.py
@@ -7,6 +7,7 @@ from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flow
+import mitmproxy.types
class Save:
@@ -50,7 +51,7 @@ class Save:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
@command.command("save.file")
- def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None:
+ def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None:
"""
Save flows to a file. If the path starts with a +, flows are
appended to the file, otherwise it is over-written.
@@ -74,6 +75,15 @@ class Save:
self.stream.add(flow)
self.active_flows.discard(flow)
+ def websocket_start(self, flow):
+ if self.stream:
+ self.active_flows.add(flow)
+
+ def websocket_end(self, flow):
+ if self.stream:
+ self.stream.add(flow)
+ self.active_flows.discard(flow)
+
def response(self, flow):
if self.stream:
self.stream.add(flow)
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 46968a8d..20fcfc2a 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -9,6 +9,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy import io
from mitmproxy import command
+import mitmproxy.types
class ServerPlayback:
@@ -31,7 +32,7 @@ class ServerPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.server.file")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index e45f2baf..3a15fd3e 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -351,7 +351,7 @@ class View(collections.Sequence):
ctx.master.addons.trigger("update", updated)
@command.command("view.load")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
"""
Load flows into the view, without processing them with addons.
"""
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index c86d9792..e1e56d3a 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -10,9 +10,16 @@ import textwrap
import functools
import sys
-from mitmproxy.utils import typecheck
from mitmproxy import exceptions
-from mitmproxy import flow
+import mitmproxy.types
+
+
+def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
+ sig = inspect.signature(f)
+ try:
+ sig.bind(*args, **kwargs)
+ except TypeError as v:
+ raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
def lexer(s):
@@ -24,113 +31,14 @@ def lexer(s):
return lex
-# This is an awkward location for these values, but it's better than having
-# the console core import and depend on an addon. FIXME: Add a way for
-# addons to add custom types and manage their completion and validation.
-valid_flow_prefixes = [
- "@all",
- "@focus",
- "@shown",
- "@hidden",
- "@marked",
- "@unmarked",
- "~q",
- "~s",
- "~a",
- "~hq",
- "~hs",
- "~b",
- "~bq",
- "~bs",
- "~t",
- "~d",
- "~m",
- "~u",
- "~c",
-]
-
-
-Cuts = typing.Sequence[
- typing.Sequence[typing.Union[str, bytes]]
-]
-
-
-class Cut(str):
- # This is an awkward location for these values, but it's better than having
- # the console core import and depend on an addon. FIXME: Add a way for
- # addons to add custom types and manage their completion and validation.
- valid_prefixes = [
- "request.method",
- "request.scheme",
- "request.host",
- "request.http_version",
- "request.port",
- "request.path",
- "request.url",
- "request.text",
- "request.content",
- "request.raw_content",
- "request.timestamp_start",
- "request.timestamp_end",
- "request.header[",
-
- "response.status_code",
- "response.reason",
- "response.text",
- "response.content",
- "response.timestamp_start",
- "response.timestamp_end",
- "response.raw_content",
- "response.header[",
-
- "client_conn.address.port",
- "client_conn.address.host",
- "client_conn.tls_version",
- "client_conn.sni",
- "client_conn.ssl_established",
-
- "server_conn.address.port",
- "server_conn.address.host",
- "server_conn.ip_address.host",
- "server_conn.tls_version",
- "server_conn.sni",
- "server_conn.ssl_established",
- ]
-
-
-class Path(str):
- pass
-
-
-class Cmd(str):
- pass
-
-
-class Arg(str):
- pass
-
-
def typename(t: type) -> str:
"""
- Translates a type to an explanatory string. If ret is True, we're
- looking at a return type, else we're looking at a parameter type.
+ Translates a type to an explanatory string.
"""
- if isinstance(t, Choice):
- return "choice"
- elif t == typing.Sequence[flow.Flow]:
- return "[flow]"
- elif t == typing.Sequence[str]:
- return "[str]"
- elif t == typing.Sequence[Cut]:
- return "[cut]"
- elif t == Cuts:
- return "[cuts]"
- elif t == flow.Flow:
- return "flow"
- elif issubclass(t, (str, int, bool)):
- return t.__name__.lower()
- else: # pragma: no cover
+ to = mitmproxy.types.CommandTypes.get(t, None)
+ if not to:
raise NotImplementedError(t)
+ return to.display
class Command:
@@ -168,13 +76,12 @@ class Command:
ret = " -> " + ret
return "%s %s%s" % (self.path, params, ret)
- def call(self, args: typing.Sequence[str]):
+ def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
Call the command with a list of arguments. At this point, all
arguments are strings.
"""
- if not self.has_positional and (len(self.paramtypes) != len(args)):
- raise exceptions.CommandError("Usage: %s" % self.signature_help())
+ verify_arg_signature(self.func, list(args), {})
remainder = [] # type: typing.Sequence[str]
if self.has_positional:
@@ -183,37 +90,35 @@ class Command:
pargs = []
for arg, paramtype in zip(args, self.paramtypes):
- if typecheck.check_command_type(arg, paramtype):
- pargs.append(arg)
- else:
- pargs.append(parsearg(self.manager, arg, paramtype))
-
- if remainder:
- chk = typecheck.check_command_type(
- remainder,
- typing.Sequence[self.paramtypes[-1]] # type: ignore
- )
- if chk:
- pargs.extend(remainder)
- else:
- raise exceptions.CommandError("Invalid value type: %s - expected %s" % (remainder, self.paramtypes[-1]))
+ pargs.append(parsearg(self.manager, arg, paramtype))
+ pargs.extend(remainder)
with self.manager.master.handlecontext():
ret = self.func(*pargs)
- if not typecheck.check_command_type(ret, self.returntype):
- raise exceptions.CommandError("Command returned unexpected data")
-
+ if ret is None and self.returntype is None:
+ return
+ typ = mitmproxy.types.CommandTypes.get(self.returntype)
+ if not typ.is_valid(self.manager, typ, ret):
+ raise exceptions.CommandError(
+ "%s returned unexpected data - expected %s" % (
+ self.path, typ.display
+ )
+ )
return ret
ParseResult = typing.NamedTuple(
"ParseResult",
- [("value", str), ("type", typing.Type)],
+ [
+ ("value", str),
+ ("type", typing.Type),
+ ("valid", bool),
+ ],
)
-class CommandManager:
+class CommandManager(mitmproxy.types._CommandBase):
def __init__(self, master):
self.master = master
self.commands = {}
@@ -228,9 +133,12 @@ class CommandManager:
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
- def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]:
+ def parse_partial(
+ self,
+ cmdstr: str
+ ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
"""
- Parse a possibly partial command. Return a sequence of (part, type) tuples.
+ Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
buf = io.StringIO(cmdstr)
parts = [] # type: typing.List[str]
@@ -255,21 +163,43 @@ class CommandManager:
typ = None # type: typing.Type
for i in range(len(parts)):
if i == 0:
- typ = Cmd
+ typ = mitmproxy.types.Cmd
if parts[i] in self.commands:
params.extend(self.commands[parts[i]].paramtypes)
elif params:
typ = params.pop(0)
- # FIXME: Do we need to check that Arg is positional?
- if typ == Cmd and params and params[0] == Arg:
+ if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
if parts[i] in self.commands:
params[:] = self.commands[parts[i]].paramtypes
else:
- typ = str
- parse.append(ParseResult(value=parts[i], type=typ))
- return parse
+ typ = mitmproxy.types.Unknown
+
+ to = mitmproxy.types.CommandTypes.get(typ, None)
+ valid = False
+ if to:
+ try:
+ to.parse(self, typ, parts[i])
+ except exceptions.TypeError:
+ valid = False
+ else:
+ valid = True
+
+ parse.append(
+ ParseResult(
+ value=parts[i],
+ type=typ,
+ valid=valid,
+ )
+ )
- def call_args(self, path, args):
+ remhelp = [] # type: typing.List[str]
+ for x in params:
+ remt = mitmproxy.types.CommandTypes.get(x, None)
+ remhelp.append(remt.display)
+
+ return parse, remhelp
+
+ def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command using a list of string arguments. May raise CommandError.
"""
@@ -300,53 +230,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if isinstance(argtype, Choice):
- cmd = argtype.options_command
- opts = manager.call(cmd)
- if spec not in opts:
- raise exceptions.CommandError(
- "Invalid choice: see %s for options" % cmd
- )
- return spec
- elif issubclass(argtype, str):
- return spec
- elif argtype == bool:
- if spec == "true":
- return True
- elif spec == "false":
- return False
- else:
- raise exceptions.CommandError(
- "Booleans are 'true' or 'false', got %s" % spec
- )
- elif issubclass(argtype, int):
- try:
- return int(spec)
- except ValueError as e:
- raise exceptions.CommandError("Expected an integer, got %s." % spec)
- elif argtype == typing.Sequence[flow.Flow]:
- return manager.call_args("view.resolve", [spec])
- elif argtype == Cuts:
- return manager.call_args("cut", [spec])
- elif argtype == flow.Flow:
- flows = manager.call_args("view.resolve", [spec])
- if len(flows) != 1:
- raise exceptions.CommandError(
- "Command requires one flow, specification matched %s." % len(flows)
- )
- return flows[0]
- elif argtype in (typing.Sequence[str], typing.Sequence[Cut]):
- return [i.strip() for i in spec.split(",")]
- else:
+ t = mitmproxy.types.CommandTypes.get(argtype, None)
+ if not t:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
-
-
-def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
- sig = inspect.signature(f)
try:
- sig.bind(*args, **kwargs)
- except TypeError as v:
- raise exceptions.CommandError("Argument mismatch: %s" % v.args[0])
+ return t.parse(manager, argtype, spec) # type: ignore
+ except exceptions.TypeError as e:
+ raise exceptions.CommandError from e
def command(path):
@@ -360,21 +250,11 @@ def command(path):
return decorator
-class Choice:
- def __init__(self, options_command):
- self.options_command = options_command
-
- def __instancecheck__(self, instance):
- # return false here so that arguments are piped through parsearg,
- # which does extended validation.
- return False
-
-
def argument(name, type):
"""
- Set the type of a command argument at runtime.
- This is useful for more specific types such as command.Choice, which we cannot annotate
- directly as mypy does not like that.
+ Set the type of a command argument at runtime. This is useful for more
+ specific types such as mitmproxy.types.Choice, which we cannot annotate
+ directly as mypy does not like that.
"""
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
diff --git a/mitmproxy/contrib/wsproto/__init__.py b/mitmproxy/contrib/wsproto/__init__.py
new file mode 100644
index 00000000..d0592bc5
--- /dev/null
+++ b/mitmproxy/contrib/wsproto/__init__.py
@@ -0,0 +1,13 @@
+from . import compat
+from . import connection
+from . import events
+from . import extensions
+from . import frame_protocol
+
+__all__ = [
+ 'compat',
+ 'connection',
+ 'events',
+ 'extensions',
+ 'frame_protocol',
+]
diff --git a/mitmproxy/contrib/wsproto/extensions.py b/mitmproxy/contrib/wsproto/extensions.py
index f7cf4fb6..0e0d2018 100644
--- a/mitmproxy/contrib/wsproto/extensions.py
+++ b/mitmproxy/contrib/wsproto/extensions.py
@@ -1,3 +1,5 @@
+# type: ignore
+
# -*- coding: utf-8 -*-
"""
wsproto/extensions
diff --git a/mitmproxy/contrib/wsproto/frame_protocol.py b/mitmproxy/contrib/wsproto/frame_protocol.py
index b95dceec..30f146c6 100644
--- a/mitmproxy/contrib/wsproto/frame_protocol.py
+++ b/mitmproxy/contrib/wsproto/frame_protocol.py
@@ -1,3 +1,5 @@
+# type: ignore
+
# -*- coding: utf-8 -*-
"""
wsproto/frame_protocol
diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py
index 71517480..d568898b 100644
--- a/mitmproxy/exceptions.py
+++ b/mitmproxy/exceptions.py
@@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException):
pass
+class TypeError(MitmproxyException):
+ pass
+
+
"""
Net-layer exceptions
"""
diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py
index 23e47e2b..d1fd8299 100644
--- a/mitmproxy/flowfilter.py
+++ b/mitmproxy/flowfilter.py
@@ -322,8 +322,10 @@ class FDomain(_Rex):
flags = re.IGNORECASE
is_binary = False
- @only(http.HTTPFlow)
+ @only(http.HTTPFlow, websocket.WebSocketFlow)
def __call__(self, f):
+ if isinstance(f, websocket.WebSocketFlow):
+ f = f.handshake_flow
return bool(
self.re.search(f.request.host) or
self.re.search(f.request.pretty_host)
@@ -342,9 +344,11 @@ class FUrl(_Rex):
toks = toks[1:]
return klass(*toks)
- @only(http.HTTPFlow)
+ @only(http.HTTPFlow, websocket.WebSocketFlow)
def __call__(self, f):
- if not f.request:
+ if isinstance(f, websocket.WebSocketFlow):
+ f = f.handshake_flow
+ if not f or not f.request:
return False
return self.re.search(f.request.pretty_url)
diff --git a/mitmproxy/master.py b/mitmproxy/master.py
index 5997ff6d..de3b24e1 100644
--- a/mitmproxy/master.py
+++ b/mitmproxy/master.py
@@ -9,6 +9,7 @@ from mitmproxy import eventsequence
from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import http
+from mitmproxy import websocket
from mitmproxy import log
from mitmproxy.net import server_spec
from mitmproxy.proxy.protocol import http_replay
@@ -41,6 +42,7 @@ class Master:
self.should_exit = threading.Event()
self._server = None
self.first_tick = True
+ self.waiting_flows = []
@property
def server(self):
@@ -117,15 +119,33 @@ class Master:
self.should_exit.set()
self.addons.trigger("done")
+ def _change_reverse_host(self, f):
+ """
+ When we load flows in reverse proxy mode, we adjust the target host to
+ the reverse proxy destination for all flows we load. This makes it very
+ easy to replay saved flows against a different host.
+ """
+ if self.options.mode.startswith("reverse:"):
+ _, upstream_spec = server_spec.parse_with_mode(self.options.mode)
+ f.request.host, f.request.port = upstream_spec.address
+ f.request.scheme = upstream_spec.scheme
+
def load_flow(self, f):
"""
- Loads a flow
+ Loads a flow and links websocket & handshake flows
"""
+
if isinstance(f, http.HTTPFlow):
- if self.options.mode.startswith("reverse:"):
- _, upstream_spec = server_spec.parse_with_mode(self.options.mode)
- f.request.host, f.request.port = upstream_spec.address
- f.request.scheme = upstream_spec.scheme
+ self._change_reverse_host(f)
+ if 'websocket' in f.metadata:
+ self.waiting_flows.append(f)
+
+ if isinstance(f, websocket.WebSocketFlow):
+ hf = [hf for hf in self.waiting_flows if hf.id == f.metadata['websocket_handshake']][0]
+ f.handshake_flow = hf
+ self.waiting_flows.remove(hf)
+ self._change_reverse_host(f.handshake_flow)
+
f.reply = controller.DummyReply()
for e, o in eventsequence.iterate(f):
self.addons.handle_lifecycle(e, o)
diff --git a/mitmproxy/proxy/protocol/http.py b/mitmproxy/proxy/protocol/http.py
index 57ac0f16..076ffa62 100644
--- a/mitmproxy/proxy/protocol/http.py
+++ b/mitmproxy/proxy/protocol/http.py
@@ -321,6 +321,7 @@ class HttpLayer(base.Layer):
try:
if websockets.check_handshake(request.headers) and websockets.check_client_version(request.headers):
+ f.metadata['websocket'] = True
# We only support RFC6455 with WebSocket version 13
# allow inline scripts to manipulate the client handshake
self.channel.ask("websocket_handshake", f)
diff --git a/mitmproxy/proxy/protocol/websocket.py b/mitmproxy/proxy/protocol/websocket.py
index 34dcba06..1bd5284d 100644
--- a/mitmproxy/proxy/protocol/websocket.py
+++ b/mitmproxy/proxy/protocol/websocket.py
@@ -1,10 +1,11 @@
import socket
from OpenSSL import SSL
+
+from mitmproxy.contrib import wsproto
from mitmproxy.contrib.wsproto import events
from mitmproxy.contrib.wsproto.connection import ConnectionType, WSConnection
from mitmproxy.contrib.wsproto.extensions import PerMessageDeflate
-from mitmproxy.contrib.wsproto.frame_protocol import Opcode
from mitmproxy import exceptions
from mitmproxy import flow
@@ -93,11 +94,14 @@ class WebSocketLayer(base.Layer):
if event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
- message_type = Opcode.TEXT if isinstance(event, events.TextReceived) else Opcode.BINARY
- if message_type == Opcode.TEXT:
+
+ if isinstance(event, events.TextReceived):
+ message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
+ message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
+
fb.clear()
websocket_message = WebSocketMessage(message_type, not is_server, payload)
diff --git a/mitmproxy/test/tflow.py b/mitmproxy/test/tflow.py
index c3dab30c..91747866 100644
--- a/mitmproxy/test/tflow.py
+++ b/mitmproxy/test/tflow.py
@@ -44,7 +44,7 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
"GET",
"http",
"example.com",
- "80",
+ 80,
"/ws",
"HTTP/1.1",
headers=net_http.Headers(
@@ -75,7 +75,9 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
handshake_flow.response = resp
f = websocket.WebSocketFlow(client_conn, server_conn, handshake_flow)
- handshake_flow.metadata['websocket_flow'] = f
+ f.metadata['websocket_handshake'] = handshake_flow.id
+ handshake_flow.metadata['websocket_flow'] = f.id
+ handshake_flow.metadata['websocket'] = True
if messages is True:
messages = [
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index ef32b953..30e8b13b 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -1,6 +1,4 @@
import abc
-import glob
-import os
import typing
import urwid
@@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords
import mitmproxy.flow
import mitmproxy.master
import mitmproxy.command
+import mitmproxy.types
class Completer: # pragma: no cover
@@ -39,30 +38,6 @@ class ListCompleter(Completer):
return ret
-# Generates the completion options for a specific starting input
-def pathOptions(start: str) -> typing.Sequence[str]:
- if not start:
- start = "./"
- path = os.path.expanduser(start)
- ret = []
- if os.path.isdir(path):
- files = glob.glob(os.path.join(path, "*"))
- prefix = start
- else:
- files = glob.glob(path + "*")
- prefix = os.path.dirname(start)
- prefix = prefix or "./"
- for f in files:
- display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
- if os.path.isdir(f):
- display += "/"
- ret.append(display)
- if not ret:
- ret = [start]
- ret.sort()
- return ret
-
-
CompletionState = typing.NamedTuple(
"CompletionState",
[
@@ -75,9 +50,9 @@ CompletionState = typing.NamedTuple(
class CommandBuffer():
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
self.master = master
- self.buf = start
+ self.text = self.flatten(start)
# Cursor is always within the range [0:len(buffer)].
- self._cursor = len(self.buf)
+ self._cursor = len(self.text)
self.completion = None # type: CompletionState
@property
@@ -88,13 +63,40 @@ class CommandBuffer():
def cursor(self, x) -> None:
if x < 0:
self._cursor = 0
- elif x > len(self.buf):
- self._cursor = len(self.buf)
+ elif x > len(self.text):
+ self._cursor = len(self.text)
else:
self._cursor = x
def render(self):
- return self.buf
+ """
+ This function is somewhat tricky - in order to make the cursor
+ position valid, we have to make sure there is a
+ character-for-character offset match in the rendered output, up
+ to the cursor. Beyond that, we can add stuff.
+ """
+ parts, remhelp = self.master.commands.parse_partial(self.text)
+ ret = []
+ for p in parts:
+ if p.valid:
+ if p.type == mitmproxy.types.Cmd:
+ ret.append(("commander_command", p.value))
+ else:
+ ret.append(("text", p.value))
+ elif p.value:
+ ret.append(("commander_invalid", p.value))
+ else:
+ ret.append(("text", ""))
+ ret.append(("text", " "))
+ if remhelp:
+ ret.append(("text", " "))
+ for v in remhelp:
+ ret.append(("commander_hint", "%s " % v))
+ return ret
+
+ def flatten(self, txt):
+ parts, _ = self.master.commands.parse_partial(txt)
+ return " ".join([x.value for x in parts])
def left(self) -> None:
self.cursor = self.cursor - 1
@@ -104,50 +106,14 @@ class CommandBuffer():
def cycle_completion(self) -> None:
if not self.completion:
- parts = self.master.commands.parse_partial(self.buf[:self.cursor])
+ parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor])
last = parts[-1]
- if last.type == mitmproxy.command.Cmd:
- self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- self.master.commands.commands.keys(),
- ),
- parse = parts,
- )
- if last.type == typing.Sequence[mitmproxy.command.Cut]:
- spec = parts[-1].value.split(",")
- opts = []
- for pref in mitmproxy.command.Cut.valid_prefixes:
- spec[-1] = pref
- opts.append(",".join(spec))
- self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- opts,
- ),
- parse = parts,
- )
- elif isinstance(last.type, mitmproxy.command.Choice):
+ ct = mitmproxy.types.CommandTypes.get(last.type, None)
+ if ct:
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
- self.master.commands.call(last.type.options_command),
- ),
- parse = parts,
- )
- elif last.type == mitmproxy.command.Path:
- self.completion = CompletionState(
- completer = ListCompleter(
- "",
- pathOptions(parts[1].value)
- ),
- parse = parts,
- )
- elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow):
- self.completion = CompletionState(
- completer = ListCompleter(
- "",
- mitmproxy.command.valid_flow_prefixes,
+ ct.completion(self.master.commands, last.type, parts[-1].value)
),
parse = parts,
)
@@ -155,13 +121,13 @@ class CommandBuffer():
nxt = self.completion.completer.cycle()
buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
buf = buf.strip()
- self.buf = buf
- self.cursor = len(self.buf)
+ self.text = self.flatten(buf)
+ self.cursor = len(self.text)
def backspace(self) -> None:
if self.cursor == 0:
return
- self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:]
+ self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:])
self.cursor = self.cursor - 1
self.completion = None
@@ -169,7 +135,7 @@ class CommandBuffer():
"""
Inserts text at the cursor.
"""
- self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:]
+ self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:])
self.cursor += 1
self.completion = None
@@ -213,4 +179,4 @@ class CommandEdit(urwid.WidgetWrap):
return x, y
def get_value(self):
- return self.cbuf.buf
+ return self.cbuf.text
diff --git a/mitmproxy/tools/console/commandexecutor.py b/mitmproxy/tools/console/commandexecutor.py
index 829daee1..26f92238 100644
--- a/mitmproxy/tools/console/commandexecutor.py
+++ b/mitmproxy/tools/console/commandexecutor.py
@@ -23,6 +23,10 @@ class CommandExecutor:
signals.status_message.send(
message="Command returned %s flows" % len(ret)
)
+ elif type(ret) == flow.Flow:
+ signals.status_message.send(
+ message="Command returned 1 flow"
+ )
else:
self.master.overlay(
overlay.DataViewerOverlay(
diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py
index 20efcee3..1183ee9d 100644
--- a/mitmproxy/tools/console/commands.py
+++ b/mitmproxy/tools/console/commands.py
@@ -124,7 +124,7 @@ class CommandHelp(urwid.Frame):
class Commands(urwid.Pile, layoutwidget.LayoutWidget):
- title = "Commands"
+ title = "Command Reference"
keyctx = "commands"
def __init__(self, master):
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index 453e9e1c..20d54bc6 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -7,6 +7,8 @@ from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import contentviews
from mitmproxy.utils import strutils
+import mitmproxy.types
+
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
@@ -102,7 +104,7 @@ class ConsoleAddon:
@command.command("console.layout.options")
def layout_options(self) -> typing.Sequence[str]:
"""
- Returns the available options for the consoler_layout option.
+ Returns the available options for the console_layout option.
"""
return ["single", "vertical", "horizontal"]
@@ -218,8 +220,8 @@ class ConsoleAddon:
self,
prompt: str,
choices: typing.Sequence[str],
- cmd: command.Cmd,
- *args: command.Arg
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
@@ -241,7 +243,11 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
- self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
+ self,
+ prompt: str,
+ choicecmd: mitmproxy.types.Cmd,
+ subcmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
@@ -252,10 +258,10 @@ class ConsoleAddon:
def callback(opt):
# We're now outside of the call context...
- repl = " ".join(cmd)
+ repl = " ".join(args)
repl = repl.replace("{choice}", opt)
try:
- self.master.commands.call(repl)
+ self.master.commands.call(subcmd + " " + repl)
except exceptions.CommandError as e:
signals.status_message.send(message=str(e))
@@ -316,6 +322,7 @@ class ConsoleAddon:
signals.pop_view_state.send(self)
@command.command("console.bodyview")
+ @command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options"))
def bodyview(self, f: flow.Flow, part: str) -> None:
"""
Spawn an external viewer for a flow request or response body based
@@ -332,6 +339,13 @@ class ConsoleAddon:
raise exceptions.CommandError("No content to view.")
self.master.spawn_external_viewer(content, t)
+ @command.command("console.bodyview.options")
+ def bodyview_options(self) -> typing.Sequence[str]:
+ """
+ Possible parts for console.bodyview.
+ """
+ return ["request", "response"]
+
@command.command("console.edit.focus.options")
def edit_focus_options(self) -> typing.Sequence[str]:
"""
@@ -346,17 +360,24 @@ class ConsoleAddon:
"reason",
"request-headers",
"response-headers",
+ "request-body",
+ "response-body",
"status_code",
"set-cookies",
"url",
]
@command.command("console.edit.focus")
- @command.argument("part", type=command.Choice("console.edit.focus.options"))
+ @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
def edit_focus(self, part: str) -> None:
"""
Edit a component of the currently focused flow.
"""
+ flow = self.master.view.focus.flow
+ # This shouldn't be necessary once this command is "console.edit @focus",
+ # but for now it is.
+ if not flow:
+ raise exceptions.CommandError("No flow selected.")
if part == "cookies":
self.master.switch_view("edit_focus_cookies")
elif part == "form":
@@ -369,6 +390,21 @@ class ConsoleAddon:
self.master.switch_view("edit_focus_request_headers")
elif part == "response-headers":
self.master.switch_view("edit_focus_response_headers")
+ elif part in ("request-body", "response-body"):
+ if part == "request-body":
+ message = flow.request
+ else:
+ message = flow.response
+ if not message:
+ raise exceptions.CommandError("Flow has no {}.".format(part.split("-")[0]))
+ c = self.master.spawn_editor(message.get_content(strict=False) or b"")
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
+ message.content = c.rstrip(b"\n")
elif part == "set-cookies":
self.master.switch_view("edit_focus_setcookies")
elif part in ["url", "method", "status_code", "reason"]:
@@ -404,14 +440,14 @@ class ConsoleAddon:
self._grideditor().cmd_delete()
@command.command("console.grideditor.load")
- def grideditor_load(self, path: command.Path) -> None:
+ def grideditor_load(self, path: mitmproxy.types.Path) -> None:
"""
Read a file into the currrent cell.
"""
self._grideditor().cmd_read_file(path)
@command.command("console.grideditor.load_escaped")
- def grideditor_load_escaped(self, path: command.Path) -> None:
+ def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None:
"""
Read a file containing a Python-style escaped string into the
currrent cell.
@@ -419,7 +455,7 @@ class ConsoleAddon:
self._grideditor().cmd_read_file_escaped(path)
@command.command("console.grideditor.save")
- def grideditor_save(self, path: command.Path) -> None:
+ def grideditor_save(self, path: mitmproxy.types.Path) -> None:
"""
Save data to file as a CSV.
"""
@@ -440,7 +476,7 @@ class ConsoleAddon:
self._grideditor().cmd_spawn_editor()
@command.command("console.flowview.mode.set")
- @command.argument("mode", type=command.Choice("console.flowview.mode.options"))
+ @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options"))
def flowview_mode_set(self, mode: str) -> None:
"""
Set the display mode for the current flow view.
@@ -498,8 +534,8 @@ class ConsoleAddon:
self,
contexts: typing.Sequence[str],
key: str,
- cmd: command.Cmd,
- *args: command.Arg
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.Arg
) -> None:
"""
Bind a shortcut key.
diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py
index 0fdec10c..f8a3df2d 100644
--- a/mitmproxy/tools/console/defaultkeys.py
+++ b/mitmproxy/tools/console/defaultkeys.py
@@ -41,7 +41,7 @@ def map(km):
"e",
"""
console.choose.cmd Format export.formats
- console.command export.file {choice} @focus ''
+ console.command export.file {choice} @focus
""",
["flowlist", "flowview"],
"Export this flow to file"
diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py
index b268906c..fbb569a4 100644
--- a/mitmproxy/tools/console/keymap.py
+++ b/mitmproxy/tools/console/keymap.py
@@ -17,6 +17,13 @@ Contexts = {
}
+navkeys = [
+ "m_start", "m_end", "m_next", "m_select",
+ "up", "down", "page_up", "page_down",
+ "left", "right"
+]
+
+
class Binding:
def __init__(self, key, command, contexts, help):
self.key, self.command, self.contexts = key, command, sorted(contexts)
@@ -122,3 +129,13 @@ class Keymap:
if b:
return self.executor(b.command)
return key
+
+ def handle_only(self, context: str, key: str) -> typing.Optional[str]:
+ """
+ Like handle, but ignores global bindings. Returns the key if it has
+ not been handled, or None.
+ """
+ b = self.get(context, key)
+ if b:
+ return self.executor(b.command)
+ return key
diff --git a/mitmproxy/tools/console/options.py b/mitmproxy/tools/console/options.py
index 4d55aeec..54772cf0 100644
--- a/mitmproxy/tools/console/options.py
+++ b/mitmproxy/tools/console/options.py
@@ -117,6 +117,7 @@ class OptionListWalker(urwid.ListWalker):
def stop_editing(self):
self.editing = False
self.focus_obj = self._get(self.index, False)
+ self.set_focus(self.index)
self._modified()
def get_edit_text(self):
diff --git a/mitmproxy/tools/console/overlay.py b/mitmproxy/tools/console/overlay.py
index f97f23f9..55acbfdd 100644
--- a/mitmproxy/tools/console/overlay.py
+++ b/mitmproxy/tools/console/overlay.py
@@ -5,6 +5,7 @@ import urwid
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import layoutwidget
+from mitmproxy.tools.console import keymap
class SimpleOverlay(urwid.Overlay, layoutwidget.LayoutWidget):
@@ -114,13 +115,21 @@ class Chooser(urwid.WidgetWrap, layoutwidget.LayoutWidget):
return True
def keypress(self, size, key):
- key = self.master.keymap.handle("chooser", key)
+ key = self.master.keymap.handle_only("chooser", key)
if key == "m_select":
self.callback(self.choices[self.walker.index])
signals.pop_view_state.send(self)
+ return
elif key == "esc":
signals.pop_view_state.send(self)
- return super().keypress(size, key)
+ return
+
+ binding = self.master.keymap.get("global", key)
+ # This is extremely awkward. We need a better way to match nav keys only.
+ if binding and binding.command.startswith("console.nav"):
+ self.master.keymap.handle("global", key)
+ elif key in keymap.navkeys:
+ return super().keypress(size, key)
class OptionsOverlay(urwid.WidgetWrap, layoutwidget.LayoutWidget):
diff --git a/mitmproxy/tools/console/palettes.py b/mitmproxy/tools/console/palettes.py
index 7fbdcfd8..465fd574 100644
--- a/mitmproxy/tools/console/palettes.py
+++ b/mitmproxy/tools/console/palettes.py
@@ -32,6 +32,9 @@ class Palette:
# Grid Editor
'focusfield', 'focusfield_error', 'field_error', 'editfield',
+
+ # Commander
+ 'commander_command', 'commander_invalid', 'commander_hint'
]
high = None # type: typing.Mapping[str, typing.Sequence[str]]
@@ -117,6 +120,11 @@ class LowDark(Palette):
focusfield_error = ('dark red', 'light gray'),
field_error = ('dark red', 'default'),
editfield = ('white', 'default'),
+
+
+ commander_command = ('white,bold', 'default'),
+ commander_invalid = ('light red', 'default'),
+ commander_hint = ('dark gray', 'default'),
)
@@ -183,6 +191,10 @@ class LowLight(Palette):
focusfield_error = ('dark red', 'light gray'),
field_error = ('dark red', 'black'),
editfield = ('black', 'default'),
+
+ commander_command = ('dark magenta', 'default'),
+ commander_invalid = ('light red', 'default'),
+ commander_hint = ('light gray', 'default'),
)
@@ -267,6 +279,10 @@ class SolarizedLight(LowLight):
focusfield_error = (sol_red, sol_base2),
field_error = (sol_red, 'default'),
editfield = (sol_base01, 'default'),
+
+ commander_command = (sol_cyan, 'default'),
+ commander_invalid = (sol_orange, 'default'),
+ commander_hint = (sol_base1, 'default'),
)
@@ -317,6 +333,10 @@ class SolarizedDark(LowDark):
focusfield_error = (sol_red, sol_base02),
field_error = (sol_red, 'default'),
editfield = (sol_base1, 'default'),
+
+ commander_command = (sol_blue, 'default'),
+ commander_invalid = (sol_orange, 'default'),
+ commander_hint = (sol_base00, 'default'),
)
diff --git a/mitmproxy/types.py b/mitmproxy/types.py
new file mode 100644
index 00000000..8ae8b309
--- /dev/null
+++ b/mitmproxy/types.py
@@ -0,0 +1,445 @@
+import os
+import glob
+import typing
+
+from mitmproxy import exceptions
+from mitmproxy import flow
+
+
+class Path(str):
+ pass
+
+
+class Cmd(str):
+ pass
+
+
+class Arg(str):
+ pass
+
+
+class Unknown(str):
+ pass
+
+
+class CutSpec(typing.Sequence[str]):
+ pass
+
+
+class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]):
+ pass
+
+
+class Choice:
+ def __init__(self, options_command):
+ self.options_command = options_command
+
+ def __instancecheck__(self, instance): # pragma: no cover
+ # return false here so that arguments are piped through parsearg,
+ # which does extended validation.
+ return False
+
+
+# One of the many charming things about mypy is that introducing type
+# annotations can cause circular dependencies where there were none before.
+# Rather than putting types and the CommandManger in the same file, we introduce
+# a stub type with the signature we use.
+class _CommandBase:
+ commands = {} # type: typing.MutableMapping[str, typing.Any]
+
+ def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
+ raise NotImplementedError
+
+ def call(self, cmd: str) -> typing.Any:
+ raise NotImplementedError
+
+
+class _BaseType:
+ typ = object # type: typing.Type
+ display = "" # type: str
+
+ def completion(
+ self, manager: _CommandBase, t: typing.Any, s: str
+ ) -> typing.Sequence[str]:
+ """
+ Returns a list of completion strings for a given prefix. The strings
+ returned don't necessarily need to be suffixes of the prefix, since
+ completers will do prefix filtering themselves..
+ """
+ raise NotImplementedError
+
+ def parse(
+ self, manager: _CommandBase, typ: typing.Any, s: str
+ ) -> typing.Any:
+ """
+ Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string.
+
+ Raises exceptions.TypeError if the value is invalid.
+ """
+ raise NotImplementedError
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ """
+ Check if data is valid for this type.
+ """
+ raise NotImplementedError
+
+
+class _BoolType(_BaseType):
+ typ = bool
+ display = "bool"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return ["false", "true"]
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> bool:
+ if s == "true":
+ return True
+ elif s == "false":
+ return False
+ else:
+ raise exceptions.TypeError(
+ "Booleans are 'true' or 'false', got %s" % s
+ )
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return val in [True, False]
+
+
+class _StrType(_BaseType):
+ typ = str
+ display = "str"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return isinstance(val, str)
+
+
+class _UnknownType(_BaseType):
+ typ = Unknown
+ display = "unknown"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return False
+
+
+class _IntType(_BaseType):
+ typ = int
+ display = "int"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> int:
+ try:
+ return int(s)
+ except ValueError as e:
+ raise exceptions.TypeError from e
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return isinstance(val, int)
+
+
+class _PathType(_BaseType):
+ typ = Path
+ display = "path"
+
+ def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]:
+ if not start:
+ start = "./"
+ path = os.path.expanduser(start)
+ ret = []
+ if os.path.isdir(path):
+ files = glob.glob(os.path.join(path, "*"))
+ prefix = start
+ else:
+ files = glob.glob(path + "*")
+ prefix = os.path.dirname(start)
+ prefix = prefix or "./"
+ for f in files:
+ display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
+ if os.path.isdir(f):
+ display += "/"
+ ret.append(display)
+ if not ret:
+ ret = [start]
+ ret.sort()
+ return ret
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return isinstance(val, str)
+
+
+class _CmdType(_BaseType):
+ typ = Cmd
+ display = "cmd"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return list(manager.commands.keys())
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ if s not in manager.commands:
+ raise exceptions.TypeError("Unknown command: %s" % s)
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return val in manager.commands
+
+
+class _ArgType(_BaseType):
+ typ = Arg
+ display = "arg"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return isinstance(val, str)
+
+
+class _StrSeqType(_BaseType):
+ typ = typing.Sequence[str]
+ display = "[str]"
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return [x.strip() for x in s.split(",")]
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ if isinstance(val, str) or isinstance(val, bytes):
+ return False
+ try:
+ for v in val:
+ if not isinstance(v, str):
+ return False
+ except TypeError:
+ return False
+ return True
+
+
+class _CutSpecType(_BaseType):
+ typ = CutSpec
+ display = "[cut]"
+ valid_prefixes = [
+ "request.method",
+ "request.scheme",
+ "request.host",
+ "request.http_version",
+ "request.port",
+ "request.path",
+ "request.url",
+ "request.text",
+ "request.content",
+ "request.raw_content",
+ "request.timestamp_start",
+ "request.timestamp_end",
+ "request.header[",
+
+ "response.status_code",
+ "response.reason",
+ "response.text",
+ "response.content",
+ "response.timestamp_start",
+ "response.timestamp_end",
+ "response.raw_content",
+ "response.header[",
+
+ "client_conn.address.port",
+ "client_conn.address.host",
+ "client_conn.tls_version",
+ "client_conn.sni",
+ "client_conn.ssl_established",
+
+ "server_conn.address.port",
+ "server_conn.address.host",
+ "server_conn.ip_address.host",
+ "server_conn.tls_version",
+ "server_conn.sni",
+ "server_conn.ssl_established",
+ ]
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ spec = s.split(",")
+ opts = []
+ for pref in self.valid_prefixes:
+ spec[-1] = pref
+ opts.append(",".join(spec))
+ return opts
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
+ parts = s.split(",") # type: typing.Any
+ return parts
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ if not isinstance(val, str):
+ return False
+ parts = [x.strip() for x in val.split(",")]
+ for p in parts:
+ for pref in self.valid_prefixes:
+ if p.startswith(pref):
+ break
+ else:
+ return False
+ return True
+
+
+class _BaseFlowType(_BaseType):
+ viewmarkers = [
+ "@all",
+ "@focus",
+ "@shown",
+ "@hidden",
+ "@marked",
+ "@unmarked",
+ ]
+ valid_prefixes = viewmarkers + [
+ "~q",
+ "~s",
+ "~a",
+ "~hq",
+ "~hs",
+ "~b",
+ "~bq",
+ "~bs",
+ "~t",
+ "~d",
+ "~m",
+ "~u",
+ "~c",
+ ]
+
+ def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ return self.valid_prefixes
+
+
+class _FlowType(_BaseFlowType):
+ typ = flow.Flow
+ display = "flow"
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
+ try:
+ flows = manager.call_args("view.resolve", [s])
+ except exceptions.CommandError as e:
+ raise exceptions.TypeError from e
+ if len(flows) != 1:
+ raise exceptions.TypeError(
+ "Command requires one flow, specification matched %s." % len(flows)
+ )
+ return flows[0]
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ return isinstance(val, flow.Flow)
+
+
+class _FlowsType(_BaseFlowType):
+ typ = typing.Sequence[flow.Flow]
+ display = "[flow]"
+
+ def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
+ try:
+ return manager.call_args("view.resolve", [s])
+ except exceptions.CommandError as e:
+ raise exceptions.TypeError from e
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ try:
+ for v in val:
+ if not isinstance(v, flow.Flow):
+ return False
+ except TypeError:
+ return False
+ return True
+
+
+class _DataType(_BaseType):
+ typ = Data
+ display = "[data]"
+
+ def completion(
+ self, manager: _CommandBase, t: type, s: str
+ ) -> typing.Sequence[str]: # pragma: no cover
+ raise exceptions.TypeError("data cannot be passed as argument")
+
+ def parse(
+ self, manager: _CommandBase, t: type, s: str
+ ) -> typing.Any: # pragma: no cover
+ raise exceptions.TypeError("data cannot be passed as argument")
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ # FIXME: validate that all rows have equal length, and all columns have equal types
+ try:
+ for row in val:
+ for cell in row:
+ if not (isinstance(cell, str) or isinstance(cell, bytes)):
+ return False
+ except TypeError:
+ return False
+ return True
+
+
+class _ChoiceType(_BaseType):
+ typ = Choice
+ display = "choice"
+
+ def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]:
+ return manager.call(t.options_command)
+
+ def parse(self, manager: _CommandBase, t: Choice, s: str) -> str:
+ opts = manager.call(t.options_command)
+ if s not in opts:
+ raise exceptions.TypeError("Invalid choice.")
+ return s
+
+ def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ try:
+ opts = manager.call(typ.options_command)
+ except exceptions.CommandError:
+ return False
+ return val in opts
+
+
+class TypeManager:
+ def __init__(self, *types):
+ self.typemap = {}
+ for t in types:
+ self.typemap[t.typ] = t()
+
+ def get(self, t: type, default=None) -> _BaseType:
+ if type(t) in self.typemap:
+ return self.typemap[type(t)]
+ return self.typemap.get(t, default)
+
+
+CommandTypes = TypeManager(
+ _ArgType,
+ _BoolType,
+ _ChoiceType,
+ _CmdType,
+ _CutSpecType,
+ _DataType,
+ _FlowType,
+ _FlowsType,
+ _IntType,
+ _PathType,
+ _StrType,
+ _StrSeqType,
+)
diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py
index 87a0e804..1070fad0 100644
--- a/mitmproxy/utils/typecheck.py
+++ b/mitmproxy/utils/typecheck.py
@@ -1,41 +1,6 @@
import typing
-def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
- """
- Check if the provided value is an instance of typeinfo. Returns True if the
- types match, False otherwise. This function supports only those types
- required for command return values.
- """
- typename = str(typeinfo)
- if typename.startswith("typing.Sequence"):
- try:
- T = typeinfo.__args__[0] # type: ignore
- except AttributeError:
- # Python 3.5.0
- T = typeinfo.__parameters__[0] # type: ignore
- if not isinstance(value, (tuple, list)):
- return False
- for v in value:
- if not check_command_type(v, T):
- return False
- elif typename.startswith("typing.Union"):
- try:
- types = typeinfo.__args__ # type: ignore
- except AttributeError:
- # Python 3.5.x
- types = typeinfo.__union_params__ # type: ignore
- for T in types:
- checks = [check_command_type(value, T) for T in types]
- if not any(checks):
- return False
- elif value is None and typeinfo is None:
- return True
- elif not isinstance(value, typeinfo):
- return False
- return True
-
-
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
"""
Check if the provided value is an instance of typeinfo and raises a
diff --git a/mitmproxy/websocket.py b/mitmproxy/websocket.py
index 6c1e7000..8efd4117 100644
--- a/mitmproxy/websocket.py
+++ b/mitmproxy/websocket.py
@@ -1,6 +1,8 @@
import time
from typing import List, Optional
+from mitmproxy.contrib import wsproto
+
from mitmproxy import flow
from mitmproxy.net import websockets
from mitmproxy.coretypes import serializable
@@ -11,7 +13,7 @@ class WebSocketMessage(serializable.Serializable):
def __init__(
self, type: int, from_client: bool, content: bytes, timestamp: Optional[int]=None
) -> None:
- self.type = type
+ self.type = wsproto.frame_protocol.Opcode(type) # type: ignore
self.from_client = from_client
self.content = content
self.timestamp = timestamp or int(time.time()) # type: int
@@ -21,13 +23,14 @@ class WebSocketMessage(serializable.Serializable):
return cls(*state)
def get_state(self):
- return self.type, self.from_client, self.content, self.timestamp
+ return int(self.type), self.from_client, self.content, self.timestamp
def set_state(self, state):
self.type, self.from_client, self.content, self.timestamp = state
+ self.type = wsproto.frame_protocol.Opcode(self.type) # replace enum with bare int
def __repr__(self):
- if self.type == websockets.OPCODE.TEXT:
+ if self.type == wsproto.frame_protocol.Opcode.TEXT:
return "text message: {}".format(repr(self.content))
else:
return "binary message: {}".format(strutils.bytes_to_escaped_str(self.content))
@@ -42,7 +45,7 @@ class WebSocketFlow(flow.Flow):
super().__init__("websocket", client_conn, server_conn, live)
self.messages = [] # type: List[WebSocketMessage]
self.close_sender = 'client'
- self.close_code = '(status code missing)'
+ self.close_code = wsproto.frame_protocol.CloseReason.NORMAL_CLOSURE
self.close_message = '(message missing)'
self.close_reason = 'unknown status code'
self.stream = False
@@ -69,7 +72,7 @@ class WebSocketFlow(flow.Flow):
_stateobject_attributes.update(dict(
messages=List[WebSocketMessage],
close_sender=str,
- close_code=str,
+ close_code=int,
close_message=str,
close_reason=str,
client_key=str,
@@ -83,6 +86,11 @@ class WebSocketFlow(flow.Flow):
# dumping the handshake_flow will include the WebSocketFlow too.
))
+ def get_state(self):
+ d = super().get_state()
+ d['close_code'] = int(d['close_code']) # replace enum with bare int
+ return d
+
@classmethod
def from_state(cls, state):
f = cls(None, None, None)
diff --git a/setup.cfg b/setup.cfg
index fd31d15b..7c754722 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -19,6 +19,9 @@ exclude_lines =
pragma: no cover
raise NotImplementedError()
+[mypy-mitmproxy.contrib.*]
+ignore_errors = True
+
[tool:full_coverage]
exclude =
mitmproxy/proxy/protocol/base.py
diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py
index 0a523fff..71e699db 100644
--- a/test/mitmproxy/addons/test_cut.py
+++ b/test/mitmproxy/addons/test_cut.py
@@ -135,6 +135,11 @@ def test_cut():
with pytest.raises(exceptions.CommandError):
assert c.cut(tflows, ["__dict__"]) == [[""]]
+ with taddons.context():
+ tflows = [tflow.tflow(resp=False)]
+ assert c.cut(tflows, ["response.reason"]) == [[""]]
+ assert c.cut(tflows, ["response.header[key]"]) == [[""]]
+
c = cut.Cut()
with taddons.context():
tflows = [tflow.ttcpflow()]
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py
index a4e425cd..2dee708f 100644
--- a/test/mitmproxy/addons/test_save.py
+++ b/test/mitmproxy/addons/test_save.py
@@ -44,6 +44,19 @@ def test_tcp(tmpdir):
assert rd(p)
+def test_websocket(tmpdir):
+ sa = save.Save()
+ with taddons.context() as tctx:
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, save_stream_file=p)
+
+ f = tflow.twebsocketflow()
+ sa.websocket_start(f)
+ sa.websocket_end(f)
+ tctx.configure(sa, save_stream_file=None)
+ assert rd(p)
+
+
def test_save_command(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index 50ad3d55..c777192d 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -4,11 +4,10 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
+import mitmproxy.types
import io
import pytest
-from mitmproxy.utils import typecheck
-
class TAddon:
@command.command("cmd1")
@@ -24,8 +23,12 @@ class TAddon:
def cmd3(self, foo: int) -> int:
return foo
+ @command.command("cmd4")
+ def cmd4(self, a: int, b: str, c: mitmproxy.types.Path) -> str:
+ return "ok"
+
@command.command("subcommand")
- def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str:
+ def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
return "ok"
@command.command("empty")
@@ -39,12 +42,16 @@ class TAddon:
def choices(self) -> typing.Sequence[str]:
return ["one", "two", "three"]
- @command.argument("arg", type=command.Choice("choices"))
+ @command.argument("arg", type=mitmproxy.types.Choice("choices"))
def choose(self, arg: str) -> typing.Sequence[str]:
return ["one", "two", "three"]
@command.command("path")
- def path(self, arg: command.Path) -> None:
+ def path(self, arg: mitmproxy.types.Path) -> None:
+ pass
+
+ @command.command("flow")
+ def flow(self, f: flow.Flow, s: str) -> None:
pass
@@ -79,53 +86,138 @@ class TestCommand:
[
"foo bar",
[
- command.ParseResult(value = "foo", type = command.Cmd),
- command.ParseResult(value = "bar", type = str)
+ command.ParseResult(
+ value = "foo", type = mitmproxy.types.Cmd, valid = False
+ ),
+ command.ParseResult(
+ value = "bar", type = mitmproxy.types.Unknown, valid = False
+ )
],
+ [],
],
[
- "foo 'bar",
+ "cmd1 'bar",
[
- command.ParseResult(value = "foo", type = command.Cmd),
- command.ParseResult(value = "'bar", type = str)
- ]
+ command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "'bar", type = str, valid = True)
+ ],
+ [],
+ ],
+ [
+ "a",
+ [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)],
+ [],
+ ],
+ [
+ "",
+ [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)],
+ []
],
- ["a", [command.ParseResult(value = "a", type = command.Cmd)]],
- ["", [command.ParseResult(value = "", type = command.Cmd)]],
[
"cmd3 1",
[
- command.ParseResult(value = "cmd3", type = command.Cmd),
- command.ParseResult(value = "1", type = int),
- ]
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "1", type = int, valid = True),
+ ],
+ []
],
[
"cmd3 ",
[
- command.ParseResult(value = "cmd3", type = command.Cmd),
- command.ParseResult(value = "", type = int),
- ]
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "", type = int, valid = False),
+ ],
+ []
],
[
"subcommand ",
[
- command.ParseResult(value = "subcommand", type = command.Cmd),
- command.ParseResult(value = "", type = command.Cmd),
- ]
+ command.ParseResult(
+ value = "subcommand", type = mitmproxy.types.Cmd, valid = True,
+ ),
+ command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False),
+ ],
+ ["arg"],
],
[
"subcommand cmd3 ",
[
- command.ParseResult(value = "subcommand", type = command.Cmd),
- command.ParseResult(value = "cmd3", type = command.Cmd),
- command.ParseResult(value = "", type = int),
- ]
+ command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "", type = int, valid = False),
+ ],
+ []
+ ],
+ [
+ "cmd4",
+ [
+ command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
+ ],
+ ["int", "str", "path"]
+ ],
+ [
+ "cmd4 ",
+ [
+ command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "", type = int, valid = False),
+ ],
+ ["str", "path"]
+ ],
+ [
+ "cmd4 1",
+ [
+ command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "1", type = int, valid = True),
+ ],
+ ["str", "path"]
+ ],
+ [
+ "cmd4 1",
+ [
+ command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "1", type = int, valid = True),
+ ],
+ ["str", "path"]
+ ],
+ [
+ "flow",
+ [
+ command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
+ ],
+ ["flow", "str"]
+ ],
+ [
+ "flow ",
+ [
+ command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "", type = flow.Flow, valid = False),
+ ],
+ ["str"]
+ ],
+ [
+ "flow x",
+ [
+ command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "x", type = flow.Flow, valid = False),
+ ],
+ ["str"]
+ ],
+ [
+ "flow x ",
+ [
+ command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value = "x", type = flow.Flow, valid = False),
+ command.ParseResult(value = "", type = str, valid = True),
+ ],
+ []
],
]
with taddons.context() as tctx:
tctx.master.addons.add(TAddon())
- for s, expected in tests:
- assert tctx.master.commands.parse_partial(s) == expected
+ for s, expected, expectedremain in tests:
+ current, remain = tctx.master.commands.parse_partial(s)
+ assert current == expected
+ assert expectedremain == remain
def test_simple():
@@ -139,7 +231,7 @@ def test_simple():
c.call("nonexistent")
with pytest.raises(exceptions.CommandError, match="Invalid"):
c.call("")
- with pytest.raises(exceptions.CommandError, match="Usage"):
+ with pytest.raises(exceptions.CommandError, match="argument mismatch"):
c.call("one.two too many args")
c.add("empty", a.empty)
@@ -154,15 +246,15 @@ def test_typename():
assert command.typename(str) == "str"
assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
- assert command.typename(command.Cuts) == "[cuts]"
- assert command.typename(typing.Sequence[command.Cut]) == "[cut]"
+ assert command.typename(mitmproxy.types.Data) == "[data]"
+ assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
assert command.typename(flow.Flow) == "flow"
assert command.typename(typing.Sequence[str]) == "[str]"
- assert command.typename(command.Choice("foo")) == "choice"
- assert command.typename(command.Path) == "path"
- assert command.typename(command.Cmd) == "cmd"
+ assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
+ assert command.typename(mitmproxy.types.Path) == "path"
+ assert command.typename(mitmproxy.types.Cmd) == "cmd"
class DummyConsole:
@@ -172,7 +264,7 @@ class DummyConsole:
return [tflow.tflow(resp=True)] * n
@command.command("cut")
- def cut(self, spec: str) -> command.Cuts:
+ def cut(self, spec: str) -> mitmproxy.types.Data:
return [["test"]]
@@ -180,55 +272,11 @@ def test_parsearg():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
-
- assert command.parsearg(tctx.master.commands, "1", int) == 1
+ with pytest.raises(exceptions.CommandError, match="Unsupported"):
+ command.parsearg(tctx.master.commands, "foo", type)
with pytest.raises(exceptions.CommandError):
command.parsearg(tctx.master.commands, "foo", int)
- assert command.parsearg(tctx.master.commands, "true", bool) is True
- assert command.parsearg(tctx.master.commands, "false", bool) is False
- with pytest.raises(exceptions.CommandError):
- command.parsearg(tctx.master.commands, "flobble", bool)
-
- assert len(command.parsearg(
- tctx.master.commands, "2", typing.Sequence[flow.Flow]
- )) == 2
- assert command.parsearg(tctx.master.commands, "1", flow.Flow)
- with pytest.raises(exceptions.CommandError):
- command.parsearg(tctx.master.commands, "2", flow.Flow)
- with pytest.raises(exceptions.CommandError):
- command.parsearg(tctx.master.commands, "0", flow.Flow)
- with pytest.raises(exceptions.CommandError):
- command.parsearg(tctx.master.commands, "foo", Exception)
-
- assert command.parsearg(
- tctx.master.commands, "foo", command.Cuts
- ) == [["test"]]
-
- assert command.parsearg(
- tctx.master.commands, "foo", typing.Sequence[str]
- ) == ["foo"]
- assert command.parsearg(
- tctx.master.commands, "foo, bar", typing.Sequence[str]
- ) == ["foo", "bar"]
-
- a = TAddon()
- tctx.master.commands.add("choices", a.choices)
- assert command.parsearg(
- tctx.master.commands, "one", command.Choice("choices"),
- ) == "one"
- with pytest.raises(exceptions.CommandError):
- assert command.parsearg(
- tctx.master.commands, "invalid", command.Choice("choices"),
- )
-
- assert command.parsearg(
- tctx.master.commands, "foo", command.Path
- ) == "foo"
- assert command.parsearg(
- tctx.master.commands, "foo", command.Cmd
- ) == "foo"
-
class TDec:
@command.command("cmd1")
@@ -265,12 +313,3 @@ def test_verify_arg_signature():
command.verify_arg_signature(lambda: None, [1, 2], {})
print('hello there')
command.verify_arg_signature(lambda a, b: None, [1, 2], {})
-
-
-def test_choice():
- """
- basic typechecking for choices should fail as we cannot verify if strings are a valid choice
- at this point.
- """
- c = command.Choice("foo")
- assert not typecheck.check_command_type("foo", c)
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index fcc766b5..8cc11a16 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -97,7 +97,7 @@ class TestSerialize:
class TestFlowMaster:
- def test_load_flow_reverse(self):
+ def test_load_http_flow_reverse(self):
s = tservers.TestState()
opts = options.Options(
mode="reverse:https://use-this-domain"
@@ -108,6 +108,20 @@ class TestFlowMaster:
fm.load_flow(f)
assert s.flows[0].request.host == "use-this-domain"
+ def test_load_websocket_flow(self):
+ s = tservers.TestState()
+ opts = options.Options(
+ mode="reverse:https://use-this-domain"
+ )
+ fm = master.Master(opts)
+ fm.addons.add(s)
+ f = tflow.twebsocketflow()
+ fm.load_flow(f.handshake_flow)
+ fm.load_flow(f)
+ assert s.flows[0].request.host == "use-this-domain"
+ assert s.flows[1].handshake_flow == f.handshake_flow
+ assert len(s.flows[1].messages) == len(f.messages)
+
def test_replay(self):
opts = options.Options()
fm = master.Master(opts)
diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py
index c411258a..4eb37d81 100644
--- a/test/mitmproxy/test_flowfilter.py
+++ b/test/mitmproxy/test_flowfilter.py
@@ -420,6 +420,20 @@ class TestMatchingWebSocketFlow:
e = self.err()
assert self.q("~e", e)
+ def test_domain(self):
+ q = self.flow()
+ assert self.q("~d example.com", q)
+ assert not self.q("~d none", q)
+
+ def test_url(self):
+ q = self.flow()
+ assert self.q("~u example.com", q)
+ assert self.q("~u example.com/ws", q)
+ assert not self.q("~u moo/path", q)
+
+ q.handshake_flow = None
+ assert not self.q("~u example.com", q)
+
def test_body(self):
f = self.flow()
diff --git a/test/mitmproxy/test_typemanager.py b/test/mitmproxy/test_typemanager.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/mitmproxy/test_typemanager.py
diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py
new file mode 100644
index 00000000..72492fa9
--- /dev/null
+++ b/test/mitmproxy/test_types.py
@@ -0,0 +1,237 @@
+import pytest
+import os
+import typing
+import contextlib
+
+from mitmproxy.test import tutils
+import mitmproxy.exceptions
+import mitmproxy.types
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+from mitmproxy import command
+from mitmproxy import flow
+
+from . import test_command
+
+
+@contextlib.contextmanager
+def chdir(path: str):
+ old_dir = os.getcwd()
+ os.chdir(path)
+ yield
+ os.chdir(old_dir)
+
+
+def test_bool():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._BoolType()
+ assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"]
+ assert b.parse(tctx.master.commands, bool, "true") is True
+ assert b.parse(tctx.master.commands, bool, "false") is False
+ assert b.is_valid(tctx.master.commands, bool, True) is True
+ assert b.is_valid(tctx.master.commands, bool, "foo") is False
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, bool, "foo")
+
+
+def test_str():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._StrType()
+ assert b.is_valid(tctx.master.commands, str, "foo") is True
+ assert b.is_valid(tctx.master.commands, str, 1) is False
+ assert b.completion(tctx.master.commands, str, "") == []
+ assert b.parse(tctx.master.commands, str, "foo") == "foo"
+
+
+def test_unknown():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._UnknownType()
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Unknown, "foo") is False
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Unknown, 1) is False
+ assert b.completion(tctx.master.commands, mitmproxy.types.Unknown, "") == []
+ assert b.parse(tctx.master.commands, mitmproxy.types.Unknown, "foo") == "foo"
+
+
+def test_int():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._IntType()
+ assert b.is_valid(tctx.master.commands, int, "foo") is False
+ assert b.is_valid(tctx.master.commands, int, 1) is True
+ assert b.completion(tctx.master.commands, int, "b") == []
+ assert b.parse(tctx.master.commands, int, "1") == 1
+ assert b.parse(tctx.master.commands, int, "999") == 999
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, int, "foo")
+
+
+def test_path():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._PathType()
+ assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
+ assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False
+
+ def normPathOpts(prefix, match):
+ ret = []
+ for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match):
+ s = s[len(prefix):]
+ s = s.replace(os.sep, "/")
+ ret.append(s)
+ return ret
+
+ cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
+ assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
+ assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
+ with chdir(cd):
+ assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
+ assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
+ assert b.completion(
+ tctx.master.commands, mitmproxy.types.Path, "nonexistent"
+ ) == ["nonexistent"]
+
+
+def test_cmd():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(test_command.TAddon())
+ b = mitmproxy.types._CmdType()
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "foo") is False
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") is True
+ assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") == "cmd1"
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo")
+ assert len(
+ b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
+ ) == len(tctx.master.commands.commands.keys())
+
+
+def test_cutspec():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._CutSpecType()
+ b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"]
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, 1) is False
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "foo") is False
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "request.path") is True
+
+ assert b.completion(
+ tctx.master.commands, mitmproxy.types.CutSpec, "request.p"
+ ) == b.valid_prefixes
+ ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f")
+ assert ret[0].startswith("request.port,")
+ assert len(ret) == len(b.valid_prefixes)
+
+
+def test_arg():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._ArgType()
+ assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
+ assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False
+
+
+def test_strseq():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._StrSeqType()
+ assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
+ assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
+ assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
+ assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["foo"]) is True
+ assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["a", "b", 3]) is False
+ assert b.is_valid(tctx.master.commands, typing.Sequence[str], 1) is False
+ assert b.is_valid(tctx.master.commands, typing.Sequence[str], "foo") is False
+
+
+class DummyConsole:
+ @command.command("view.resolve")
+ def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
+ if spec == "err":
+ raise mitmproxy.exceptions.CommandError()
+ n = int(spec)
+ return [tflow.tflow(resp=True)] * n
+
+ @command.command("cut")
+ def cut(self, spec: str) -> mitmproxy.types.Data:
+ return [["test"]]
+
+ @command.command("options")
+ def options(self) -> typing.Sequence[str]:
+ return ["one", "two", "three"]
+
+
+def test_flow():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types._FlowType()
+ assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes)
+ assert b.parse(tctx.master.commands, flow.Flow, "1")
+ assert b.is_valid(tctx.master.commands, flow.Flow, tflow.tflow()) is True
+ assert b.is_valid(tctx.master.commands, flow.Flow, "xx") is False
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, flow.Flow, "0")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, flow.Flow, "2")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, flow.Flow, "err")
+
+
+def test_flows():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types._FlowsType()
+ assert len(
+ b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "")
+ ) == len(b.valid_prefixes)
+ assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], [tflow.tflow()]) is True
+ assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], "xx") is False
+ assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], 0) is False
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "err")
+
+
+def test_data():
+ with taddons.context() as tctx:
+ b = mitmproxy.types._DataType()
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, 0) is False
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, []) is True
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [["x"]]) is True
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[b"x"]]) is True
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[1]]) is False
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
+
+
+def test_choice():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types._ChoiceType()
+ assert b.is_valid(
+ tctx.master.commands,
+ mitmproxy.types.Choice("options"),
+ "one",
+ ) is True
+ assert b.is_valid(
+ tctx.master.commands,
+ mitmproxy.types.Choice("options"),
+ "invalid",
+ ) is False
+ assert b.is_valid(
+ tctx.master.commands,
+ mitmproxy.types.Choice("nonexistent"),
+ "invalid",
+ ) is False
+ comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "")
+ assert comp == ["one", "two", "three"]
+ assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one"
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid")
+
+
+def test_typemanager():
+ assert mitmproxy.types.CommandTypes.get(bool, None)
+ assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None)
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
index 823af06d..2a96995d 100644
--- a/test/mitmproxy/tools/console/test_commander.py
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -1,36 +1,6 @@
-import os
-import contextlib
from mitmproxy.tools.console.commander import commander
from mitmproxy.test import taddons
-from mitmproxy.test import tutils
-
-
-@contextlib.contextmanager
-def chdir(path: str):
- old_dir = os.getcwd()
- os.chdir(path)
- yield
- os.chdir(old_dir)
-
-
-def normPathOpts(prefix, match):
- ret = []
- for s in commander.pathOptions(match):
- s = s[len(prefix):]
- s = s.replace(os.sep, "/")
- ret.append(s)
- return ret
-
-
-def test_pathOptions():
- cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
- assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
- assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
- with chdir(cd):
- assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
- assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
- assert commander.pathOptions("nonexistent") == ["nonexistent"]
class TestListCompleter:
@@ -72,16 +42,16 @@ class TestCommandBuffer:
with taddons.context() as tctx:
for start, output in tests:
cb = commander.CommandBuffer(tctx.master)
- cb.buf, cb.cursor = start[0], start[1]
+ cb.text, cb.cursor = start[0], start[1]
cb.backspace()
- assert cb.buf == output[0]
+ assert cb.text == output[0]
assert cb.cursor == output[1]
def test_left(self):
cursors = [3, 2, 1, 0, 0]
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
- cb.buf, cb.cursor = "abcd", 4
+ cb.text, cb.cursor = "abcd", 4
for c in cursors:
cb.left()
assert cb.cursor == c
@@ -90,7 +60,7 @@ class TestCommandBuffer:
cursors = [1, 2, 3, 4, 4]
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
- cb.buf, cb.cursor = "abcd", 0
+ cb.text, cb.cursor = "abcd", 0
for c in cursors:
cb.right()
assert cb.cursor == c
@@ -104,20 +74,25 @@ class TestCommandBuffer:
with taddons.context() as tctx:
for start, output in tests:
cb = commander.CommandBuffer(tctx.master)
- cb.buf, cb.cursor = start[0], start[1]
+ cb.text, cb.cursor = start[0], start[1]
cb.insert("x")
- assert cb.buf == output[0]
+ assert cb.text == output[0]
assert cb.cursor == output[1]
def test_cycle_completion(self):
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
- cb.buf = "foo bar"
- cb.cursor = len(cb.buf)
+ cb.text = "foo bar"
+ cb.cursor = len(cb.text)
cb.cycle_completion()
def test_render(self):
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
- cb.buf = "foo"
- assert cb.render() == "foo"
+ cb.text = "foo"
+ assert cb.render()
+
+ def test_flatten(self):
+ with taddons.context() as tctx:
+ cb = commander.CommandBuffer(tctx.master)
+ assert cb.flatten("foo bar") == "foo bar"
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index 66b1884e..5295fff5 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -4,7 +4,6 @@ from unittest import mock
import pytest
from mitmproxy.utils import typecheck
-from mitmproxy import command
class TBase:
@@ -88,31 +87,6 @@ def test_check_any():
typecheck.check_option_type("foo", None, typing.Any)
-def test_check_command_type():
- assert(typecheck.check_command_type("foo", str))
- assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
- assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
- assert(typecheck.check_command_type(None, None))
- assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
- assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
- assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
- assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
- assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
-
- # Python 3.5 only defines __parameters__
- m = mock.Mock()
- m.__str__ = lambda self: "typing.Sequence"
- m.__parameters__ = (int,)
-
- typecheck.check_command_type([10], m)
-
- # Python 3.5 only defines __union_params__
- m = mock.Mock()
- m.__str__ = lambda self: "typing.Union"
- m.__union_params__ = (int,)
- assert not typecheck.check_command_type([22], m)
-
-
def test_typesec_to_str():
assert(typecheck.typespec_to_str(str)) == "str"
assert(typecheck.typespec_to_str(typing.Sequence[str])) == "sequence of str"