aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/clientplayback.py2
-rw-r--r--mitmproxy/addons/serverplayback.py2
-rw-r--r--mitmproxy/command.py62
-rw-r--r--mitmproxy/tools/console/commandeditor.py9
-rw-r--r--mitmproxy/tools/console/commander/__init__.py0
-rw-r--r--mitmproxy/tools/console/commander/commander.py158
-rw-r--r--mitmproxy/tools/console/consoleaddons.py20
-rw-r--r--mitmproxy/tools/console/statusbar.py5
-rw-r--r--test/mitmproxy/test_command.py48
-rw-r--r--test/mitmproxy/tools/console/test_commander.py68
10 files changed, 355 insertions, 19 deletions
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py
index 9e012b67..fcc3209b 100644
--- a/mitmproxy/addons/clientplayback.py
+++ b/mitmproxy/addons/clientplayback.py
@@ -37,7 +37,7 @@ class ClientPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.client.file")
- def load_file(self, path: str) -> None:
+ def load_file(self, path: command.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 927f6e15..46968a8d 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -31,7 +31,7 @@ class ServerPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.server.file")
- def load_file(self, path: str) -> None:
+ def load_file(self, path: command.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index c4821973..087f7770 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -3,6 +3,7 @@
"""
import inspect
import types
+import io
import typing
import shlex
import textwrap
@@ -14,6 +15,15 @@ from mitmproxy import exceptions
from mitmproxy import flow
+def lexer(s):
+ # mypy mis-identifies shlex.shlex as abstract
+ lex = shlex.shlex(s) # type: ignore
+ lex.wordchars += "."
+ lex.whitespace_split = True
+ lex.commenters = ''
+ return lex
+
+
Cuts = typing.Sequence[
typing.Sequence[typing.Union[str, bytes]]
]
@@ -23,6 +33,14 @@ class Path(str):
pass
+class Cmd(str):
+ pass
+
+
+class Arg(str):
+ pass
+
+
def typename(t: type, ret: bool) -> str:
"""
Translates a type to an explanatory string. If ret is True, we're
@@ -118,6 +136,12 @@ class Command:
return ret
+ParseResult = typing.NamedTuple(
+ "ParseResult",
+ [("value", str), ("type", typing.Type)],
+)
+
+
class CommandManager:
def __init__(self, master):
self.master = master
@@ -133,6 +157,42 @@ class CommandManager:
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
+ def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]:
+ """
+ Parse a possibly partial command. Return a sequence of (part, type) tuples.
+ """
+ buf = io.StringIO(cmdstr)
+ parts = [] # type: typing.List[str]
+ lex = lexer(buf)
+ while 1:
+ remainder = cmdstr[buf.tell():]
+ try:
+ t = lex.get_token()
+ except ValueError:
+ parts.append(remainder)
+ break
+ if not t:
+ break
+ parts.append(t)
+ if not parts:
+ parts = [""]
+ elif cmdstr.endswith(" "):
+ parts.append("")
+
+ parse = [] # type: typing.List[ParseResult]
+ params = [] # type: typing.List[type]
+ for i in range(len(parts)):
+ if i == 0:
+ params[:] = [Cmd]
+ if parts[i] in self.commands:
+ params.extend(self.commands[parts[i]].paramtypes)
+ if params:
+ typ = params.pop(0)
+ else:
+ typ = str
+ parse.append(ParseResult(value=parts[i], type=typ))
+ return parse
+
def call_args(self, path, args):
"""
Call a command using a list of string arguments. May raise CommandError.
@@ -145,7 +205,7 @@ class CommandManager:
"""
Call a command using a string. May raise CommandError.
"""
- parts = shlex.split(cmdstr)
+ parts = list(lexer(cmdstr))
if not len(parts) >= 1:
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
return self.call_args(parts[0], parts[1:])
diff --git a/mitmproxy/tools/console/commandeditor.py b/mitmproxy/tools/console/commandeditor.py
index 17d1506b..e57ddbb4 100644
--- a/mitmproxy/tools/console/commandeditor.py
+++ b/mitmproxy/tools/console/commandeditor.py
@@ -1,19 +1,10 @@
import typing
-import urwid
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy.tools.console import signals
-class CommandEdit(urwid.Edit):
- def __init__(self, partial):
- urwid.Edit.__init__(self, ":", partial)
-
- def keypress(self, size, key):
- return urwid.Edit.keypress(self, size, key)
-
-
class CommandExecutor:
def __init__(self, master):
self.master = master
diff --git a/mitmproxy/tools/console/commander/__init__.py b/mitmproxy/tools/console/commander/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/mitmproxy/tools/console/commander/__init__.py
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
new file mode 100644
index 00000000..dbbc8ff2
--- /dev/null
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -0,0 +1,158 @@
+import urwid
+from urwid.text_layout import calc_coords
+import typing
+import abc
+
+import mitmproxy.master
+import mitmproxy.command
+
+
+class Completer:
+ @abc.abstractmethod
+ def cycle(self) -> str:
+ pass
+
+
+class ListCompleter(Completer):
+ def __init__(
+ self,
+ start: str,
+ options: typing.Sequence[str],
+ ) -> None:
+ self.start = start
+ self.options = [] # type: typing.Sequence[str]
+ for o in options:
+ if o.startswith(start):
+ self.options.append(o)
+ self.offset = 0
+
+ def cycle(self) -> str:
+ if not self.options:
+ return self.start
+ ret = self.options[self.offset]
+ self.offset = (self.offset + 1) % len(self.options)
+ return ret
+
+
+CompletionState = typing.NamedTuple(
+ "CompletionState",
+ [
+ ("completer", Completer),
+ ("parse", typing.Sequence[mitmproxy.command.ParseResult])
+ ]
+)
+
+
+class CommandBuffer():
+ def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
+ self.master = master
+ self.buf = start
+ # Cursor is always within the range [0:len(buffer)].
+ self._cursor = len(self.buf)
+ self.completion = None # type: CompletionState
+
+ @property
+ def cursor(self) -> int:
+ return self._cursor
+
+ @cursor.setter
+ def cursor(self, x) -> None:
+ if x < 0:
+ self._cursor = 0
+ elif x > len(self.buf):
+ self._cursor = len(self.buf)
+ else:
+ self._cursor = x
+
+ def render(self):
+ return self.buf
+
+ def left(self) -> None:
+ self.cursor = self.cursor - 1
+
+ def right(self) -> None:
+ self.cursor = self.cursor + 1
+
+ def cycle_completion(self) -> None:
+ if not self.completion:
+ parts = self.master.commands.parse_partial(self.buf[:self.cursor])
+ last = parts[-1]
+ if last.type == mitmproxy.command.Cmd:
+ self.completion = CompletionState(
+ completer = ListCompleter(
+ parts[-1].value,
+ self.master.commands.commands.keys(),
+ ),
+ parse = parts,
+ )
+ elif isinstance(last.type, mitmproxy.command.Choice):
+ self.completion = CompletionState(
+ completer = ListCompleter(
+ parts[-1].value,
+ self.master.commands.call(last.type.options_command),
+ ),
+ parse = parts,
+ )
+ if self.completion:
+ nxt = self.completion.completer.cycle()
+ buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
+ buf = buf.strip()
+ self.buf = buf
+ self.cursor = len(self.buf)
+
+ def backspace(self) -> None:
+ if self.cursor == 0:
+ return
+ self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:]
+ self.cursor = self.cursor - 1
+ self.completion = None
+
+ def insert(self, k: str) -> None:
+ """
+ Inserts text at the cursor.
+ """
+ self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:]
+ self.cursor += 1
+ self.completion = None
+
+
+class CommandEdit(urwid.WidgetWrap):
+ leader = ": "
+
+ def __init__(self, master: mitmproxy.master.Master, text: str) -> None:
+ self.master = master
+ self.cbuf = CommandBuffer(master, text)
+ self._w = urwid.Text(self.leader)
+ self.update()
+
+ def keypress(self, size, key):
+ if key == "backspace":
+ self.cbuf.backspace()
+ elif key == "left":
+ self.cbuf.left()
+ elif key == "right":
+ self.cbuf.right()
+ elif key == "tab":
+ self.cbuf.cycle_completion()
+ elif len(key) == 1:
+ self.cbuf.insert(key)
+ self.update()
+
+ def update(self):
+ self._w.set_text([self.leader, self.cbuf.render()])
+
+ def render(self, size, focus=False):
+ (maxcol,) = size
+ canv = self._w.render((maxcol,))
+ canv = urwid.CompositeCanvas(canv)
+ canv.cursor = self.get_cursor_coords((maxcol,))
+ return canv
+
+ def get_cursor_coords(self, size):
+ p = self.cbuf.cursor + len(self.leader)
+ trans = self._w.get_line_translation(size[0])
+ x, y = calc_coords(self._w.get_text()[0], trans, p)
+ return x, y
+
+ def get_value(self):
+ return self.cbuf.buf
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index a10efa51..87f794c2 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -214,7 +214,11 @@ class ConsoleAddon:
@command.command("console.choose")
def console_choose(
- self, prompt: str, choices: typing.Sequence[str], *cmd: str
+ self,
+ prompt: str,
+ choices: typing.Sequence[str],
+ cmd: command.Cmd,
+ *args: command.Arg
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
@@ -223,7 +227,7 @@ class ConsoleAddon:
"""
def callback(opt):
# We're now outside of the call context...
- repl = " ".join(cmd)
+ repl = cmd + " " + " ".join(args)
repl = repl.replace("{choice}", opt)
try:
self.master.commands.call(repl)
@@ -236,7 +240,7 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
- self, prompt: str, choicecmd: str, *cmd: str
+ self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
@@ -475,14 +479,20 @@ class ConsoleAddon:
return list(sorted(keymap.Contexts))
@command.command("console.key.bind")
- def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None:
+ def key_bind(
+ self,
+ contexts: typing.Sequence[str],
+ key: str,
+ cmd: command.Cmd,
+ *args: command.Arg
+ ) -> None:
"""
Bind a shortcut key.
"""
try:
self.master.keymap.add(
key,
- " ".join(command),
+ cmd + " " + " ".join(args),
contexts,
""
)
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 795b3d8a..6a1f07a9 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -6,6 +6,7 @@ from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import commandeditor
import mitmproxy.tools.console.master # noqa
+from mitmproxy.tools.console.commander import commander
class PromptPath:
@@ -66,7 +67,7 @@ class ActionBar(urwid.WidgetWrap):
def sig_prompt_command(self, sender, partial=""):
signals.focus.send(self, section="footer")
- self._w = commandeditor.CommandEdit(partial)
+ self._w = commander.CommandEdit(self.master, partial)
self.prompting = commandeditor.CommandExecutor(self.master)
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
@@ -100,7 +101,7 @@ class ActionBar(urwid.WidgetWrap):
elif k in self.onekey:
self.prompt_execute(k)
elif k == "enter":
- self.prompt_execute(self._w.get_edit_text())
+ self.prompt_execute(self._w.get_value())
else:
if common.is_keypress(k):
self._w.keypress(size, k)
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index e1879ba2..76ce2245 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -11,19 +11,24 @@ from mitmproxy.utils import typecheck
class TAddon:
+ @command.command("cmd1")
def cmd1(self, foo: str) -> str:
"""cmd1 help"""
return "ret " + foo
+ @command.command("cmd2")
def cmd2(self, foo: str) -> str:
return 99
+ @command.command("cmd3")
def cmd3(self, foo: int) -> int:
return foo
+ @command.command("empty")
def empty(self) -> None:
pass
+ @command.command("varargs")
def varargs(self, one: str, *var: str) -> typing.Sequence[str]:
return list(var)
@@ -34,6 +39,7 @@ class TAddon:
def choose(self, arg: str) -> typing.Sequence[str]:
return ["one", "two", "three"]
+ @command.command("path")
def path(self, arg: command.Path) -> None:
pass
@@ -64,6 +70,44 @@ class TestCommand:
c = command.Command(cm, "cmd.three", a.cmd3)
assert c.call(["1"]) == 1
+ def test_parse_partial(self):
+ tests = [
+ [
+ "foo bar",
+ [
+ command.ParseResult(value = "foo", type = command.Cmd),
+ command.ParseResult(value = "bar", type = str)
+ ],
+ ],
+ [
+ "foo 'bar",
+ [
+ command.ParseResult(value = "foo", type = command.Cmd),
+ command.ParseResult(value = "'bar", type = str)
+ ]
+ ],
+ ["a", [command.ParseResult(value = "a", type = command.Cmd)]],
+ ["", [command.ParseResult(value = "", type = command.Cmd)]],
+ [
+ "cmd3 1",
+ [
+ command.ParseResult(value = "cmd3", type = command.Cmd),
+ command.ParseResult(value = "1", type = int),
+ ]
+ ],
+ [
+ "cmd3 ",
+ [
+ command.ParseResult(value = "cmd3", type = command.Cmd),
+ command.ParseResult(value = "", type = int),
+ ]
+ ],
+ ]
+ with taddons.context() as tctx:
+ tctx.master.addons.add(TAddon())
+ for s, expected in tests:
+ assert tctx.master.commands.parse_partial(s) == expected
+
def test_simple():
with taddons.context() as tctx:
@@ -100,6 +144,7 @@ def test_typename():
assert command.typename(command.Choice("foo"), False) == "choice"
assert command.typename(command.Path, False) == "path"
+ assert command.typename(command.Cmd, False) == "cmd"
class DummyConsole:
@@ -162,6 +207,9 @@ def test_parsearg():
assert command.parsearg(
tctx.master.commands, "foo", command.Path
) == "foo"
+ assert command.parsearg(
+ tctx.master.commands, "foo", command.Cmd
+ ) == "foo"
class TDec:
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
new file mode 100644
index 00000000..1ac4c5c6
--- /dev/null
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -0,0 +1,68 @@
+from mitmproxy.tools.console.commander import commander
+from mitmproxy.test import taddons
+
+
+class TestListCompleter:
+ def test_cycle(self):
+ tests = [
+ [
+ "",
+ ["a", "b", "c"],
+ ["a", "b", "c", "a"]
+ ],
+ [
+ "xxx",
+ ["a", "b", "c"],
+ ["xxx", "xxx", "xxx"]
+ ],
+ [
+ "b",
+ ["a", "b", "ba", "bb", "c"],
+ ["b", "ba", "bb", "b"]
+ ],
+ ]
+ for start, options, cycle in tests:
+ c = commander.ListCompleter(start, options)
+ for expected in cycle:
+ assert c.cycle() == expected
+
+
+class TestCommandBuffer:
+
+ def test_backspace(self):
+ tests = [
+ [("", 0), ("", 0)],
+ [("1", 0), ("1", 0)],
+ [("1", 1), ("", 0)],
+ [("123", 3), ("12", 2)],
+ [("123", 2), ("13", 1)],
+ [("123", 0), ("123", 0)],
+ ]
+ with taddons.context() as tctx:
+ for start, output in tests:
+ cb = commander.CommandBuffer(tctx.master)
+ cb.buf, cb.cursor = start[0], start[1]
+ cb.backspace()
+ assert cb.buf == output[0]
+ assert cb.cursor == output[1]
+
+ def test_insert(self):
+ tests = [
+ [("", 0), ("x", 1)],
+ [("a", 0), ("xa", 1)],
+ [("xa", 2), ("xax", 3)],
+ ]
+ with taddons.context() as tctx:
+ for start, output in tests:
+ cb = commander.CommandBuffer(tctx.master)
+ cb.buf, cb.cursor = start[0], start[1]
+ cb.insert("x")
+ assert cb.buf == output[0]
+ assert cb.cursor == output[1]
+
+ def test_cycle_completion(self):
+ with taddons.context() as tctx:
+ cb = commander.CommandBuffer(tctx.master)
+ cb.buf = "foo bar"
+ cb.cursor = len(cb.buf)
+ cb.cycle_completion()