aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/builtins/replace.py4
-rw-r--r--mitmproxy/console/grideditor.py719
-rw-r--r--mitmproxy/console/grideditor/__init__.py2
-rw-r--r--mitmproxy/console/grideditor/base.py427
-rw-r--r--mitmproxy/console/grideditor/col_bytes.py103
-rw-r--r--mitmproxy/console/grideditor/col_subgrid.py51
-rw-r--r--mitmproxy/console/grideditor/col_text.py55
-rw-r--r--mitmproxy/console/grideditor/editors.py239
-rw-r--r--mitmproxy/console/master.py14
9 files changed, 886 insertions, 728 deletions
diff --git a/mitmproxy/builtins/replace.py b/mitmproxy/builtins/replace.py
index 74d30c05..2c94fbb5 100644
--- a/mitmproxy/builtins/replace.py
+++ b/mitmproxy/builtins/replace.py
@@ -13,8 +13,8 @@ class Replace:
.replacements is a list of tuples (fpat, rex, s):
fpatt: a string specifying a filter pattern.
- rex: a regular expression.
- s: the replacement string
+ rex: a regular expression, as bytes.
+ s: the replacement string, as bytes
"""
lst = []
for fpatt, rex, s in options.replacements:
diff --git a/mitmproxy/console/grideditor.py b/mitmproxy/console/grideditor.py
deleted file mode 100644
index 87700fd7..00000000
--- a/mitmproxy/console/grideditor.py
+++ /dev/null
@@ -1,719 +0,0 @@
-from __future__ import absolute_import, print_function, division
-
-import copy
-import os
-import re
-
-import urwid
-
-from mitmproxy import exceptions
-from mitmproxy import filt
-from mitmproxy.builtins import script
-from mitmproxy.console import common
-from mitmproxy.console import signals
-from netlib import strutils
-from netlib.http import cookies
-from netlib.http import user_agents
-
-FOOTER = [
- ('heading_key', "enter"), ":edit ",
- ('heading_key', "q"), ":back ",
-]
-FOOTER_EDITING = [
- ('heading_key', "esc"), ":stop editing ",
-]
-
-
-class TextColumn:
- subeditor = None
-
- def __init__(self, heading):
- self.heading = heading
-
- def text(self, obj):
- return SEscaped(obj or "")
-
- def blank(self):
- return ""
-
- def keypress(self, key, editor):
- if key == "r":
- if editor.walker.get_current_value() is not None:
- signals.status_prompt_path.send(
- self,
- prompt = "Read file",
- callback = editor.read_file
- )
- elif key == "R":
- if editor.walker.get_current_value() is not None:
- signals.status_prompt_path.send(
- editor,
- prompt = "Read unescaped file",
- callback = editor.read_file,
- args = (True,)
- )
- elif key == "e":
- o = editor.walker.get_current_value()
- if o is not None:
- n = editor.master.spawn_editor(o.encode("string-escape"))
- n = strutils.clean_hanging_newline(n)
- editor.walker.set_current_value(n, False)
- editor.walker._modified()
- elif key in ["enter"]:
- editor.walker.start_edit()
- else:
- return key
-
-
-class SubgridColumn:
-
- def __init__(self, heading, subeditor):
- self.heading = heading
- self.subeditor = subeditor
-
- def text(self, obj):
- p = cookies._format_pairs(obj, sep="\n")
- return urwid.Text(p)
-
- def blank(self):
- return []
-
- def keypress(self, key, editor):
- if key in "rRe":
- signals.status_message.send(
- self,
- message = "Press enter to edit this field.",
- expire = 1000
- )
- return
- elif key in ["enter"]:
- editor.master.view_grideditor(
- self.subeditor(
- editor.master,
- editor.walker.get_current_value(),
- editor.set_subeditor_value,
- editor.walker.focus,
- editor.walker.focus_col
- )
- )
- else:
- return key
-
-
-class SEscaped(urwid.WidgetWrap):
-
- def __init__(self, txt):
- txt = txt.encode("string-escape")
- w = urwid.Text(txt, wrap="any")
- urwid.WidgetWrap.__init__(self, w)
-
- def get_text(self):
- return self._w.get_text()[0]
-
- def keypress(self, size, key):
- return key
-
- def selectable(self):
- return True
-
-
-class SEdit(urwid.WidgetWrap):
-
- def __init__(self, txt):
- txt = txt.encode("string-escape")
- w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
- w = urwid.AttrWrap(w, "editfield")
- urwid.WidgetWrap.__init__(self, w)
-
- def get_text(self):
- return self._w.get_text()[0].strip()
-
- def selectable(self):
- return True
-
-
-class GridRow(urwid.WidgetWrap):
-
- def __init__(self, focused, editing, editor, values):
- self.focused, self.editing, self.editor = focused, editing, editor
-
- errors = values[1]
- self.fields = []
- for i, v in enumerate(values[0]):
- if focused == i and editing:
- self.editing = SEdit(v)
- self.fields.append(self.editing)
- else:
- w = self.editor.columns[i].text(v)
- if focused == i:
- if i in errors:
- w = urwid.AttrWrap(w, "focusfield_error")
- else:
- w = urwid.AttrWrap(w, "focusfield")
- elif i in errors:
- w = urwid.AttrWrap(w, "field_error")
- self.fields.append(w)
-
- fspecs = self.fields[:]
- if len(self.fields) > 1:
- fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
- w = urwid.Columns(
- fspecs,
- dividechars = 2
- )
- if focused is not None:
- w.set_focus_column(focused)
- urwid.WidgetWrap.__init__(self, w)
-
- def get_edit_value(self):
- return self.editing.get_text()
-
- def keypress(self, s, k):
- if self.editing:
- w = self._w.column_widths(s)[self.focused]
- k = self.editing.keypress((w,), k)
- return k
-
- def selectable(self):
- return True
-
-
-class GridWalker(urwid.ListWalker):
-
- """
- Stores rows as a list of (rows, errors) tuples, where rows is a list
- and errors is a set with an entry of each offset in rows that is an
- error.
- """
-
- def __init__(self, lst, editor):
- self.lst = [(i, set([])) for i in lst]
- self.editor = editor
- self.focus = 0
- self.focus_col = 0
- self.editing = False
-
- def _modified(self):
- self.editor.show_empty_msg()
- return urwid.ListWalker._modified(self)
-
- def add_value(self, lst):
- self.lst.append((lst[:], set([])))
- self._modified()
-
- def get_current_value(self):
- if self.lst:
- return self.lst[self.focus][0][self.focus_col]
-
- def set_current_value(self, val, unescaped):
- if not unescaped:
- try:
- val = val.decode("string-escape")
- except ValueError:
- signals.status_message.send(
- self,
- message = "Invalid Python-style string encoding.",
- expire = 1000
- )
- return
- errors = self.lst[self.focus][1]
- emsg = self.editor.is_error(self.focus_col, val)
- if emsg:
- signals.status_message.send(message = emsg, expire = 1)
- errors.add(self.focus_col)
- else:
- errors.discard(self.focus_col)
- self.set_value(val, self.focus, self.focus_col, errors)
-
- def set_value(self, val, focus, focus_col, errors=None):
- if not errors:
- errors = set([])
- row = list(self.lst[focus][0])
- row[focus_col] = val
- self.lst[focus] = [tuple(row), errors]
- self._modified()
-
- def delete_focus(self):
- if self.lst:
- del self.lst[self.focus]
- self.focus = min(len(self.lst) - 1, self.focus)
- self._modified()
-
- def _insert(self, pos):
- self.focus = pos
- self.lst.insert(
- self.focus,
- [
- [c.blank() for c in self.editor.columns], set([])
- ]
- )
- self.focus_col = 0
- self.start_edit()
-
- def insert(self):
- return self._insert(self.focus)
-
- def add(self):
- return self._insert(min(self.focus + 1, len(self.lst)))
-
- def start_edit(self):
- col = self.editor.columns[self.focus_col]
- if self.lst and not col.subeditor:
- self.editing = GridRow(
- self.focus_col, True, self.editor, self.lst[self.focus]
- )
- self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
- self._modified()
-
- def stop_edit(self):
- if self.editing:
- self.editor.master.loop.widget.footer.update(FOOTER)
- self.set_current_value(self.editing.get_edit_value(), False)
- self.editing = False
- self._modified()
-
- def left(self):
- self.focus_col = max(self.focus_col - 1, 0)
- self._modified()
-
- def right(self):
- self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
- self._modified()
-
- def tab_next(self):
- self.stop_edit()
- if self.focus_col < len(self.editor.columns) - 1:
- self.focus_col += 1
- elif self.focus != len(self.lst) - 1:
- self.focus_col = 0
- self.focus += 1
- self._modified()
-
- def get_focus(self):
- if self.editing:
- return self.editing, self.focus
- elif self.lst:
- return GridRow(
- self.focus_col,
- False,
- self.editor,
- self.lst[self.focus]
- ), self.focus
- else:
- return None, None
-
- def set_focus(self, focus):
- self.stop_edit()
- self.focus = focus
- self._modified()
-
- def get_next(self, pos):
- if pos + 1 >= len(self.lst):
- return None, None
- return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
-
- def get_prev(self, pos):
- if pos - 1 < 0:
- return None, None
- return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
-
-
-class GridListBox(urwid.ListBox):
-
- def __init__(self, lw):
- urwid.ListBox.__init__(self, lw)
-
-
-FIRST_WIDTH_MAX = 40
-FIRST_WIDTH_MIN = 20
-
-
-class GridEditor(urwid.WidgetWrap):
- title = None
- columns = None
-
- def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
- value = self.data_in(copy.deepcopy(value))
- self.master, self.value, self.callback = master, value, callback
- self.cb_args, self.cb_kwargs = cb_args, cb_kwargs
-
- first_width = 20
- if value:
- for r in value:
- assert len(r) == len(self.columns)
- first_width = max(len(r), first_width)
- self.first_width = min(first_width, FIRST_WIDTH_MAX)
-
- title = urwid.Text(self.title)
- title = urwid.Padding(title, align="left", width=("relative", 100))
- title = urwid.AttrWrap(title, "heading")
-
- headings = []
- for i, col in enumerate(self.columns):
- c = urwid.Text(col.heading)
- if i == 0 and len(self.columns) > 1:
- headings.append(("fixed", first_width + 2, c))
- else:
- headings.append(c)
- h = urwid.Columns(
- headings,
- dividechars = 2
- )
- h = urwid.AttrWrap(h, "heading")
-
- self.walker = GridWalker(self.value, self)
- self.lb = GridListBox(self.walker)
- self._w = urwid.Frame(
- self.lb,
- header = urwid.Pile([title, h])
- )
- self.master.loop.widget.footer.update("")
- self.show_empty_msg()
-
- def show_empty_msg(self):
- if self.walker.lst:
- self._w.set_footer(None)
- else:
- self._w.set_footer(
- urwid.Text(
- [
- ("highlight", "No values. Press "),
- ("key", "a"),
- ("highlight", " to add some."),
- ]
- )
- )
-
- def encode(self, s):
- if not self.encoding:
- return s
- try:
- return s.encode(self.encoding)
- except ValueError:
- return None
-
- def read_file(self, p, unescaped=False):
- if p:
- try:
- p = os.path.expanduser(p)
- d = open(p, "rb").read()
- self.walker.set_current_value(d, unescaped)
- self.walker._modified()
- except IOError as v:
- return str(v)
-
- def set_subeditor_value(self, val, focus, focus_col):
- self.walker.set_value(val, focus, focus_col)
-
- def keypress(self, size, key):
- if self.walker.editing:
- if key in ["esc"]:
- self.walker.stop_edit()
- elif key == "tab":
- pf, pfc = self.walker.focus, self.walker.focus_col
- self.walker.tab_next()
- if self.walker.focus == pf and self.walker.focus_col != pfc:
- self.walker.start_edit()
- else:
- self._w.keypress(size, key)
- return None
-
- key = common.shortcuts(key)
- column = self.columns[self.walker.focus_col]
- if key in ["q", "esc"]:
- res = []
- for i in self.walker.lst:
- if not i[1] and any([x for x in i[0]]):
- res.append(i[0])
- self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
- signals.pop_view_state.send(self)
- elif key == "g":
- self.walker.set_focus(0)
- elif key == "G":
- self.walker.set_focus(len(self.walker.lst) - 1)
- elif key in ["h", "left"]:
- self.walker.left()
- elif key in ["l", "right"]:
- self.walker.right()
- elif key == "tab":
- self.walker.tab_next()
- elif key == "a":
- self.walker.add()
- elif key == "A":
- self.walker.insert()
- elif key == "d":
- self.walker.delete_focus()
- elif column.keypress(key, self) and not self.handle_key(key):
- return self._w.keypress(size, key)
-
- def data_out(self, data):
- """
- Called on raw list data, before data is returned through the
- callback.
- """
- return data
-
- def data_in(self, data):
- """
- Called to prepare provided data.
- """
- return data
-
- def is_error(self, col, val):
- """
- Return False, or a string error message.
- """
- return False
-
- def handle_key(self, key):
- return False
-
- def make_help(self):
- text = []
- text.append(urwid.Text([("text", "Editor control:\n")]))
- keys = [
- ("A", "insert row before cursor"),
- ("a", "add row after cursor"),
- ("d", "delete row"),
- ("e", "spawn external editor on current field"),
- ("q", "save changes and exit editor"),
- ("r", "read value from file"),
- ("R", "read unescaped value from file"),
- ("esc", "save changes and exit editor"),
- ("tab", "next field"),
- ("enter", "edit field"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(
- urwid.Text(
- [
- "\n",
- ("text", "Values are escaped Python-style strings.\n"),
- ]
- )
- )
- return text
-
-
-class QueryEditor(GridEditor):
- title = "Editing query"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
-
-class HeaderEditor(GridEditor):
- title = "Editing headers"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
- def make_help(self):
- h = GridEditor.make_help(self)
- text = []
- text.append(urwid.Text([("text", "Special keys:\n")]))
- keys = [
- ("U", "add User-Agent header"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(urwid.Text([("text", "\n")]))
- text.extend(h)
- return text
-
- def set_user_agent(self, k):
- ua = user_agents.get_by_shortcut(k)
- if ua:
- self.walker.add_value(
- [
- "User-Agent",
- ua[2]
- ]
- )
-
- def handle_key(self, key):
- if key == "U":
- signals.status_prompt_onekey.send(
- prompt = "Add User-Agent header:",
- keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
- callback = self.set_user_agent,
- )
- return True
-
-
-class URLEncodedFormEditor(GridEditor):
- title = "Editing URL-encoded form"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
-
-class ReplaceEditor(GridEditor):
- title = "Editing replacement patterns"
- columns = [
- TextColumn("Filter"),
- TextColumn("Regex"),
- TextColumn("Replacement"),
- ]
-
- def is_error(self, col, val):
- if col == 0:
- if not filt.parse(val):
- return "Invalid filter specification."
- elif col == 1:
- try:
- re.compile(val)
- except re.error:
- return "Invalid regular expression."
- return False
-
-
-class SetHeadersEditor(GridEditor):
- title = "Editing header set patterns"
- columns = [
- TextColumn("Filter"),
- TextColumn("Header"),
- TextColumn("Value"),
- ]
-
- def is_error(self, col, val):
- if col == 0:
- if not filt.parse(val):
- return "Invalid filter specification"
- return False
-
- def make_help(self):
- h = GridEditor.make_help(self)
- text = []
- text.append(urwid.Text([("text", "Special keys:\n")]))
- keys = [
- ("U", "add User-Agent header"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(urwid.Text([("text", "\n")]))
- text.extend(h)
- return text
-
- def set_user_agent(self, k):
- ua = user_agents.get_by_shortcut(k)
- if ua:
- self.walker.add_value(
- [
- ".*",
- "User-Agent",
- ua[2]
- ]
- )
-
- def handle_key(self, key):
- if key == "U":
- signals.status_prompt_onekey.send(
- prompt = "Add User-Agent header:",
- keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
- callback = self.set_user_agent,
- )
- return True
-
-
-class PathEditor(GridEditor):
- title = "Editing URL path components"
- columns = [
- TextColumn("Component"),
- ]
-
- def data_in(self, data):
- return [[i] for i in data]
-
- def data_out(self, data):
- return [i[0] for i in data]
-
-
-class ScriptEditor(GridEditor):
- title = "Editing scripts"
- columns = [
- TextColumn("Command"),
- ]
-
- def is_error(self, col, val):
- try:
- script.parse_command(val)
- except exceptions.AddonError as e:
- return str(e)
-
-
-class HostPatternEditor(GridEditor):
- title = "Editing host patterns"
- columns = [
- TextColumn("Regex (matched on hostname:port / ip:port)")
- ]
-
- def is_error(self, col, val):
- try:
- re.compile(val, re.IGNORECASE)
- except re.error as e:
- return "Invalid regex: %s" % str(e)
-
- def data_in(self, data):
- return [[i] for i in data]
-
- def data_out(self, data):
- return [i[0] for i in data]
-
-
-class CookieEditor(GridEditor):
- title = "Editing request Cookie header"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- ]
-
-
-class CookieAttributeEditor(GridEditor):
- title = "Editing Set-Cookie attributes"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- ]
-
- def data_out(self, data):
- ret = []
- for i in data:
- if not i[1]:
- ret.append([i[0], None])
- else:
- ret.append(i)
- return ret
-
-
-class SetCookieEditor(GridEditor):
- title = "Editing response SetCookie header"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- SubgridColumn("Attributes", CookieAttributeEditor),
- ]
-
- def data_in(self, data):
- flattened = []
- for key, (value, attrs) in data:
- flattened.append([key, value, attrs.items(multi=True)])
- return flattened
-
- def data_out(self, data):
- vals = []
- for key, value, attrs in data:
- vals.append(
- [
- key,
- (value, attrs)
- ]
- )
- return vals
diff --git a/mitmproxy/console/grideditor/__init__.py b/mitmproxy/console/grideditor/__init__.py
new file mode 100644
index 00000000..894f3d22
--- /dev/null
+++ b/mitmproxy/console/grideditor/__init__.py
@@ -0,0 +1,2 @@
+from .editors import * # noqa
+from . import base # noqa
diff --git a/mitmproxy/console/grideditor/base.py b/mitmproxy/console/grideditor/base.py
new file mode 100644
index 00000000..68a0f86a
--- /dev/null
+++ b/mitmproxy/console/grideditor/base.py
@@ -0,0 +1,427 @@
+from __future__ import absolute_import, print_function, division
+import abc
+import copy
+
+import six
+import urwid
+from mitmproxy.console import common
+from mitmproxy.console import signals
+
+from typing import Any # noqa
+from typing import Callable # noqa
+from typing import Container # noqa
+from typing import Iterable # noqa
+from typing import Optional # noqa
+from typing import Sequence # noqa
+from typing import Tuple # noqa
+
+FOOTER = [
+ ('heading_key', "enter"), ":edit ",
+ ('heading_key', "q"), ":back ",
+]
+FOOTER_EDITING = [
+ ('heading_key', "esc"), ":stop editing ",
+]
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Column(object):
+ subeditor = None
+
+ def __init__(self, heading):
+ self.heading = heading
+
+ @abc.abstractmethod
+ def Display(self, data):
+ # type: () -> Cell
+ pass
+
+ @abc.abstractmethod
+ def Edit(self, data):
+ # type: () -> Cell
+ pass
+
+ @abc.abstractmethod
+ def blank(self):
+ # type: () -> Any
+ pass
+
+ def keypress(self, key, editor):
+ # type: (str, GridEditor) -> Optional[str]
+ return key
+
+
+class Cell(urwid.WidgetWrap):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def get_data(self):
+ """
+ Raises:
+ ValueError, if the current content is invalid.
+ """
+ pass
+
+ def selectable(self):
+ return True
+
+
+class GridRow(urwid.WidgetWrap):
+ def __init__(
+ self,
+ focused, # type: Optional[int]
+ editing, # type: bool
+ editor, # type: GridEditor
+ values # type: Tuple[Iterable[bytes], Container[int]
+ ):
+ self.focused = focused
+ self.editor = editor
+ self.edit_col = None # type: Optional[Cell]
+
+ errors = values[1]
+ self.fields = []
+ for i, v in enumerate(values[0]):
+ if focused == i and editing:
+ self.edit_col = self.editor.columns[i].Edit(v)
+ self.fields.append(self.edit_col)
+ else:
+ w = self.editor.columns[i].Display(v)
+ if focused == i:
+ if i in errors:
+ w = urwid.AttrWrap(w, "focusfield_error")
+ else:
+ w = urwid.AttrWrap(w, "focusfield")
+ elif i in errors:
+ w = urwid.AttrWrap(w, "field_error")
+ self.fields.append(w)
+
+ fspecs = self.fields[:]
+ if len(self.fields) > 1:
+ fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
+ w = urwid.Columns(
+ fspecs,
+ dividechars=2
+ )
+ if focused is not None:
+ w.set_focus_column(focused)
+ super(GridRow, self).__init__(w)
+
+ def keypress(self, s, k):
+ if self.edit_col:
+ w = self._w.column_widths(s)[self.focused]
+ k = self.edit_col.keypress((w,), k)
+ return k
+
+ def selectable(self):
+ return True
+
+
+class GridWalker(urwid.ListWalker):
+ """
+ Stores rows as a list of (rows, errors) tuples, where rows is a list
+ and errors is a set with an entry of each offset in rows that is an
+ error.
+ """
+
+ def __init__(
+ self,
+ lst, # type: Iterable[list]
+ editor # type: GridEditor
+ ):
+ self.lst = [(i, set()) for i in lst]
+ self.editor = editor
+ self.focus = 0
+ self.focus_col = 0
+ self.edit_row = None # type: Optional[GridRow]
+
+ def _modified(self):
+ self.editor.show_empty_msg()
+ return super(GridWalker, self)._modified()
+
+ def add_value(self, lst):
+ self.lst.append(
+ (lst[:], set())
+ )
+ self._modified()
+
+ def get_current_value(self):
+ if self.lst:
+ return self.lst[self.focus][0][self.focus_col]
+
+ def set_current_value(self, val):
+ errors = self.lst[self.focus][1]
+ emsg = self.editor.is_error(self.focus_col, val)
+ if emsg:
+ signals.status_message.send(message=emsg, expire=5)
+ errors.add(self.focus_col)
+ else:
+ errors.discard(self.focus_col)
+ self.set_value(val, self.focus, self.focus_col, errors)
+
+ def set_value(self, val, focus, focus_col, errors=None):
+ if not errors:
+ errors = set([])
+ row = list(self.lst[focus][0])
+ row[focus_col] = val
+ self.lst[focus] = [tuple(row), errors]
+ self._modified()
+
+ def delete_focus(self):
+ if self.lst:
+ del self.lst[self.focus]
+ self.focus = min(len(self.lst) - 1, self.focus)
+ self._modified()
+
+ def _insert(self, pos):
+ self.focus = pos
+ self.lst.insert(
+ self.focus,
+ ([c.blank() for c in self.editor.columns], set([]))
+ )
+ self.focus_col = 0
+ self.start_edit()
+
+ def insert(self):
+ return self._insert(self.focus)
+
+ def add(self):
+ return self._insert(min(self.focus + 1, len(self.lst)))
+
+ def start_edit(self):
+ col = self.editor.columns[self.focus_col]
+ if self.lst and not col.subeditor:
+ self.edit_row = GridRow(
+ self.focus_col, True, self.editor, self.lst[self.focus]
+ )
+ self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
+ self._modified()
+
+ def stop_edit(self):
+ if self.edit_row:
+ self.editor.master.loop.widget.footer.update(FOOTER)
+ try:
+ val = self.edit_row.edit_col.get_data()
+ except ValueError:
+ return
+ self.edit_row = None
+ self.set_current_value(val)
+
+ def left(self):
+ self.focus_col = max(self.focus_col - 1, 0)
+ self._modified()
+
+ def right(self):
+ self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
+ self._modified()
+
+ def tab_next(self):
+ self.stop_edit()
+ if self.focus_col < len(self.editor.columns) - 1:
+ self.focus_col += 1
+ elif self.focus != len(self.lst) - 1:
+ self.focus_col = 0
+ self.focus += 1
+ self._modified()
+
+ def get_focus(self):
+ if self.edit_row:
+ return self.edit_row, self.focus
+ elif self.lst:
+ return GridRow(
+ self.focus_col,
+ False,
+ self.editor,
+ self.lst[self.focus]
+ ), self.focus
+ else:
+ return None, None
+
+ def set_focus(self, focus):
+ self.stop_edit()
+ self.focus = focus
+ self._modified()
+
+ def get_next(self, pos):
+ if pos + 1 >= len(self.lst):
+ return None, None
+ return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
+
+ def get_prev(self, pos):
+ if pos - 1 < 0:
+ return None, None
+ return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
+
+
+class GridListBox(urwid.ListBox):
+ def __init__(self, lw):
+ super(GridListBox, self).__init__(lw)
+
+
+FIRST_WIDTH_MAX = 40
+FIRST_WIDTH_MIN = 20
+
+
+class GridEditor(urwid.WidgetWrap):
+ title = None # type: str
+ columns = None # type: Sequence[Column]
+
+ def __init__(
+ self,
+ master, # type: "mitmproxy.console.master.ConsoleMaster"
+ value, # type: Any
+ callback, # type: Callable[..., None]
+ *cb_args,
+ **cb_kwargs
+ ):
+ value = self.data_in(copy.deepcopy(value))
+ self.master = master
+ self.value = value
+ self.callback = callback
+ self.cb_args = cb_args
+ self.cb_kwargs = cb_kwargs
+
+ first_width = 20
+ if value:
+ for r in value:
+ assert len(r) == len(self.columns)
+ first_width = max(len(r), first_width)
+ self.first_width = min(first_width, FIRST_WIDTH_MAX)
+
+ title = urwid.Text(self.title)
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+
+ headings = []
+ for i, col in enumerate(self.columns):
+ c = urwid.Text(col.heading)
+ if i == 0 and len(self.columns) > 1:
+ headings.append(("fixed", first_width + 2, c))
+ else:
+ headings.append(c)
+ h = urwid.Columns(
+ headings,
+ dividechars=2
+ )
+ h = urwid.AttrWrap(h, "heading")
+
+ self.walker = GridWalker(self.value, self)
+ self.lb = GridListBox(self.walker)
+ w = urwid.Frame(
+ self.lb,
+ header=urwid.Pile([title, h])
+ )
+ super(GridEditor, self).__init__(w)
+ self.master.loop.widget.footer.update("")
+ self.show_empty_msg()
+
+ def show_empty_msg(self):
+ if self.walker.lst:
+ self._w.set_footer(None)
+ else:
+ self._w.set_footer(
+ urwid.Text(
+ [
+ ("highlight", "No values. Press "),
+ ("key", "a"),
+ ("highlight", " to add some."),
+ ]
+ )
+ )
+
+ def set_subeditor_value(self, val, focus, focus_col):
+ self.walker.set_value(val, focus, focus_col)
+
+ def keypress(self, size, key):
+ if self.walker.edit_row:
+ if key in ["esc"]:
+ self.walker.stop_edit()
+ elif key == "tab":
+ pf, pfc = self.walker.focus, self.walker.focus_col
+ self.walker.tab_next()
+ if self.walker.focus == pf and self.walker.focus_col != pfc:
+ self.walker.start_edit()
+ else:
+ self._w.keypress(size, key)
+ return None
+
+ key = common.shortcuts(key)
+ column = self.columns[self.walker.focus_col]
+ if key in ["q", "esc"]:
+ res = []
+ for i in self.walker.lst:
+ if not i[1] and any([x for x in i[0]]):
+ res.append(i[0])
+ self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
+ signals.pop_view_state.send(self)
+ elif key == "g":
+ self.walker.set_focus(0)
+ elif key == "G":
+ self.walker.set_focus(len(self.walker.lst) - 1)
+ elif key in ["h", "left"]:
+ self.walker.left()
+ elif key in ["l", "right"]:
+ self.walker.right()
+ elif key == "tab":
+ self.walker.tab_next()
+ elif key == "a":
+ self.walker.add()
+ elif key == "A":
+ self.walker.insert()
+ elif key == "d":
+ self.walker.delete_focus()
+ elif column.keypress(key, self) and not self.handle_key(key):
+ return self._w.keypress(size, key)
+
+ def data_out(self, data):
+ # type: (Sequence[list]) -> Any
+ """
+ Called on raw list data, before data is returned through the
+ callback.
+ """
+ return data
+
+ def data_in(self, data):
+ # type: (Any) -> Iterable[list]
+ """
+ Called to prepare provided data.
+ """
+ return data
+
+ def is_error(self, col, val):
+ # type: (int, Any) -> Optional[str]
+ """
+ Return None, or a string error message.
+ """
+ return False
+
+ def handle_key(self, key):
+ return False
+
+ def make_help(self):
+ text = [
+ urwid.Text([("text", "Editor control:\n")])
+ ]
+ keys = [
+ ("A", "insert row before cursor"),
+ ("a", "add row after cursor"),
+ ("d", "delete row"),
+ ("e", "spawn external editor on current field"),
+ ("q", "save changes and exit editor"),
+ ("r", "read value from file"),
+ ("R", "read unescaped value from file"),
+ ("esc", "save changes and exit editor"),
+ ("tab", "next field"),
+ ("enter", "edit field"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(
+ urwid.Text(
+ [
+ "\n",
+ ("text", "Values are escaped Python-style strings.\n"),
+ ]
+ )
+ )
+ return text
diff --git a/mitmproxy/console/grideditor/col_bytes.py b/mitmproxy/console/grideditor/col_bytes.py
new file mode 100644
index 00000000..51bbb6cb
--- /dev/null
+++ b/mitmproxy/console/grideditor/col_bytes.py
@@ -0,0 +1,103 @@
+from __future__ import absolute_import, print_function, division
+
+import os
+
+import urwid
+from mitmproxy.console import signals
+from mitmproxy.console.grideditor import base
+from netlib import strutils
+
+
+def read_file(filename, callback, escaped):
+ # type: (str, Callable[...,None], bool) -> Optional[str]
+ if not filename:
+ return
+
+ filename = os.path.expanduser(filename)
+ try:
+ with open(filename, "r" if escaped else "rb") as f:
+ d = f.read()
+ except IOError as v:
+ return str(v)
+
+ if escaped:
+ try:
+ d = strutils.escaped_str_to_bytes(d)
+ except ValueError:
+ return "Invalid Python-style string encoding."
+ # TODO: Refactor the status_prompt_path signal so that we
+ # can raise exceptions here and return the content instead.
+ callback(d)
+
+
+class Column(base.Column):
+ def Display(self, data):
+ return Display(data)
+
+ def Edit(self, data):
+ return Edit(data)
+
+ def blank(self):
+ return b""
+
+ def keypress(self, key, editor):
+ if key == "r":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ self,
+ prompt="Read file",
+ callback=read_file,
+ args=(editor.walker.set_current_value, True)
+ )
+ elif key == "R":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ self,
+ prompt="Read unescaped file",
+ callback=read_file,
+ args=(editor.walker.set_current_value, False)
+ )
+ elif key == "e":
+ o = editor.walker.get_current_value()
+ if o is not None:
+ n = editor.master.spawn_editor(o)
+ n = strutils.clean_hanging_newline(n)
+ editor.walker.set_current_value(n)
+ elif key in ["enter"]:
+ editor.walker.start_edit()
+ else:
+ return key
+
+
+class Display(base.Cell):
+ def __init__(self, data):
+ # type: (bytes) -> Display
+ self.data = data
+ escaped = strutils.bytes_to_escaped_str(data)
+ w = urwid.Text(escaped, wrap="any")
+ super(Display, self).__init__(w)
+
+ def get_data(self):
+ return self.data
+
+
+class Edit(base.Cell):
+ def __init__(self, data):
+ # type: (bytes) -> Edit
+ data = strutils.bytes_to_escaped_str(data)
+ w = urwid.Edit(edit_text=data, wrap="any", multiline=True)
+ w = urwid.AttrWrap(w, "editfield")
+ super(Edit, self).__init__(w)
+
+ def get_data(self):
+ # type: () -> bytes
+ txt = self._w.get_text()[0].strip()
+ try:
+ return strutils.escaped_str_to_bytes(txt)
+ except ValueError:
+ signals.status_message.send(
+ self,
+ message="Invalid Python-style string encoding.",
+ expire=1000
+ )
+ raise
diff --git a/mitmproxy/console/grideditor/col_subgrid.py b/mitmproxy/console/grideditor/col_subgrid.py
new file mode 100644
index 00000000..1dec8032
--- /dev/null
+++ b/mitmproxy/console/grideditor/col_subgrid.py
@@ -0,0 +1,51 @@
+from __future__ import absolute_import, print_function, division
+import urwid
+from mitmproxy.console.grideditor import base
+from mitmproxy.console import signals
+from netlib.http import cookies
+
+
+class Column(base.Column):
+ def __init__(self, heading, subeditor):
+ super(Column, self).__init__(heading)
+ self.subeditor = subeditor
+
+ def Edit(self, data):
+ raise RuntimeError("SubgridColumn should handle edits itself")
+
+ def Display(self, data):
+ return Display(data)
+
+ def blank(self):
+ return []
+
+ def keypress(self, key, editor):
+ if key in "rRe":
+ signals.status_message.send(
+ self,
+ message="Press enter to edit this field.",
+ expire=1000
+ )
+ return
+ elif key in ["enter"]:
+ editor.master.view_grideditor(
+ self.subeditor(
+ editor.master,
+ editor.walker.get_current_value(),
+ editor.set_subeditor_value,
+ editor.walker.focus,
+ editor.walker.focus_col
+ )
+ )
+ else:
+ return key
+
+
+class Display(base.Cell):
+ def __init__(self, data):
+ p = cookies._format_pairs(data, sep="\n")
+ w = urwid.Text(p)
+ super(Display, self).__init__(w)
+
+ def get_data(self):
+ pass
diff --git a/mitmproxy/console/grideditor/col_text.py b/mitmproxy/console/grideditor/col_text.py
new file mode 100644
index 00000000..d60dc854
--- /dev/null
+++ b/mitmproxy/console/grideditor/col_text.py
@@ -0,0 +1,55 @@
+"""
+Welcome to the encoding dance!
+
+In a nutshell, text columns are actually a proxy class for byte columns,
+which just encode/decodes contents.
+"""
+from __future__ import absolute_import, print_function, division
+
+from mitmproxy.console import signals
+from mitmproxy.console.grideditor import col_bytes
+
+
+class Column(col_bytes.Column):
+ def __init__(self, heading, encoding="utf8", errors="surrogateescape"):
+ super(Column, self).__init__(heading)
+ self.encoding_args = encoding, errors
+
+ def Display(self, data):
+ return TDisplay(data, self.encoding_args)
+
+ def Edit(self, data):
+ return TEdit(data, self.encoding_args)
+
+ def blank(self):
+ return u""
+
+
+# This is the same for both edit and display.
+class EncodingMixin(object):
+ def __init__(self, data, encoding_args):
+ # type: (str) -> TDisplay
+ self.encoding_args = encoding_args
+ data = data.encode(*self.encoding_args)
+ super(EncodingMixin, self).__init__(data)
+
+ def get_data(self):
+ data = super(EncodingMixin, self).get_data()
+ try:
+ return data.decode(*self.encoding_args)
+ except ValueError:
+ signals.status_message.send(
+ self,
+ message="Invalid encoding.",
+ expire=1000
+ )
+ raise
+
+
+# urwid forces a different name for a subclass.
+class TDisplay(EncodingMixin, col_bytes.Display):
+ pass
+
+
+class TEdit(EncodingMixin, col_bytes.Edit):
+ pass
diff --git a/mitmproxy/console/grideditor/editors.py b/mitmproxy/console/grideditor/editors.py
new file mode 100644
index 00000000..80f0541b
--- /dev/null
+++ b/mitmproxy/console/grideditor/editors.py
@@ -0,0 +1,239 @@
+from __future__ import absolute_import, print_function, division
+import re
+import urwid
+from mitmproxy import filt
+from mitmproxy.builtins import script
+from mitmproxy import exceptions
+from mitmproxy.console import common
+from mitmproxy.console.grideditor import base
+from mitmproxy.console.grideditor import col_bytes
+from mitmproxy.console.grideditor import col_text
+from mitmproxy.console.grideditor import col_subgrid
+from mitmproxy.console import signals
+from netlib.http import user_agents
+
+
+class QueryEditor(base.GridEditor):
+ title = "Editing query"
+ columns = [
+ col_text.Column("Key"),
+ col_text.Column("Value")
+ ]
+
+
+class HeaderEditor(base.GridEditor):
+ title = "Editing headers"
+ columns = [
+ col_bytes.Column("Key"),
+ col_bytes.Column("Value")
+ ]
+
+ def make_help(self):
+ h = super(HeaderEditor, self).make_help()
+ text = [
+ urwid.Text([("text", "Special keys:\n")])
+ ]
+ keys = [
+ ("U", "add User-Agent header"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(urwid.Text([("text", "\n")]))
+ text.extend(h)
+ return text
+
+ def set_user_agent(self, k):
+ ua = user_agents.get_by_shortcut(k)
+ if ua:
+ self.walker.add_value(
+ [
+ b"User-Agent",
+ ua[2].encode()
+ ]
+ )
+
+ def handle_key(self, key):
+ if key == "U":
+ signals.status_prompt_onekey.send(
+ prompt="Add User-Agent header:",
+ keys=[(i[0], i[1]) for i in user_agents.UASTRINGS],
+ callback=self.set_user_agent,
+ )
+ return True
+
+
+class URLEncodedFormEditor(base.GridEditor):
+ title = "Editing URL-encoded form"
+ columns = [
+ col_bytes.Column("Key"),
+ col_bytes.Column("Value")
+ ]
+
+
+class ReplaceEditor(base.GridEditor):
+ title = "Editing replacement patterns"
+ columns = [
+ col_text.Column("Filter"),
+ col_bytes.Column("Regex"),
+ col_bytes.Column("Replacement"),
+ ]
+
+ def is_error(self, col, val):
+ if col == 0:
+ if not filt.parse(val):
+ return "Invalid filter specification."
+ elif col == 1:
+ try:
+ re.compile(val)
+ except re.error:
+ return "Invalid regular expression."
+ return False
+
+
+class SetHeadersEditor(base.GridEditor):
+ title = "Editing header set patterns"
+ columns = [
+ col_text.Column("Filter"),
+ col_bytes.Column("Header"),
+ col_bytes.Column("Value"),
+ ]
+
+ def is_error(self, col, val):
+ if col == 0:
+ if not filt.parse(val):
+ return "Invalid filter specification"
+ return False
+
+ def make_help(self):
+ h = super(SetHeadersEditor, self).make_help()
+ text = [
+ urwid.Text([("text", "Special keys:\n")])
+ ]
+ keys = [
+ ("U", "add User-Agent header"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(urwid.Text([("text", "\n")]))
+ text.extend(h)
+ return text
+
+ def set_user_agent(self, k):
+ ua = user_agents.get_by_shortcut(k)
+ if ua:
+ self.walker.add_value(
+ [
+ ".*",
+ b"User-Agent",
+ ua[2].encode()
+ ]
+ )
+
+ def handle_key(self, key):
+ if key == "U":
+ signals.status_prompt_onekey.send(
+ prompt="Add User-Agent header:",
+ keys=[(i[0], i[1]) for i in user_agents.UASTRINGS],
+ callback=self.set_user_agent,
+ )
+ return True
+
+
+class PathEditor(base.GridEditor):
+ # TODO: Next row on enter?
+
+ title = "Editing URL path components"
+ columns = [
+ col_text.Column("Component"),
+ ]
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
+
+
+class ScriptEditor(base.GridEditor):
+ title = "Editing scripts"
+ columns = [
+ col_text.Column("Command"),
+ ]
+
+ def is_error(self, col, val):
+ try:
+ script.parse_command(val)
+ except exceptions.AddonError as e:
+ return str(e)
+
+
+class HostPatternEditor(base.GridEditor):
+ title = "Editing host patterns"
+ columns = [
+ col_text.Column("Regex (matched on hostname:port / ip:port)")
+ ]
+
+ def is_error(self, col, val):
+ try:
+ re.compile(val, re.IGNORECASE)
+ except re.error as e:
+ return "Invalid regex: %s" % str(e)
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
+
+
+class CookieEditor(base.GridEditor):
+ title = "Editing request Cookie header"
+ columns = [
+ col_text.Column("Name"),
+ col_text.Column("Value"),
+ ]
+
+
+class CookieAttributeEditor(base.GridEditor):
+ title = "Editing Set-Cookie attributes"
+ columns = [
+ col_text.Column("Name"),
+ col_text.Column("Value"),
+ ]
+
+ def data_out(self, data):
+ ret = []
+ for i in data:
+ if not i[1]:
+ ret.append([i[0], None])
+ else:
+ ret.append(i)
+ return ret
+
+
+class SetCookieEditor(base.GridEditor):
+ title = "Editing response SetCookie header"
+ columns = [
+ col_text.Column("Name"),
+ col_text.Column("Value"),
+ col_subgrid.Column("Attributes", CookieAttributeEditor),
+ ]
+
+ def data_in(self, data):
+ flattened = []
+ for key, (value, attrs) in data:
+ flattened.append([key, value, attrs.items(multi=True)])
+ return flattened
+
+ def data_out(self, data):
+ vals = []
+ for key, value, attrs in data:
+ vals.append(
+ [
+ key,
+ (value, attrs)
+ ]
+ )
+ return vals
diff --git a/mitmproxy/console/master.py b/mitmproxy/console/master.py
index db414147..ad46cbb4 100644
--- a/mitmproxy/console/master.py
+++ b/mitmproxy/console/master.py
@@ -390,13 +390,12 @@ class ConsoleMaster(flow.FlowMaster):
)
def spawn_editor(self, data):
- fd, name = tempfile.mkstemp('', "mproxy")
+ text = not isinstance(data, bytes)
+ fd, name = tempfile.mkstemp('', "mproxy", text=text)
os.write(fd, data)
os.close(fd)
- c = os.environ.get("EDITOR")
# if no EDITOR is set, assume 'vi'
- if not c:
- c = "vi"
+ c = os.environ.get("EDITOR") or "vi"
cmd = shlex.split(c)
cmd.append(name)
self.ui.stop()
@@ -404,10 +403,11 @@ class ConsoleMaster(flow.FlowMaster):
subprocess.call(cmd)
except:
signals.status_message.send(
- message = "Can't start editor: %s" % " ".join(c)
+ message="Can't start editor: %s" % " ".join(c)
)
else:
- data = open(name, "rb").read()
+ with open(name, "r" if text else "rb") as f:
+ data = f.read()
self.ui.start()
os.unlink(name)
return data
@@ -570,7 +570,7 @@ class ConsoleMaster(flow.FlowMaster):
self,
ge,
None,
- statusbar.StatusBar(self, grideditor.FOOTER),
+ statusbar.StatusBar(self, grideditor.base.FOOTER),
ge.make_help()
)
)