aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/clientplayback.py3
-rw-r--r--mitmproxy/addons/core.py9
-rw-r--r--mitmproxy/addons/cut.py17
-rw-r--r--mitmproxy/addons/export.py3
-rw-r--r--mitmproxy/addons/save.py3
-rw-r--r--mitmproxy/addons/serverplayback.py3
-rw-r--r--mitmproxy/addons/view.py2
-rw-r--r--mitmproxy/command.py177
-rw-r--r--mitmproxy/exceptions.py4
-rw-r--r--mitmproxy/tools/console/commander/commander.py69
-rw-r--r--mitmproxy/tools/console/consoleaddons.py22
-rw-r--r--mitmproxy/types.py330
-rw-r--r--test/mitmproxy/test_command.py53
-rw-r--r--test/mitmproxy/test_typemanager.py0
-rw-r--r--test/mitmproxy/test_types.py175
-rw-r--r--test/mitmproxy/tools/console/test_commander.py30
-rw-r--r--test/mitmproxy/utils/test_typecheck.py4
17 files changed, 591 insertions, 313 deletions
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py
index fcc3209b..bed06e82 100644
--- a/mitmproxy/addons/clientplayback.py
+++ b/mitmproxy/addons/clientplayback.py
@@ -3,6 +3,7 @@ from mitmproxy import ctx
from mitmproxy import io
from mitmproxy import flow
from mitmproxy import command
+import mitmproxy.types
import typing
@@ -37,7 +38,7 @@ class ClientPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.client.file")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 4191d490..2b0b2f14 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -6,6 +6,7 @@ from mitmproxy import command
from mitmproxy import flow
from mitmproxy import optmanager
from mitmproxy.net.http import status_codes
+import mitmproxy.types
class Core:
@@ -96,7 +97,7 @@ class Core:
]
@command.command("flow.set")
- @command.argument("spec", type=command.Choice("flow.set.options"))
+ @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
@@ -187,7 +188,7 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
- @command.argument("enc", type=command.Choice("flow.encode.options"))
+ @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
@@ -216,7 +217,7 @@ class Core:
return ["gzip", "deflate", "br"]
@command.command("options.load")
- def options_load(self, path: command.Path) -> None:
+ def options_load(self, path: mitmproxy.types.Path) -> None:
"""
Load options from a file.
"""
@@ -228,7 +229,7 @@ class Core:
) from e
@command.command("options.save")
- def options_save(self, path: command.Path) -> None:
+ def options_save(self, path: mitmproxy.types.Path) -> None:
"""
Save options to a file.
"""
diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py
index efc9e5df..b90df549 100644
--- a/mitmproxy/addons/cut.py
+++ b/mitmproxy/addons/cut.py
@@ -7,6 +7,7 @@ from mitmproxy import flow
from mitmproxy import ctx
from mitmproxy import certs
from mitmproxy.utils import strutils
+import mitmproxy.types
import pyperclip
@@ -51,8 +52,8 @@ class Cut:
def cut(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut]
- ) -> command.Cuts:
+ cuts: mitmproxy.types.CutSpec,
+ ) -> mitmproxy.types.Data:
"""
Cut data from a set of flows. Cut specifications are attribute paths
from the base of the flow object, with a few conveniences - "port"
@@ -62,17 +63,17 @@ class Cut:
or "false", "bytes" are preserved, and all other values are
converted to strings.
"""
- ret = []
+ ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]]
for f in flows:
ret.append([extract(c, f) for c in cuts])
- return ret
+ return ret # type: ignore
@command.command("cut.save")
def save(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut],
- path: command.Path
+ cuts: mitmproxy.types.CutSpec,
+ path: mitmproxy.types.Path
) -> None:
"""
Save cuts to file. If there are multiple flows or cuts, the format
@@ -84,7 +85,7 @@ class Cut:
append = False
if path.startswith("+"):
append = True
- path = command.Path(path[1:])
+ path = mitmproxy.types.Path(path[1:])
if len(cuts) == 1 and len(flows) == 1:
with open(path, "ab" if append else "wb") as fp:
if fp.tell() > 0:
@@ -110,7 +111,7 @@ class Cut:
def clip(
self,
flows: typing.Sequence[flow.Flow],
- cuts: typing.Sequence[command.Cut],
+ cuts: mitmproxy.types.CutSpec,
) -> None:
"""
Send cuts to the clipboard. If there are multiple flows or cuts, the
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index 5388a0e8..0169f5b1 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -5,6 +5,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.utils import strutils
from mitmproxy.net.http.http1 import assemble
+import mitmproxy.types
import pyperclip
@@ -49,7 +50,7 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
- def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None:
+ def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py
index 40cd6f82..1778855d 100644
--- a/mitmproxy/addons/save.py
+++ b/mitmproxy/addons/save.py
@@ -7,6 +7,7 @@ from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flow
+import mitmproxy.types
class Save:
@@ -50,7 +51,7 @@ class Save:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
@command.command("save.file")
- def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None:
+ def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None:
"""
Save flows to a file. If the path starts with a +, flows are
appended to the file, otherwise it is over-written.
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 46968a8d..20fcfc2a 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -9,6 +9,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy import io
from mitmproxy import command
+import mitmproxy.types
class ServerPlayback:
@@ -31,7 +32,7 @@ class ServerPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.server.file")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index e45f2baf..3a15fd3e 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -351,7 +351,7 @@ class View(collections.Sequence):
ctx.master.addons.trigger("update", updated)
@command.command("view.load")
- def load_file(self, path: command.Path) -> None:
+ def load_file(self, path: mitmproxy.types.Path) -> None:
"""
Load flows into the view, without processing them with addons.
"""
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index c86d9792..a77658fd 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -12,7 +12,7 @@ import sys
from mitmproxy.utils import typecheck
from mitmproxy import exceptions
-from mitmproxy import flow
+import mitmproxy.types
def lexer(s):
@@ -24,113 +24,14 @@ def lexer(s):
return lex
-# This is an awkward location for these values, but it's better than having
-# the console core import and depend on an addon. FIXME: Add a way for
-# addons to add custom types and manage their completion and validation.
-valid_flow_prefixes = [
- "@all",
- "@focus",
- "@shown",
- "@hidden",
- "@marked",
- "@unmarked",
- "~q",
- "~s",
- "~a",
- "~hq",
- "~hs",
- "~b",
- "~bq",
- "~bs",
- "~t",
- "~d",
- "~m",
- "~u",
- "~c",
-]
-
-
-Cuts = typing.Sequence[
- typing.Sequence[typing.Union[str, bytes]]
-]
-
-
-class Cut(str):
- # This is an awkward location for these values, but it's better than having
- # the console core import and depend on an addon. FIXME: Add a way for
- # addons to add custom types and manage their completion and validation.
- valid_prefixes = [
- "request.method",
- "request.scheme",
- "request.host",
- "request.http_version",
- "request.port",
- "request.path",
- "request.url",
- "request.text",
- "request.content",
- "request.raw_content",
- "request.timestamp_start",
- "request.timestamp_end",
- "request.header[",
-
- "response.status_code",
- "response.reason",
- "response.text",
- "response.content",
- "response.timestamp_start",
- "response.timestamp_end",
- "response.raw_content",
- "response.header[",
-
- "client_conn.address.port",
- "client_conn.address.host",
- "client_conn.tls_version",
- "client_conn.sni",
- "client_conn.ssl_established",
-
- "server_conn.address.port",
- "server_conn.address.host",
- "server_conn.ip_address.host",
- "server_conn.tls_version",
- "server_conn.sni",
- "server_conn.ssl_established",
- ]
-
-
-class Path(str):
- pass
-
-
-class Cmd(str):
- pass
-
-
-class Arg(str):
- pass
-
-
def typename(t: type) -> str:
"""
- Translates a type to an explanatory string. If ret is True, we're
- looking at a return type, else we're looking at a parameter type.
+ Translates a type to an explanatory string.
"""
- if isinstance(t, Choice):
- return "choice"
- elif t == typing.Sequence[flow.Flow]:
- return "[flow]"
- elif t == typing.Sequence[str]:
- return "[str]"
- elif t == typing.Sequence[Cut]:
- return "[cut]"
- elif t == Cuts:
- return "[cuts]"
- elif t == flow.Flow:
- return "flow"
- elif issubclass(t, (str, int, bool)):
- return t.__name__.lower()
- else: # pragma: no cover
+ to = mitmproxy.types.CommandTypes.get(t, None)
+ if not to:
raise NotImplementedError(t)
+ return to.display
class Command:
@@ -168,7 +69,7 @@ class Command:
ret = " -> " + ret
return "%s %s%s" % (self.path, params, ret)
- def call(self, args: typing.Sequence[str]):
+ def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
Call the command with a list of arguments. At this point, all
arguments are strings.
@@ -255,13 +156,13 @@ class CommandManager:
typ = None # type: typing.Type
for i in range(len(parts)):
if i == 0:
- typ = Cmd
+ typ = mitmproxy.types.Cmd
if parts[i] in self.commands:
params.extend(self.commands[parts[i]].paramtypes)
elif params:
typ = params.pop(0)
# FIXME: Do we need to check that Arg is positional?
- if typ == Cmd and params and params[0] == Arg:
+ if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
if parts[i] in self.commands:
params[:] = self.commands[parts[i]].paramtypes
else:
@@ -269,7 +170,7 @@ class CommandManager:
parse.append(ParseResult(value=parts[i], type=typ))
return parse
- def call_args(self, path, args):
+ def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command using a list of string arguments. May raise CommandError.
"""
@@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if isinstance(argtype, Choice):
- cmd = argtype.options_command
- opts = manager.call(cmd)
- if spec not in opts:
- raise exceptions.CommandError(
- "Invalid choice: see %s for options" % cmd
- )
- return spec
- elif issubclass(argtype, str):
- return spec
- elif argtype == bool:
- if spec == "true":
- return True
- elif spec == "false":
- return False
- else:
- raise exceptions.CommandError(
- "Booleans are 'true' or 'false', got %s" % spec
- )
- elif issubclass(argtype, int):
- try:
- return int(spec)
- except ValueError as e:
- raise exceptions.CommandError("Expected an integer, got %s." % spec)
- elif argtype == typing.Sequence[flow.Flow]:
- return manager.call_args("view.resolve", [spec])
- elif argtype == Cuts:
- return manager.call_args("cut", [spec])
- elif argtype == flow.Flow:
- flows = manager.call_args("view.resolve", [spec])
- if len(flows) != 1:
- raise exceptions.CommandError(
- "Command requires one flow, specification matched %s." % len(flows)
- )
- return flows[0]
- elif argtype in (typing.Sequence[str], typing.Sequence[Cut]):
- return [i.strip() for i in spec.split(",")]
- else:
+ t = mitmproxy.types.CommandTypes.get(argtype, None)
+ if not t:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
+ try:
+ return t.parse(manager, argtype, spec) # type: ignore
+ except exceptions.TypeError as e:
+ raise exceptions.CommandError from e
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
@@ -360,21 +229,11 @@ def command(path):
return decorator
-class Choice:
- def __init__(self, options_command):
- self.options_command = options_command
-
- def __instancecheck__(self, instance):
- # return false here so that arguments are piped through parsearg,
- # which does extended validation.
- return False
-
-
def argument(name, type):
"""
- Set the type of a command argument at runtime.
- This is useful for more specific types such as command.Choice, which we cannot annotate
- directly as mypy does not like that.
+ Set the type of a command argument at runtime. This is useful for more
+ specific types such as mitmproxy.types.Choice, which we cannot annotate
+ directly as mypy does not like that.
"""
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py
index 71517480..d568898b 100644
--- a/mitmproxy/exceptions.py
+++ b/mitmproxy/exceptions.py
@@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException):
pass
+class TypeError(MitmproxyException):
+ pass
+
+
"""
Net-layer exceptions
"""
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index ef32b953..13c80092 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -1,6 +1,4 @@
import abc
-import glob
-import os
import typing
import urwid
@@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords
import mitmproxy.flow
import mitmproxy.master
import mitmproxy.command
+import mitmproxy.types
class Completer: # pragma: no cover
@@ -39,30 +38,6 @@ class ListCompleter(Completer):
return ret
-# Generates the completion options for a specific starting input
-def pathOptions(start: str) -> typing.Sequence[str]:
- if not start:
- start = "./"
- path = os.path.expanduser(start)
- ret = []
- if os.path.isdir(path):
- files = glob.glob(os.path.join(path, "*"))
- prefix = start
- else:
- files = glob.glob(path + "*")
- prefix = os.path.dirname(start)
- prefix = prefix or "./"
- for f in files:
- display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
- if os.path.isdir(f):
- display += "/"
- ret.append(display)
- if not ret:
- ret = [start]
- ret.sort()
- return ret
-
-
CompletionState = typing.NamedTuple(
"CompletionState",
[
@@ -106,48 +81,12 @@ class CommandBuffer():
if not self.completion:
parts = self.master.commands.parse_partial(self.buf[:self.cursor])
last = parts[-1]
- if last.type == mitmproxy.command.Cmd:
- self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- self.master.commands.commands.keys(),
- ),
- parse = parts,
- )
- if last.type == typing.Sequence[mitmproxy.command.Cut]:
- spec = parts[-1].value.split(",")
- opts = []
- for pref in mitmproxy.command.Cut.valid_prefixes:
- spec[-1] = pref
- opts.append(",".join(spec))
- self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- opts,
- ),
- parse = parts,
- )
- elif isinstance(last.type, mitmproxy.command.Choice):
+ ct = mitmproxy.types.CommandTypes.get(last.type, None)
+ if ct:
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
- self.master.commands.call(last.type.options_command),
- ),
- parse = parts,
- )
- elif last.type == mitmproxy.command.Path:
- self.completion = CompletionState(
- completer = ListCompleter(
- "",
- pathOptions(parts[1].value)
- ),
- parse = parts,
- )
- elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow):
- self.completion = CompletionState(
- completer = ListCompleter(
- "",
- mitmproxy.command.valid_flow_prefixes,
+ ct.completion(self.master.commands, last.type, parts[-1].value)
),
parse = parts,
)
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index 453e9e1c..37647e60 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -7,6 +7,8 @@ from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import contentviews
from mitmproxy.utils import strutils
+import mitmproxy.types
+
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
@@ -218,8 +220,8 @@ class ConsoleAddon:
self,
prompt: str,
choices: typing.Sequence[str],
- cmd: command.Cmd,
- *args: command.Arg
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
@@ -241,7 +243,7 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
- self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
+ self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
@@ -352,7 +354,7 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
- @command.argument("part", type=command.Choice("console.edit.focus.options"))
+ @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
def edit_focus(self, part: str) -> None:
"""
Edit a component of the currently focused flow.
@@ -404,14 +406,14 @@ class ConsoleAddon:
self._grideditor().cmd_delete()
@command.command("console.grideditor.load")
- def grideditor_load(self, path: command.Path) -> None:
+ def grideditor_load(self, path: mitmproxy.types.Path) -> None:
"""
Read a file into the currrent cell.
"""
self._grideditor().cmd_read_file(path)
@command.command("console.grideditor.load_escaped")
- def grideditor_load_escaped(self, path: command.Path) -> None:
+ def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None:
"""
Read a file containing a Python-style escaped string into the
currrent cell.
@@ -419,7 +421,7 @@ class ConsoleAddon:
self._grideditor().cmd_read_file_escaped(path)
@command.command("console.grideditor.save")
- def grideditor_save(self, path: command.Path) -> None:
+ def grideditor_save(self, path: mitmproxy.types.Path) -> None:
"""
Save data to file as a CSV.
"""
@@ -440,7 +442,7 @@ class ConsoleAddon:
self._grideditor().cmd_spawn_editor()
@command.command("console.flowview.mode.set")
- @command.argument("mode", type=command.Choice("console.flowview.mode.options"))
+ @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options"))
def flowview_mode_set(self, mode: str) -> None:
"""
Set the display mode for the current flow view.
@@ -498,8 +500,8 @@ class ConsoleAddon:
self,
contexts: typing.Sequence[str],
key: str,
- cmd: command.Cmd,
- *args: command.Arg
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.Arg
) -> None:
"""
Bind a shortcut key.
diff --git a/mitmproxy/types.py b/mitmproxy/types.py
new file mode 100644
index 00000000..e1b6f95d
--- /dev/null
+++ b/mitmproxy/types.py
@@ -0,0 +1,330 @@
+import os
+import glob
+import typing
+
+from mitmproxy import exceptions
+from mitmproxy import flow
+
+
+class Path(str):
+ pass
+
+
+class Cmd(str):
+ pass
+
+
+class Arg(str):
+ pass
+
+
+class CutSpec(typing.Sequence[str]):
+ pass
+
+
+class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]):
+ pass
+
+
+class Choice:
+ def __init__(self, options_command):
+ self.options_command = options_command
+
+ def __instancecheck__(self, instance): # pragma: no cover
+ # return false here so that arguments are piped through parsearg,
+ # which does extended validation.
+ return False
+
+
+# One of the many charming things about mypy is that introducing type
+# annotations can cause circular dependencies where there were none before.
+# Rather than putting types and the CommandManger in the same file, we introduce
+# a stub type with the signature we use.
+class _CommandStub:
+ commands = {} # type: typing.Mapping[str, typing.Any]
+
+ def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
+ pass
+
+ def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
+ pass
+
+
+class BaseType:
+ typ = object # type: typing.Type
+ display = "" # type: str
+
+ def completion(
+ self, manager: _CommandStub, t: type, s: str
+ ) -> typing.Sequence[str]: # pragma: no cover
+ pass
+
+ def parse(
+ self, manager: _CommandStub, t: type, s: str
+ ) -> typing.Any: # pragma: no cover
+ pass
+
+
+class Bool(BaseType):
+ typ = bool
+ display = "bool"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return ["false", "true"]
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> bool:
+ if s == "true":
+ return True
+ elif s == "false":
+ return False
+ else:
+ raise exceptions.TypeError(
+ "Booleans are 'true' or 'false', got %s" % s
+ )
+
+
+class Str(BaseType):
+ typ = str
+ display = "str"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> str:
+ return s
+
+
+class Int(BaseType):
+ typ = int
+ display = "int"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> int:
+ try:
+ return int(s)
+ except ValueError as e:
+ raise exceptions.TypeError from e
+
+
+class PathType(BaseType):
+ typ = Path
+ display = "path"
+
+ def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]:
+ if not start:
+ start = "./"
+ path = os.path.expanduser(start)
+ ret = []
+ if os.path.isdir(path):
+ files = glob.glob(os.path.join(path, "*"))
+ prefix = start
+ else:
+ files = glob.glob(path + "*")
+ prefix = os.path.dirname(start)
+ prefix = prefix or "./"
+ for f in files:
+ display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
+ if os.path.isdir(f):
+ display += "/"
+ ret.append(display)
+ if not ret:
+ ret = [start]
+ ret.sort()
+ return ret
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> str:
+ return s
+
+
+class CmdType(BaseType):
+ typ = Cmd
+ display = "cmd"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return list(manager.commands.keys())
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> str:
+ return s
+
+
+class ArgType(BaseType):
+ typ = Arg
+ display = "arg"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> str:
+ return s
+
+
+class StrSeq(BaseType):
+ typ = typing.Sequence[str]
+ display = "[str]"
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return []
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return [x.strip() for x in s.split(",")]
+
+
+class CutSpecType(BaseType):
+ typ = CutSpec
+ display = "[cut]"
+ valid_prefixes = [
+ "request.method",
+ "request.scheme",
+ "request.host",
+ "request.http_version",
+ "request.port",
+ "request.path",
+ "request.url",
+ "request.text",
+ "request.content",
+ "request.raw_content",
+ "request.timestamp_start",
+ "request.timestamp_end",
+ "request.header[",
+
+ "response.status_code",
+ "response.reason",
+ "response.text",
+ "response.content",
+ "response.timestamp_start",
+ "response.timestamp_end",
+ "response.raw_content",
+ "response.header[",
+
+ "client_conn.address.port",
+ "client_conn.address.host",
+ "client_conn.tls_version",
+ "client_conn.sni",
+ "client_conn.ssl_established",
+
+ "server_conn.address.port",
+ "server_conn.address.host",
+ "server_conn.ip_address.host",
+ "server_conn.tls_version",
+ "server_conn.sni",
+ "server_conn.ssl_established",
+ ]
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ spec = s.split(",")
+ opts = []
+ for pref in self.valid_prefixes:
+ spec[-1] = pref
+ opts.append(",".join(spec))
+ return opts
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec:
+ parts = s.split(",") # type: typing.Any
+ return parts
+
+
+class BaseFlowType(BaseType):
+ valid_prefixes = [
+ "@all",
+ "@focus",
+ "@shown",
+ "@hidden",
+ "@marked",
+ "@unmarked",
+ "~q",
+ "~s",
+ "~a",
+ "~hq",
+ "~hs",
+ "~b",
+ "~bq",
+ "~bs",
+ "~t",
+ "~d",
+ "~m",
+ "~u",
+ "~c",
+ ]
+
+ def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
+ return self.valid_prefixes
+
+
+class FlowType(BaseFlowType):
+ typ = flow.Flow
+ display = "flow"
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow:
+ flows = manager.call_args("view.resolve", [s])
+ if len(flows) != 1:
+ raise exceptions.TypeError(
+ "Command requires one flow, specification matched %s." % len(flows)
+ )
+ return flows[0]
+
+
+class FlowsType(BaseFlowType):
+ typ = typing.Sequence[flow.Flow]
+ display = "[flow]"
+
+ def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]:
+ return manager.call_args("view.resolve", [s])
+
+
+class DataType:
+ typ = Data
+ display = "[data]"
+
+ def completion(
+ self, manager: _CommandStub, t: type, s: str
+ ) -> typing.Sequence[str]: # pragma: no cover
+ raise exceptions.TypeError("data cannot be passed as argument")
+
+ def parse(
+ self, manager: _CommandStub, t: type, s: str
+ ) -> typing.Any: # pragma: no cover
+ raise exceptions.TypeError("data cannot be passed as argument")
+
+
+class ChoiceType:
+ typ = Choice
+ display = "choice"
+
+ def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]:
+ return manager.call(t.options_command)
+
+ def parse(self, manager: _CommandStub, t: Choice, s: str) -> str:
+ opts = manager.call(t.options_command)
+ if s not in opts:
+ raise exceptions.TypeError("Invalid choice.")
+ return s
+
+
+class TypeManager:
+ def __init__(self, *types):
+ self.typemap = {}
+ for t in types:
+ self.typemap[t.typ] = t()
+
+ def get(self, t: type, default=None) -> BaseType:
+ if type(t) in self.typemap:
+ return self.typemap[type(t)]
+ return self.typemap.get(t, default)
+
+
+CommandTypes = TypeManager(
+ ArgType,
+ Bool,
+ ChoiceType,
+ CmdType,
+ CutSpecType,
+ DataType,
+ FlowType,
+ FlowsType,
+ Int,
+ PathType,
+ Str,
+ StrSeq,
+)
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index 50ad3d55..f9315dd2 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -4,6 +4,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
+import mitmproxy.types
import io
import pytest
@@ -25,7 +26,7 @@ class TAddon:
return foo
@command.command("subcommand")
- def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str:
+ def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
return "ok"
@command.command("empty")
@@ -39,12 +40,12 @@ class TAddon:
def choices(self) -> typing.Sequence[str]:
return ["one", "two", "three"]
- @command.argument("arg", type=command.Choice("choices"))
+ @command.argument("arg", type=mitmproxy.types.Choice("choices"))
def choose(self, arg: str) -> typing.Sequence[str]:
return ["one", "two", "three"]
@command.command("path")
- def path(self, arg: command.Path) -> None:
+ def path(self, arg: mitmproxy.types.Path) -> None:
pass
@@ -79,45 +80,45 @@ class TestCommand:
[
"foo bar",
[
- command.ParseResult(value = "foo", type = command.Cmd),
+ command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
command.ParseResult(value = "bar", type = str)
],
],
[
"foo 'bar",
[
- command.ParseResult(value = "foo", type = command.Cmd),
+ command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
command.ParseResult(value = "'bar", type = str)
]
],
- ["a", [command.ParseResult(value = "a", type = command.Cmd)]],
- ["", [command.ParseResult(value = "", type = command.Cmd)]],
+ ["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]],
+ ["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]],
[
"cmd3 1",
[
- command.ParseResult(value = "cmd3", type = command.Cmd),
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "1", type = int),
]
],
[
"cmd3 ",
[
- command.ParseResult(value = "cmd3", type = command.Cmd),
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "", type = int),
]
],
[
"subcommand ",
[
- command.ParseResult(value = "subcommand", type = command.Cmd),
- command.ParseResult(value = "", type = command.Cmd),
+ command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
+ command.ParseResult(value = "", type = mitmproxy.types.Cmd),
]
],
[
"subcommand cmd3 ",
[
- command.ParseResult(value = "subcommand", type = command.Cmd),
- command.ParseResult(value = "cmd3", type = command.Cmd),
+ command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
+ command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "", type = int),
]
],
@@ -154,15 +155,15 @@ def test_typename():
assert command.typename(str) == "str"
assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
- assert command.typename(command.Cuts) == "[cuts]"
- assert command.typename(typing.Sequence[command.Cut]) == "[cut]"
+ assert command.typename(mitmproxy.types.Data) == "[data]"
+ assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
assert command.typename(flow.Flow) == "flow"
assert command.typename(typing.Sequence[str]) == "[str]"
- assert command.typename(command.Choice("foo")) == "choice"
- assert command.typename(command.Path) == "path"
- assert command.typename(command.Cmd) == "cmd"
+ assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
+ assert command.typename(mitmproxy.types.Path) == "path"
+ assert command.typename(mitmproxy.types.Cmd) == "cmd"
class DummyConsole:
@@ -172,7 +173,7 @@ class DummyConsole:
return [tflow.tflow(resp=True)] * n
@command.command("cut")
- def cut(self, spec: str) -> command.Cuts:
+ def cut(self, spec: str) -> mitmproxy.types.Data:
return [["test"]]
@@ -202,10 +203,6 @@ def test_parsearg():
command.parsearg(tctx.master.commands, "foo", Exception)
assert command.parsearg(
- tctx.master.commands, "foo", command.Cuts
- ) == [["test"]]
-
- assert command.parsearg(
tctx.master.commands, "foo", typing.Sequence[str]
) == ["foo"]
assert command.parsearg(
@@ -215,18 +212,18 @@ def test_parsearg():
a = TAddon()
tctx.master.commands.add("choices", a.choices)
assert command.parsearg(
- tctx.master.commands, "one", command.Choice("choices"),
+ tctx.master.commands, "one", mitmproxy.types.Choice("choices"),
) == "one"
with pytest.raises(exceptions.CommandError):
assert command.parsearg(
- tctx.master.commands, "invalid", command.Choice("choices"),
+ tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"),
)
assert command.parsearg(
- tctx.master.commands, "foo", command.Path
+ tctx.master.commands, "foo", mitmproxy.types.Path
) == "foo"
assert command.parsearg(
- tctx.master.commands, "foo", command.Cmd
+ tctx.master.commands, "foo", mitmproxy.types.Cmd
) == "foo"
@@ -272,5 +269,5 @@ def test_choice():
basic typechecking for choices should fail as we cannot verify if strings are a valid choice
at this point.
"""
- c = command.Choice("foo")
+ c = mitmproxy.types.Choice("foo")
assert not typecheck.check_command_type("foo", c)
diff --git a/test/mitmproxy/test_typemanager.py b/test/mitmproxy/test_typemanager.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/mitmproxy/test_typemanager.py
diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py
new file mode 100644
index 00000000..81aaed74
--- /dev/null
+++ b/test/mitmproxy/test_types.py
@@ -0,0 +1,175 @@
+import pytest
+import os
+import typing
+import contextlib
+
+from mitmproxy.test import tutils
+import mitmproxy.exceptions
+import mitmproxy.types
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+from mitmproxy import command
+from mitmproxy import flow
+
+from . import test_command
+
+
+@contextlib.contextmanager
+def chdir(path: str):
+ old_dir = os.getcwd()
+ os.chdir(path)
+ yield
+ os.chdir(old_dir)
+
+
+def test_bool():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.Bool()
+ assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"]
+ assert b.parse(tctx.master.commands, bool, "true") is True
+ assert b.parse(tctx.master.commands, bool, "false") is False
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, bool, "foo")
+
+
+def test_str():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.Str()
+ assert b.completion(tctx.master.commands, str, "") == []
+ assert b.parse(tctx.master.commands, str, "foo") == "foo"
+
+
+def test_int():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.Int()
+ assert b.completion(tctx.master.commands, int, "b") == []
+ assert b.parse(tctx.master.commands, int, "1") == 1
+ assert b.parse(tctx.master.commands, int, "999") == 999
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, int, "foo")
+
+
+def test_path():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.PathType()
+ assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
+ assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
+
+ def normPathOpts(prefix, match):
+ ret = []
+ for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match):
+ s = s[len(prefix):]
+ s = s.replace(os.sep, "/")
+ ret.append(s)
+ return ret
+
+ cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
+ assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
+ assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
+ with chdir(cd):
+ assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
+ assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
+ assert b.completion(
+ tctx.master.commands, mitmproxy.types.Path, "nonexistent"
+ ) == ["nonexistent"]
+
+
+def test_cmd():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(test_command.TAddon())
+ b = mitmproxy.types.CmdType()
+ assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo"
+ assert len(
+ b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
+ ) == len(tctx.master.commands.commands.keys())
+
+
+def test_cutspec():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.CutSpecType()
+ b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"]
+ assert b.completion(
+ tctx.master.commands, mitmproxy.types.CutSpec, "request.p"
+ ) == b.valid_prefixes
+ ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f")
+ assert ret[0].startswith("request.port,")
+ assert len(ret) == len(b.valid_prefixes)
+
+
+def test_arg():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.ArgType()
+ assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
+ assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
+
+
+def test_strseq():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.StrSeq()
+ assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
+ assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
+ assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
+
+
+class DummyConsole:
+ @command.command("view.resolve")
+ def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
+ n = int(spec)
+ return [tflow.tflow(resp=True)] * n
+
+ @command.command("cut")
+ def cut(self, spec: str) -> mitmproxy.types.Data:
+ return [["test"]]
+
+ @command.command("options")
+ def options(self) -> typing.Sequence[str]:
+ return ["one", "two", "three"]
+
+
+def test_flow():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types.FlowType()
+ assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes)
+ assert b.parse(tctx.master.commands, flow.Flow, "1")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ assert b.parse(tctx.master.commands, flow.Flow, "0")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ assert b.parse(tctx.master.commands, flow.Flow, "2")
+
+
+def test_flows():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types.FlowsType()
+ assert len(
+ b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "")
+ ) == len(b.valid_prefixes)
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
+ assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
+
+
+def test_data():
+ with taddons.context() as tctx:
+ b = mitmproxy.types.DataType()
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
+
+
+def test_choice():
+ with taddons.context() as tctx:
+ tctx.master.addons.add(DummyConsole())
+ b = mitmproxy.types.ChoiceType()
+ comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "")
+ assert comp == ["one", "two", "three"]
+ assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one"
+ with pytest.raises(mitmproxy.exceptions.TypeError):
+ b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid")
+
+
+def test_typemanager():
+ assert mitmproxy.types.CommandTypes.get(bool, None)
+ assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None)
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
index 823af06d..34062dcb 100644
--- a/test/mitmproxy/tools/console/test_commander.py
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -1,36 +1,6 @@
-import os
-import contextlib
from mitmproxy.tools.console.commander import commander
from mitmproxy.test import taddons
-from mitmproxy.test import tutils
-
-
-@contextlib.contextmanager
-def chdir(path: str):
- old_dir = os.getcwd()
- os.chdir(path)
- yield
- os.chdir(old_dir)
-
-
-def normPathOpts(prefix, match):
- ret = []
- for s in commander.pathOptions(match):
- s = s[len(prefix):]
- s = s.replace(os.sep, "/")
- ret.append(s)
- return ret
-
-
-def test_pathOptions():
- cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
- assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
- assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
- with chdir(cd):
- assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
- assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
- assert commander.pathOptions("nonexistent") == ["nonexistent"]
class TestListCompleter:
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index 66b1884e..365509f1 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -4,7 +4,6 @@ from unittest import mock
import pytest
from mitmproxy.utils import typecheck
-from mitmproxy import command
class TBase:
@@ -95,9 +94,6 @@ def test_check_command_type():
assert(typecheck.check_command_type(None, None))
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
- assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
- assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
- assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()