aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/simple/custom_contentview.py7
-rw-r--r--examples/simple/io_read_dumpfile.py3
-rw-r--r--examples/simple/io_write_dumpfile.py3
-rw-r--r--mitmproxy/addonmanager.py4
-rw-r--r--mitmproxy/addons/core.py178
-rw-r--r--mitmproxy/addons/view.py60
-rw-r--r--mitmproxy/tools/console/commands.py9
-rw-r--r--mitmproxy/tools/console/common.py201
-rw-r--r--mitmproxy/tools/console/flowdetailview.py4
-rw-r--r--mitmproxy/tools/console/flowlist.py29
-rw-r--r--mitmproxy/tools/console/flowview.py486
-rw-r--r--mitmproxy/tools/console/grideditor/base.py102
-rw-r--r--mitmproxy/tools/console/grideditor/editors.py84
-rw-r--r--mitmproxy/tools/console/help.py12
-rw-r--r--mitmproxy/tools/console/keymap.py28
-rw-r--r--mitmproxy/tools/console/master.py498
-rw-r--r--mitmproxy/tools/console/options.py41
-rw-r--r--mitmproxy/tools/console/overlay.py7
-rw-r--r--mitmproxy/tools/console/searchable.py7
-rw-r--r--mitmproxy/tools/console/select.py1
-rw-r--r--mitmproxy/tools/console/signals.py4
-rw-r--r--mitmproxy/tools/console/statusbar.py6
-rw-r--r--mitmproxy/tools/console/tabs.py8
-rw-r--r--mitmproxy/tools/console/window.py155
-rw-r--r--mitmproxy/tools/main.py2
-rw-r--r--pathod/language/actions.py6
-rw-r--r--pathod/language/base.py15
-rw-r--r--pathod/language/http.py4
-rw-r--r--pathod/language/http2.py6
-rw-r--r--pathod/language/message.py3
-rw-r--r--pathod/language/websockets.py14
-rw-r--r--pathod/pathod.py6
-rw-r--r--pathod/test.py14
-rw-r--r--pathod/utils.py7
-rw-r--r--test/mitmproxy/addons/test_core.py102
-rw-r--r--test/mitmproxy/addons/test_script.py2
-rw-r--r--test/mitmproxy/addons/test_view.py23
-rw-r--r--test/mitmproxy/tools/console/test_help.py6
-rw-r--r--test/mitmproxy/tools/console/test_keymap.py29
-rw-r--r--tox.ini2
40 files changed, 1115 insertions, 1063 deletions
diff --git a/examples/simple/custom_contentview.py b/examples/simple/custom_contentview.py
index 289e1efe..71f92575 100644
--- a/examples/simple/custom_contentview.py
+++ b/examples/simple/custom_contentview.py
@@ -3,7 +3,10 @@ This example shows how one can add a custom contentview to mitmproxy.
The content view API is explained in the mitmproxy.contentviews module.
"""
from mitmproxy import contentviews
-from typing import Tuple, Iterable, AnyStr, List
+import typing
+
+
+CVIEWSWAPCASE = typing.Tuple[str, typing.Iterable[typing.List[typing.Tuple[str, typing.AnyStr]]]]
class ViewSwapCase(contentviews.View):
@@ -14,7 +17,7 @@ class ViewSwapCase(contentviews.View):
prompt = ("swap case text", "z")
content_types = ["text/plain"]
- def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterable[List[Tuple[str, AnyStr]]]]:
+ def __call__(self, data: typing.AnyStr, **metadata) -> CVIEWSWAPCASE:
return "case-swapped text", contentviews.format_text(data.swapcase())
diff --git a/examples/simple/io_read_dumpfile.py b/examples/simple/io_read_dumpfile.py
index 87d37c0f..ea544cc4 100644
--- a/examples/simple/io_read_dumpfile.py
+++ b/examples/simple/io_read_dumpfile.py
@@ -1,8 +1,9 @@
#!/usr/bin/env python
+
+# type: ignore
#
# Simple script showing how to read a mitmproxy dump file
#
-
from mitmproxy import io
from mitmproxy.exceptions import FlowReadException
import pprint
diff --git a/examples/simple/io_write_dumpfile.py b/examples/simple/io_write_dumpfile.py
index 7c4c6a7a..cf7c4f52 100644
--- a/examples/simple/io_write_dumpfile.py
+++ b/examples/simple/io_write_dumpfile.py
@@ -8,12 +8,13 @@ to multiple files in parallel.
import random
import sys
from mitmproxy import io, http
+import typing # noqa
class Writer:
def __init__(self, path: str) -> None:
if path == "-":
- f = sys.stdout # type: io.TextIO
+ f = sys.stdout # type: typing.IO[typing.Any]
else:
f = open(path, "wb")
self.w = io.FlowWriter(f)
diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py
index 5d63b1b3..0bbe6287 100644
--- a/mitmproxy/addonmanager.py
+++ b/mitmproxy/addonmanager.py
@@ -6,6 +6,7 @@ import sys
from mitmproxy import exceptions
from mitmproxy import eventsequence
from mitmproxy import controller
+from mitmproxy import flow
from . import ctx
import pprint
@@ -215,6 +216,9 @@ class AddonManager:
if isinstance(message.reply, controller.DummyReply):
message.reply.mark_reset()
+ if isinstance(message, flow.Flow):
+ self.trigger("update", [message])
+
def invoke_addon(self, addon, name, *args, **kwargs):
"""
Invoke an event on an addon and all its children. This method must
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index b482edbb..426c47ad 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -4,6 +4,8 @@ from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import flow
+from mitmproxy import optmanager
+from mitmproxy.net.http import status_codes
class Core:
@@ -79,3 +81,179 @@ class Core:
updated.append(f)
ctx.log.alert("Reverted %s flows." % len(updated))
ctx.master.addons.trigger("update", updated)
+
+ @command.command("flow.set.options")
+ def flow_set_options(self) -> typing.Sequence[str]:
+ return [
+ "host",
+ "status_code",
+ "method",
+ "path",
+ "url",
+ "reason",
+ ]
+
+ @command.command("flow.set")
+ def flow_set(
+ self,
+ flows: typing.Sequence[flow.Flow], spec: str, sval: str
+ ) -> None:
+ """
+ Quickly set a number of common values on flows.
+ """
+ opts = self.flow_set_options()
+ if spec not in opts:
+ raise exceptions.CommandError(
+ "Set spec must be one of: %s." % ", ".join(opts)
+ )
+
+ val = sval # type: typing.Union[int, str]
+ if spec == "status_code":
+ try:
+ val = int(val)
+ except ValueError as v:
+ raise exceptions.CommandError(
+ "Status code is not an integer: %s" % val
+ ) from v
+
+ updated = []
+ for f in flows:
+ req = getattr(f, "request", None)
+ rupdate = True
+ if req:
+ if spec == "method":
+ req.method = val
+ elif spec == "host":
+ req.host = val
+ elif spec == "path":
+ req.path = val
+ elif spec == "url":
+ try:
+ req.url = val
+ except ValueError as e:
+ raise exceptions.CommandError(
+ "URL %s is invalid: %s" % (repr(val), e)
+ ) from e
+ else:
+ self.rupdate = False
+
+ resp = getattr(f, "response", None)
+ supdate = True
+ if resp:
+ if spec == "status_code":
+ resp.status_code = val
+ if val in status_codes.RESPONSES:
+ resp.reason = status_codes.RESPONSES[int(val)]
+ elif spec == "reason":
+ resp.reason = val
+ else:
+ supdate = False
+
+ if rupdate or supdate:
+ updated.append(f)
+
+ ctx.master.addons.trigger("update", updated)
+ ctx.log.alert("Set %s on %s flows." % (spec, len(updated)))
+
+ @command.command("flow.decode")
+ def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
+ """
+ Decode flows.
+ """
+ updated = []
+ for f in flows:
+ p = getattr(f, part, None)
+ if p:
+ p.decode()
+ updated.append(f)
+ ctx.master.addons.trigger("update", updated)
+ ctx.log.alert("Decoded %s flows." % len(updated))
+
+ @command.command("flow.encode.toggle")
+ def encode_toggle(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
+ """
+ Toggle flow encoding on and off, using deflate for encoding.
+ """
+ updated = []
+ for f in flows:
+ p = getattr(f, part, None)
+ if p:
+ current_enc = p.headers.get("content-encoding", "identity")
+ if current_enc == "identity":
+ p.encode("deflate")
+ else:
+ p.decode()
+ updated.append(f)
+ ctx.master.addons.trigger("update", updated)
+ ctx.log.alert("Toggled encoding on %s flows." % len(updated))
+
+ @command.command("flow.encode")
+ def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None:
+ """
+ Encode flows with a specified encoding.
+ """
+ if enc not in self.encode_options():
+ raise exceptions.CommandError("Invalid encoding format: %s" % enc)
+
+ updated = []
+ for f in flows:
+ p = getattr(f, part, None)
+ if p:
+ current_enc = p.headers.get("content-encoding", "identity")
+ if current_enc == "identity":
+ p.encode(enc)
+ updated.append(f)
+ ctx.master.addons.trigger("update", updated)
+ ctx.log.alert("Encoded %s flows." % len(updated))
+
+ @command.command("flow.encode.options")
+ def encode_options(self) -> typing.Sequence[str]:
+ """
+ The possible values for an encoding specification.
+
+ """
+ return ["gzip", "deflate", "br"]
+
+ @command.command("options.load")
+ def options_load(self, path: str) -> None:
+ """
+ Load options from a file.
+ """
+ try:
+ optmanager.load_paths(ctx.options, path)
+ except (OSError, exceptions.OptionsError) as e:
+ raise exceptions.CommandError(
+ "Could not load options - %s" % e
+ ) from e
+
+ @command.command("options.save")
+ def options_save(self, path: str) -> None:
+ """
+ Save options to a file.
+ """
+ try:
+ optmanager.save(ctx.options, path)
+ except OSError as e:
+ raise exceptions.CommandError(
+ "Could not save options - %s" % e
+ ) from e
+
+ @command.command("options.reset")
+ def options_reset(self) -> None:
+ """
+ Reset all options to defaults.
+ """
+ ctx.options.reset()
+
+ @command.command("options.reset.one")
+ def options_reset_one(self, name: str) -> None:
+ """
+ Reset one option to its default value.
+ """
+ if name not in ctx.options:
+ raise exceptions.CommandError("No such option: %s" % name)
+ setattr(
+ ctx.options,
+ name,
+ ctx.options.default(name),
+ )
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index c9c9cbed..dd579585 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -202,6 +202,24 @@ class View(collections.Sequence):
self.sig_view_refresh.send(self)
# API
+ @command.command("view.focus.next")
+ def focus_next(self) -> None:
+ """
+ Set focus to the next flow.
+ """
+ idx = self.focus.index + 1
+ if self.inbounds(idx):
+ self.focus.flow = self[idx]
+
+ @command.command("view.focus.prev")
+ def focus_prev(self) -> None:
+ """
+ Set focus to the previous flow.
+ """
+ idx = self.focus.index - 1
+ if self.inbounds(idx):
+ self.focus.flow = self[idx]
+
@command.command("view.order.options")
def order_options(self) -> typing.Sequence[str]:
"""
@@ -278,6 +296,45 @@ class View(collections.Sequence):
"""
return self._store.get(flow_id)
+ @command.command("view.getval")
+ def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str:
+ """
+ Get a value from the settings store for the specified flow.
+ """
+ return self.settings[f].get(key, default)
+
+ @command.command("view.setval.toggle")
+ def setvalue_toggle(
+ self,
+ flows: typing.Sequence[mitmproxy.flow.Flow],
+ key: str
+ ) -> None:
+ """
+ Toggle a boolean value in the settings store, seting the value to
+ the string "true" or "false".
+ """
+ updated = []
+ for f in flows:
+ current = self.settings[f].get("key", "false")
+ self.settings[f][key] = "false" if current == "true" else "true"
+ updated.append(f)
+ ctx.master.addons.trigger("update", updated)
+
+ @command.command("view.setval")
+ def setvalue(
+ self,
+ flows: typing.Sequence[mitmproxy.flow.Flow],
+ key: str, value: str
+ ) -> None:
+ """
+ Set a value in the settings store for the specified flows.
+ """
+ updated = []
+ for f in flows:
+ self.settings[f][key] = value
+ updated.append(f)
+ ctx.master.addons.trigger("update", updated)
+
@command.command("view.load")
def load_file(self, path: str) -> None:
"""
@@ -296,6 +353,8 @@ class View(collections.Sequence):
the view, negative from the end of the view, so that 0 is the first
flow, -1 is the last flow.
"""
+ if len(self) == 0:
+ return
if dst < 0:
dst = len(self) + dst
if dst < 0:
@@ -314,6 +373,7 @@ class View(collections.Sequence):
if dups:
self.add(dups)
self.focus.flow = dups[0]
+ ctx.log.alert("Duplicated %s flows" % len(dups))
@command.command("view.remove")
def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py
index 689aa637..76827a99 100644
--- a/mitmproxy/tools/console/commands.py
+++ b/mitmproxy/tools/console/commands.py
@@ -120,6 +120,12 @@ class CommandsList(urwid.ListBox):
if key == "enter":
foc, idx = self.get_focus()
signals.status_prompt_command.send(partial=foc.cmd.path + " ")
+ elif key == "m_start":
+ self.set_focus(0)
+ self.walker._modified()
+ elif key == "m_end":
+ self.set_focus(len(self.walker.cmds) - 1)
+ self.walker._modified()
return super().keypress(size, key)
@@ -146,6 +152,8 @@ class CommandHelp(urwid.Frame):
class Commands(urwid.Pile):
+ keyctx = "commands"
+
def __init__(self, master):
oh = CommandHelp(master)
super().__init__(
@@ -157,7 +165,6 @@ class Commands(urwid.Pile):
self.master = master
def keypress(self, size, key):
- key = common.shortcuts(key)
if key == "tab":
self.focus_position = (
self.focus_position + 1
diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py
index 812ca7a8..de024d1a 100644
--- a/mitmproxy/tools/console/common.py
+++ b/mitmproxy/tools/console/common.py
@@ -1,24 +1,9 @@
-# -*- coding: utf-8 -*-
-
-
-import os
-
import urwid
import urwid.util
-import mitmproxy.net
from functools import lru_cache
-from mitmproxy.tools.console import signals
from mitmproxy.utils import human
-try:
- import pyperclip
-except:
- pyperclip = False
-
-
-VIEW_FLOW_REQUEST = 0
-VIEW_FLOW_RESPONSE = 1
METHOD_OPTIONS = [
("get", "g"),
@@ -92,20 +77,6 @@ def format_keyvals(lst, key="key", val="text", indent=0):
return ret
-def shortcuts(k):
- if k == " ":
- k = "page down"
- elif k == "ctrl f":
- k = "page down"
- elif k == "ctrl b":
- k = "page up"
- elif k == "j":
- k = "down"
- elif k == "k":
- k = "up"
- return k
-
-
def fcol(s, attr):
s = str(s)
return (
@@ -133,178 +104,6 @@ else:
SYMBOL_DOWN = " "
-# Save file to disk
-def save_data(path, data):
- if not path:
- return
- try:
- if isinstance(data, bytes):
- mode = "wb"
- else:
- mode = "w"
- with open(path, mode) as f:
- f.write(data)
- except IOError as v:
- signals.status_message.send(message=v.strerror)
-
-
-def ask_save_overwrite(path, data):
- if os.path.exists(path):
- def save_overwrite(k):
- if k == "y":
- save_data(path, data)
-
- signals.status_prompt_onekey.send(
- prompt = "'" + path + "' already exists. Overwrite?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = save_overwrite
- )
- else:
- save_data(path, data)
-
-
-def ask_save_path(data, prompt="File path"):
- signals.status_prompt_path.send(
- prompt = prompt,
- callback = ask_save_overwrite,
- args = (data, )
- )
-
-
-def ask_scope_and_callback(flow, cb, *args):
- request_has_content = flow.request and flow.request.raw_content
- response_has_content = flow.response and flow.response.raw_content
-
- if request_has_content and response_has_content:
- signals.status_prompt_onekey.send(
- prompt = "Save",
- keys = (
- ("request", "q"),
- ("response", "s"),
- ("both", "b"),
- ),
- callback = cb,
- args = (flow,) + args
- )
- elif response_has_content:
- cb("s", flow, *args)
- else:
- cb("q", flow, *args)
-
-
-def copy_to_clipboard_or_prompt(data):
- # pyperclip calls encode('utf-8') on data to be copied without checking.
- # if data are already encoded that way UnicodeDecodeError is thrown.
- if isinstance(data, bytes):
- toclip = data.decode("utf8", "replace")
- else:
- toclip = data
-
- try:
- pyperclip.copy(toclip)
- except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError):
- def save(k):
- if k == "y":
- ask_save_path(data, "Save data")
- signals.status_prompt_onekey.send(
- prompt = "Cannot copy data to clipboard. Save as file?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = save
- )
-
-
-def format_flow_data(key, scope, flow):
- data = b""
- if scope in ("q", "b"):
- request = flow.request.copy()
- request.decode(strict=False)
- if request.content is None:
- return None, "Request content is missing"
- if key == "h":
- data += mitmproxy.net.http.http1.assemble_request(request)
- elif key == "c":
- data += request.get_content(strict=False)
- else:
- raise ValueError("Unknown key: {}".format(key))
- if scope == "b" and flow.request.raw_content and flow.response:
- # Add padding between request and response
- data += b"\r\n" * 2
- if scope in ("s", "b") and flow.response:
- response = flow.response.copy()
- response.decode(strict=False)
- if response.content is None:
- return None, "Response content is missing"
- if key == "h":
- data += mitmproxy.net.http.http1.assemble_response(response)
- elif key == "c":
- data += response.get_content(strict=False)
- else:
- raise ValueError("Unknown key: {}".format(key))
- return data, False
-
-
-def handle_flow_data(scope, flow, key, writer):
- """
- key: _c_ontent, _h_eaders+content, _u_rl
- scope: re_q_uest, re_s_ponse, _b_oth
- writer: copy_to_clipboard_or_prompt, ask_save_path
- """
- data, err = format_flow_data(key, scope, flow)
-
- if err:
- signals.status_message.send(message=err)
- return
-
- if not data:
- if scope == "q":
- signals.status_message.send(message="No request content.")
- elif scope == "s":
- signals.status_message.send(message="No response content.")
- else:
- signals.status_message.send(message="No content.")
- return
-
- writer(data)
-
-
-def ask_save_body(scope, flow):
- """
- Save either the request or the response body to disk.
-
- scope: re_q_uest, re_s_ponse, _b_oth, None (ask user if necessary)
- """
-
- request_has_content = flow.request and flow.request.raw_content
- response_has_content = flow.response and flow.response.raw_content
-
- if scope is None:
- ask_scope_and_callback(flow, ask_save_body)
- elif scope == "q" and request_has_content:
- ask_save_path(
- flow.request.get_content(strict=False),
- "Save request content to"
- )
- elif scope == "s" and response_has_content:
- ask_save_path(
- flow.response.get_content(strict=False),
- "Save response content to"
- )
- elif scope == "b" and request_has_content and response_has_content:
- ask_save_path(
- (flow.request.get_content(strict=False) + b"\n" +
- flow.response.get_content(strict=False)),
- "Save request & response content to"
- )
- else:
- signals.status_message.send(message="No content.")
-
-
@lru_cache(maxsize=800)
def raw_format_flow(f, flow):
f = dict(f)
diff --git a/mitmproxy/tools/console/flowdetailview.py b/mitmproxy/tools/console/flowdetailview.py
index 9ed063bc..28fe1fbc 100644
--- a/mitmproxy/tools/console/flowdetailview.py
+++ b/mitmproxy/tools/console/flowdetailview.py
@@ -27,7 +27,7 @@ def flowdetails(state, flow: http.HTTPFlow):
text.append(urwid.Text([("head", "Metadata:")]))
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
- if sc is not None:
+ if sc is not None and sc.ip_address:
text.append(urwid.Text([("head", "Server Connection:")]))
parts = [
["Address", human.format_address(sc.address)],
@@ -183,4 +183,4 @@ def flowdetails(state, flow: http.HTTPFlow):
text.append(urwid.Text([("head", "Timing:")]))
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
- return searchable.Searchable(state, text)
+ return searchable.Searchable(text)
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index 7400c16c..8e28ff0f 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -1,7 +1,6 @@
import urwid
from mitmproxy.tools.console import common
-from mitmproxy.tools.console import signals
import mitmproxy.tools.console.master # noqa
@@ -59,13 +58,12 @@ class LogBufferBox(urwid.ListBox):
super().set_focus(index)
def keypress(self, size, key):
- key = common.shortcuts(key)
if key == "z":
self.master.clear_events()
key = None
- elif key == "G":
+ elif key == "m_end":
self.set_focus(len(self.master.logbuffer) - 1)
- elif key == "g":
+ elif key == "m_start":
self.set_focus(0)
return urwid.ListBox.keypress(self, size, key)
@@ -137,22 +135,15 @@ class FlowItem(urwid.WidgetWrap):
return True
def keypress(self, xxx_todo_changeme, key):
- (maxcol,) = xxx_todo_changeme
- return common.shortcuts(key)
+ return key
class FlowListWalker(urwid.ListWalker):
def __init__(self, master):
self.master = master
- self.master.view.sig_view_refresh.connect(self.sig_mod)
- self.master.view.sig_view_add.connect(self.sig_mod)
- self.master.view.sig_view_remove.connect(self.sig_mod)
- self.master.view.sig_view_update.connect(self.sig_mod)
- self.master.view.focus.sig_change.connect(self.sig_mod)
- signals.flowlist_change.connect(self.sig_mod)
-
- def sig_mod(self, *args, **kwargs):
+
+ def view_changed(self):
self._modified()
def get_focus(self):
@@ -164,7 +155,6 @@ class FlowListWalker(urwid.ListWalker):
def set_focus(self, index):
if self.master.view.inbounds(index):
self.master.view.focus.index = index
- signals.flowlist_change.send(self)
def get_next(self, pos):
pos = pos + 1
@@ -182,6 +172,7 @@ class FlowListWalker(urwid.ListWalker):
class FlowListBox(urwid.ListBox):
+ keyctx = "flowlist"
def __init__(
self, master: "mitmproxy.tools.console.master.ConsoleMaster"
@@ -190,5 +181,11 @@ class FlowListBox(urwid.ListBox):
super().__init__(FlowListWalker(master))
def keypress(self, size, key):
- key = common.shortcuts(key)
+ if key == "m_start":
+ self.master.commands.call("view.go 0")
+ elif key == "m_end":
+ self.master.commands.call("view.go -1")
return urwid.ListBox.keypress(self, size, key)
+
+ def view_changed(self):
+ self.body.view_changed()
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py
index b7b7053f..00951610 100644
--- a/mitmproxy/tools/console/flowview.py
+++ b/mitmproxy/tools/console/flowview.py
@@ -1,5 +1,4 @@
import math
-import os
import sys
from functools import lru_cache
from typing import Optional, Union # noqa
@@ -7,14 +6,9 @@ from typing import Optional, Union # noqa
import urwid
from mitmproxy import contentviews
-from mitmproxy import exceptions
from mitmproxy import http
-from mitmproxy.net.http import Headers
-from mitmproxy.net.http import status_codes
from mitmproxy.tools.console import common
from mitmproxy.tools.console import flowdetailview
-from mitmproxy.tools.console import grideditor
-from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import searchable
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import tabs
@@ -106,49 +100,45 @@ class FlowViewHeader(urwid.WidgetWrap):
def __init__(
self,
master: "mitmproxy.tools.console.master.ConsoleMaster",
- f: http.HTTPFlow
) -> None:
self.master = master
- self.flow = f
- self._w = common.format_flow(
- f,
- False,
- extended=True,
- hostheader=self.master.options.showhost
- )
- signals.flow_change.connect(self.sig_flow_change)
+ self.focus_changed()
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
+ def focus_changed(self):
+ if self.master.view.focus.flow:
self._w = common.format_flow(
- flow,
+ self.master.view.focus.flow,
False,
extended=True,
hostheader=self.master.options.showhost
)
+ else:
+ self._w = urwid.Pile([])
-TAB_REQ = 0
-TAB_RESP = 1
-
-
-class FlowView(tabs.Tabs):
- highlight_color = "focusfield"
+class FlowDetails(tabs.Tabs):
+ def __init__(self, master):
+ self.master = master
+ super().__init__([])
+ self.show()
+ self.last_displayed_body = None
- def __init__(self, master, view, flow, tab_offset):
- self.master, self.view, self.flow = master, view, flow
- super().__init__(
- [
+ def focus_changed(self):
+ if self.master.view.focus.flow:
+ self.tabs = [
(self.tab_request, self.view_request),
(self.tab_response, self.view_response),
(self.tab_details, self.view_details),
- ],
- tab_offset
- )
-
+ ]
self.show()
- self.last_displayed_body = None
- signals.flow_change.connect(self.sig_flow_change)
+
+ @property
+ def view(self):
+ return self.master.view
+
+ @property
+ def flow(self):
+ return self.master.view.focus.flow
def tab_request(self):
if self.flow.intercepted and not self.flow.response:
@@ -174,18 +164,13 @@ class FlowView(tabs.Tabs):
def view_details(self):
return flowdetailview.flowdetails(self.view, self.flow)
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
- self.show()
-
def content_view(self, viewmode, message):
if message.raw_content is None:
msg, body = "", [urwid.Text([("error", "[content missing]")])]
return msg, body
else:
- s = self.view.settings[self.flow]
- full = s.get((self.tab_offset, "fullcontents"), False)
- if full:
+ full = self.master.commands.call("view.getval @focus fullcontents false")
+ if full == "true":
limit = sys.maxsize
else:
limit = contentviews.VIEW_CUTOFF
@@ -241,12 +226,6 @@ class FlowView(tabs.Tabs):
return description, text_objects
- def viewmode_get(self):
- return self.view.settings[self.flow].get(
- (self.tab_offset, "prettyview"),
- self.master.options.default_contentview
- )
-
def conn_text(self, conn):
if conn:
txt = common.format_keyvals(
@@ -254,7 +233,7 @@ class FlowView(tabs.Tabs):
key = "header",
val = "text"
)
- viewmode = self.viewmode_get()
+ viewmode = self.master.commands.call("console.flowview.mode")
msg, body = self.content_view(viewmode, conn)
cols = [
@@ -288,404 +267,23 @@ class FlowView(tabs.Tabs):
]
)
]
- return searchable.Searchable(self.view, txt)
-
- def set_method_raw(self, m):
- if m:
- self.flow.request.method = m
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_method(self, m):
- if m == "e":
- signals.status_prompt.send(
- prompt = "Method",
- text = self.flow.request.method,
- callback = self.set_method_raw
- )
- else:
- for i in common.METHOD_OPTIONS:
- if i[1] == m:
- self.flow.request.method = i[0].upper()
- signals.flow_change.send(self, flow = self.flow)
-
- def set_url(self, url):
- request = self.flow.request
- try:
- request.url = str(url)
- except ValueError:
- return "Invalid URL."
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_status_code(self, status_code):
- try:
- status_code = int(status_code)
- except ValueError:
- return None
- self.flow.response.status_code = status_code
- if status_code in status_codes.RESPONSES:
- self.flow.response.reason = status_codes.RESPONSES[status_code]
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_reason(self, reason):
- self.flow.response.reason = reason
- signals.flow_change.send(self, flow = self.flow)
-
- def set_headers(self, fields, conn):
- conn.headers = Headers(fields)
- signals.flow_change.send(self, flow = self.flow)
-
- def set_query(self, lst, conn):
- conn.query = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_path_components(self, lst, conn):
- conn.path_components = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_form(self, lst, conn):
- conn.urlencoded_form = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_form(self, conn):
- self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(
- self.master,
- conn.urlencoded_form.items(multi=True),
- self.set_form,
- conn
- )
- )
-
- def edit_form_confirm(self, key, conn):
- if key == "y":
- self.edit_form(conn)
-
- def set_cookies(self, lst, conn):
- conn.cookies = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_setcookies(self, data, conn):
- conn.cookies = data
- signals.flow_change.send(self, flow = self.flow)
-
- def edit(self, part):
- if self.tab_offset == TAB_REQ:
- message = self.flow.request
- else:
- if not self.flow.response:
- self.flow.response = http.HTTPResponse.make(200, b"")
- message = self.flow.response
-
- self.flow.backup()
- if message == self.flow.request and part == "c":
- self.master.view_grideditor(
- grideditor.CookieEditor(
- self.master,
- message.cookies.items(multi=True),
- self.set_cookies,
- message
- )
- )
- if message == self.flow.response and part == "c":
- self.master.view_grideditor(
- grideditor.SetCookieEditor(
- self.master,
- message.cookies.items(multi=True),
- self.set_setcookies,
- message
- )
- )
- if part == "r":
- # Fix an issue caused by some editors when editing a
- # request/response body. Many editors make it hard to save a
- # file without a terminating newline on the last line. When
- # editing message bodies, this can cause problems. For now, I
- # just strip the newlines off the end of the body when we return
- # from an editor.
- c = self.master.spawn_editor(message.get_content(strict=False) or b"")
- message.content = c.rstrip(b"\n")
- elif part == "f":
- if not message.urlencoded_form and message.raw_content:
- signals.status_prompt_onekey.send(
- prompt = "Existing body is not a URL-encoded form. Clear and edit?",
- keys = [
- ("yes", "y"),
- ("no", "n"),
- ],
- callback = self.edit_form_confirm,
- args = (message,)
- )
- else:
- self.edit_form(message)
- elif part == "h":
- self.master.view_grideditor(
- grideditor.HeaderEditor(
- self.master,
- message.headers.fields,
- self.set_headers,
- message
- )
- )
- elif part == "p":
- p = message.path_components
- self.master.view_grideditor(
- grideditor.PathEditor(
- self.master,
- p,
- self.set_path_components,
- message
- )
- )
- elif part == "q":
- self.master.view_grideditor(
- grideditor.QueryEditor(
- self.master,
- message.query.items(multi=True),
- self.set_query, message
- )
- )
- elif part == "u":
- signals.status_prompt.send(
- prompt = "URL",
- text = message.url,
- callback = self.set_url
- )
- elif part == "m" and message == self.flow.request:
- signals.status_prompt_onekey.send(
- prompt = "Method",
- keys = common.METHOD_OPTIONS,
- callback = self.edit_method
- )
- elif part == "o":
- signals.status_prompt.send(
- prompt = "Code",
- text = str(message.status_code),
- callback = self.set_resp_status_code
- )
- elif part == "m" and message == self.flow.response:
- signals.status_prompt.send(
- prompt = "Message",
- text = message.reason,
- callback = self.set_resp_reason
- )
- signals.flow_change.send(self, flow = self.flow)
+ return searchable.Searchable(txt)
- def view_flow(self, flow):
- signals.pop_view_state.send(self)
- self.master.view_flow(flow, self.tab_offset)
-
- def _view_nextprev_flow(self, idx, flow):
- if not self.view.inbounds(idx):
- signals.status_message.send(message="No more flows")
- return
- self.view_flow(self.view[idx])
-
- def view_next_flow(self, flow):
- return self._view_nextprev_flow(self.view.index(flow) + 1, flow)
-
- def view_prev_flow(self, flow):
- return self._view_nextprev_flow(self.view.index(flow) - 1, flow)
+ def keypress(self, size, key):
+ key = super().keypress(size, key)
+ return self._w.keypress(size, key)
- def change_this_display_mode(self, t):
- view = contentviews.get(t)
- self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower()
- signals.flow_change.send(self, flow=self.flow)
- def keypress(self, size, key):
- conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]]
- if self.tab_offset == TAB_REQ:
- conn = self.flow.request
- elif self.tab_offset == TAB_RESP:
- conn = self.flow.response
+class FlowView(urwid.Frame):
+ keyctx = "flowview"
- key = super().keypress(size, key)
+ def __init__(self, master):
+ super().__init__(
+ FlowDetails(master),
+ header = FlowViewHeader(master),
+ )
+ self.master = master
- # Special case: Space moves over to the next flow.
- # We need to catch that before applying common.shortcuts()
- if key == " ":
- self.view_next_flow(self.flow)
- return
-
- key = common.shortcuts(key)
- if key in ("up", "down", "page up", "page down"):
- # Pass scroll events to the wrapped widget
- self._w.keypress(size, key)
- elif key == "a":
- self.flow.resume()
- self.master.view.update(self.flow)
- elif key == "A":
- for f in self.view:
- if f.intercepted:
- f.resume()
- self.master.view.update(self.flow)
- elif key == "d":
- if self.flow.killable:
- self.flow.kill()
- self.view.remove(self.flow)
- if not self.view.focus.flow:
- self.master.view_flowlist()
- else:
- self.view_flow(self.view.focus.flow)
- elif key == "D":
- cp = self.flow.copy()
- self.master.view.add(cp)
- self.master.view.focus.flow = cp
- self.view_flow(cp)
- signals.status_message.send(message="Duplicated.")
- elif key == "p":
- self.view_prev_flow(self.flow)
- elif key == "r":
- try:
- self.master.replay_request(self.flow)
- except exceptions.ReplayException as e:
- signals.add_log("Replay error: %s" % e, "warn")
- signals.flow_change.send(self, flow = self.flow)
- elif key == "V":
- if self.flow.modified():
- self.flow.revert()
- signals.flow_change.send(self, flow = self.flow)
- signals.status_message.send(message="Reverted.")
- else:
- signals.status_message.send(message="Flow not modified.")
- elif key == "W":
- signals.status_prompt_path.send(
- prompt = "Save this flow",
- callback = self.master.save_one_flow,
- args = (self.flow,)
- )
- elif key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
- elif key == "e":
- if self.tab_offset == TAB_REQ:
- signals.status_prompt_onekey.send(
- prompt="Edit request",
- keys=(
- ("cookies", "c"),
- ("query", "q"),
- ("path", "p"),
- ("url", "u"),
- ("header", "h"),
- ("form", "f"),
- ("raw body", "r"),
- ("method", "m"),
- ),
- callback=self.edit
- )
- elif self.tab_offset == TAB_RESP:
- signals.status_prompt_onekey.send(
- prompt="Edit response",
- keys=(
- ("cookies", "c"),
- ("code", "o"),
- ("message", "m"),
- ("header", "h"),
- ("raw body", "r"),
- ),
- callback=self.edit
- )
- else:
- signals.status_message.send(
- message="Tab to the request or response",
- expire=1
- )
- elif key in set("bfgmxvzEC") and not conn:
- signals.status_message.send(
- message = "Tab to the request or response",
- expire = 1
- )
- return
- elif key == "b":
- if self.tab_offset == TAB_REQ:
- common.ask_save_body("q", self.flow)
- else:
- common.ask_save_body("s", self.flow)
- elif key == "f":
- self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True
- signals.flow_change.send(self, flow = self.flow)
- signals.status_message.send(message="Loading all body data...")
- elif key == "m":
- opts = [i.name.lower() for i in contentviews.views]
- self.master.overlay(
- overlay.Chooser(
- "display mode",
- opts,
- self.viewmode_get(),
- self.change_this_display_mode
- )
- )
- elif key == "E":
- pass
- # if self.tab_offset == TAB_REQ:
- # scope = "q"
- # else:
- # scope = "s"
- # signals.status_prompt_onekey.send(
- # self,
- # prompt = "Export to file",
- # keys = [(e[0], e[1]) for e in export.EXPORTERS],
- # callback = common.export_to_clip_or_file,
- # args = (scope, self.flow, common.ask_save_path)
- # )
- elif key == "C":
- pass
- # if self.tab_offset == TAB_REQ:
- # scope = "q"
- # else:
- # scope = "s"
- # signals.status_prompt_onekey.send(
- # self,
- # prompt = "Export to clipboard",
- # keys = [(e[0], e[1]) for e in export.EXPORTERS],
- # callback = common.export_to_clip_or_file,
- # args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
- # )
- elif key == "x":
- conn.content = None
- signals.flow_change.send(self, flow=self.flow)
- elif key == "v":
- if conn.raw_content:
- t = conn.headers.get("content-type")
- if "EDITOR" in os.environ or "PAGER" in os.environ:
- self.master.spawn_external_viewer(conn.get_content(strict=False), t)
- else:
- signals.status_message.send(
- message = "Error! Set $EDITOR or $PAGER."
- )
- elif key == "z":
- self.flow.backup()
- enc = conn.headers.get("content-encoding", "identity")
- if enc != "identity":
- try:
- conn.decode()
- except ValueError:
- signals.status_message.send(
- message = "Could not decode - invalid data?"
- )
- else:
- signals.status_prompt_onekey.send(
- prompt = "Select encoding: ",
- keys = (
- ("gzip", "z"),
- ("deflate", "d"),
- ("brotli", "b"),
- ),
- callback = self.encode_callback,
- args = (conn,)
- )
- signals.flow_change.send(self, flow = self.flow)
- else:
- # Key is not handled here.
- return key
-
- def encode_callback(self, key, conn):
- encoding_map = {
- "z": "gzip",
- "d": "deflate",
- "b": "br",
- }
- conn.encode(encoding_map[key])
- signals.flow_change.send(self, flow = self.flow)
+ def focus_changed(self, *args, **kwargs):
+ self.body.focus_changed()
+ self.header.focus_changed()
diff --git a/mitmproxy/tools/console/grideditor/base.py b/mitmproxy/tools/console/grideditor/base.py
index 151479a4..35ae655f 100644
--- a/mitmproxy/tools/console/grideditor/base.py
+++ b/mitmproxy/tools/console/grideditor/base.py
@@ -252,13 +252,13 @@ FIRST_WIDTH_MAX = 40
FIRST_WIDTH_MIN = 20
-class GridEditor(urwid.WidgetWrap):
- title = None # type: str
- columns = None # type: Sequence[Column]
+class BaseGridEditor(urwid.WidgetWrap):
def __init__(
self,
master: "mitmproxy.tools.console.master.ConsoleMaster",
+ title,
+ columns,
value: Any,
callback: Callable[..., None],
*cb_args,
@@ -266,6 +266,8 @@ class GridEditor(urwid.WidgetWrap):
) -> None:
value = self.data_in(copy.deepcopy(value))
self.master = master
+ self.title = title
+ self.columns = columns
self.value = value
self.callback = callback
self.cb_args = cb_args
@@ -307,6 +309,13 @@ class GridEditor(urwid.WidgetWrap):
signals.footer_help.send(self, helptext="")
self.show_empty_msg()
+ def view_popping(self):
+ res = []
+ for i in self.walker.lst:
+ if not i[1] and any([x for x in i[0]]):
+ res.append(i[0])
+ self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
+
def show_empty_msg(self):
if self.walker.lst:
self._w.set_footer(None)
@@ -337,22 +346,14 @@ class GridEditor(urwid.WidgetWrap):
self._w.keypress(size, key)
return None
- key = common.shortcuts(key)
column = self.columns[self.walker.focus_col]
- if key in ["q", "esc"]:
- res = []
- for i in self.walker.lst:
- if not i[1] and any([x for x in i[0]]):
- res.append(i[0])
- self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
- signals.pop_view_state.send(self)
- elif key == "g":
+ if key == "m_start":
self.walker.set_focus(0)
- elif key == "G":
+ elif key == "m_end":
self.walker.set_focus(len(self.walker.lst) - 1)
- elif key in ["h", "left"]:
+ elif key == "left":
self.walker.left()
- elif key in ["l", "right"]:
+ elif key == "right":
self.walker.right()
elif key == "tab":
self.walker.tab_next()
@@ -415,3 +416,74 @@ class GridEditor(urwid.WidgetWrap):
)
)
return text
+
+
+class GridEditor(urwid.WidgetWrap):
+ title = None # type: str
+ columns = None # type: Sequence[Column]
+
+ def __init__(
+ self,
+ master: "mitmproxy.tools.console.master.ConsoleMaster",
+ value: Any,
+ callback: Callable[..., None],
+ *cb_args,
+ **cb_kwargs
+ ) -> None:
+ super().__init__(
+ master,
+ value,
+ self.title,
+ self.columns,
+ callback,
+ *cb_args,
+ **cb_kwargs
+ )
+
+
+class FocusEditor(urwid.WidgetWrap):
+ """
+ A specialised GridEditor that edits the current focused flow.
+ """
+ keyctx = "grideditor"
+
+ def __init__(self, master):
+ self.master = master
+ self.focus_changed()
+
+ def focus_changed(self):
+ if self.master.view.focus.flow:
+ self._w = BaseGridEditor(
+ self.master.view.focus.flow,
+ self.title,
+ self.columns,
+ self.get_data(self.master.view.focus.flow),
+ self.set_data_update,
+ self.master.view.focus.flow,
+ )
+ else:
+ self._w = urwid.Pile([])
+
+ def call(self, v, name, *args, **kwargs):
+ f = getattr(v, name, None)
+ if f:
+ f(*args, **kwargs)
+
+ def view_popping(self):
+ self.call(self._w, "view_popping")
+
+ def get_data(self, flow):
+ """
+ Retrieve the data to edit from the current flow.
+ """
+ raise NotImplementedError
+
+ def set_data(self, vals, flow):
+ """
+ Set the current data on the flow.
+ """
+ raise NotImplementedError
+
+ def set_data_update(self, vals, flow):
+ self.set_data(vals, flow)
+ signals.flow_change.send(self, flow = flow)
diff --git a/mitmproxy/tools/console/grideditor/editors.py b/mitmproxy/tools/console/grideditor/editors.py
index e069fe2f..671e91fb 100644
--- a/mitmproxy/tools/console/grideditor/editors.py
+++ b/mitmproxy/tools/console/grideditor/editors.py
@@ -1,4 +1,3 @@
-import os
import re
import urwid
@@ -13,18 +12,24 @@ from mitmproxy.tools.console.grideditor import col_bytes
from mitmproxy.tools.console.grideditor import col_subgrid
from mitmproxy.tools.console import signals
from mitmproxy.net.http import user_agents
+from mitmproxy.net.http import Headers
-class QueryEditor(base.GridEditor):
+class QueryEditor(base.FocusEditor):
title = "Editing query"
columns = [
col_text.Column("Key"),
col_text.Column("Value")
]
+ def get_data(self, flow):
+ return flow.request.query.items(multi=True)
-class HeaderEditor(base.GridEditor):
- title = "Editing headers"
+ def set_data(self, vals, flow):
+ flow.request.query = vals
+
+
+class HeaderEditor(base.FocusEditor):
columns = [
col_bytes.Column("Key"),
col_bytes.Column("Value")
@@ -65,35 +70,38 @@ class HeaderEditor(base.GridEditor):
return True
-class URLEncodedFormEditor(base.GridEditor):
+class RequestHeaderEditor(HeaderEditor):
+ title = "Editing request headers"
+
+ def get_data(self, flow):
+ return flow.request.headers.fields
+
+ def set_data(self, vals, flow):
+ flow.request.headers = Headers(vals)
+
+
+class ResponseHeaderEditor(HeaderEditor):
+ title = "Editing response headers"
+
+ def get_data(self, flow):
+ return flow.response.headers.fields
+
+ def set_data(self, vals, flow):
+ flow.response.headers = Headers(vals)
+
+
+class RequestFormEditor(base.FocusEditor):
title = "Editing URL-encoded form"
columns = [
col_text.Column("Key"),
col_text.Column("Value")
]
+ def get_data(self, flow):
+ return flow.request.urlencoded_form.items(multi=True)
-class ReplaceEditor(base.GridEditor):
- title = "Editing replacement patterns"
- columns = [
- col_text.Column("Filter"),
- col_text.Column("Regex"),
- col_text.Column("Replacement"),
- ]
-
- def is_error(self, col, val):
- if col == 0:
- if not flowfilter.parse(val):
- return "Invalid filter specification."
- elif col == 1:
- try:
- re.compile(val)
- except re.error:
- return "Invalid regular expression."
- elif col == 2:
- if val.startswith("@") and not os.path.isfile(os.path.expanduser(val[1:])):
- return "Invalid file path"
- return False
+ def set_data(self, vals, flow):
+ flow.request.urlencoded_form = vals
class SetHeadersEditor(base.GridEditor):
@@ -146,7 +154,7 @@ class SetHeadersEditor(base.GridEditor):
return True
-class PathEditor(base.GridEditor):
+class PathEditor(base.FocusEditor):
# TODO: Next row on enter?
title = "Editing URL path components"
@@ -160,6 +168,12 @@ class PathEditor(base.GridEditor):
def data_out(self, data):
return [i[0] for i in data]
+ def get_data(self, flow):
+ return self.data_in(flow.request.path_components)
+
+ def set_data(self, vals, flow):
+ flow.request.path_components = self.data_out(vals)
+
class ScriptEditor(base.GridEditor):
title = "Editing scripts"
@@ -193,13 +207,19 @@ class HostPatternEditor(base.GridEditor):
return [i[0] for i in data]
-class CookieEditor(base.GridEditor):
+class CookieEditor(base.FocusEditor):
title = "Editing request Cookie header"
columns = [
col_text.Column("Name"),
col_text.Column("Value"),
]
+ def get_data(self, flow):
+ return flow.request.cookies.items(multi=True)
+
+ def set_data(self, vals, flow):
+ flow.request.cookies = vals
+
class CookieAttributeEditor(base.GridEditor):
title = "Editing Set-Cookie attributes"
@@ -221,7 +241,7 @@ class CookieAttributeEditor(base.GridEditor):
return ret
-class SetCookieEditor(base.GridEditor):
+class SetCookieEditor(base.FocusEditor):
title = "Editing response SetCookie header"
columns = [
col_text.Column("Name"),
@@ -246,6 +266,12 @@ class SetCookieEditor(base.GridEditor):
)
return vals
+ def get_data(self, flow):
+ return self.data_in(flow.response.cookies.items(multi=True))
+
+ def set_data(self, vals, flow):
+ flow.response.cookies = self.data_out(vals)
+
class OptionsEditor(base.GridEditor):
title = None # type: str
diff --git a/mitmproxy/tools/console/help.py b/mitmproxy/tools/console/help.py
index 282f374d..ec0c95d9 100644
--- a/mitmproxy/tools/console/help.py
+++ b/mitmproxy/tools/console/help.py
@@ -4,7 +4,6 @@ import urwid
from mitmproxy import flowfilter
from mitmproxy.tools.console import common
-from mitmproxy.tools.console import signals
from mitmproxy import version
@@ -15,6 +14,7 @@ footer = [
class HelpView(urwid.ListBox):
+ keyctx = "help"
def __init__(self, help_context):
self.help_context = help_context or []
@@ -84,14 +84,8 @@ class HelpView(urwid.ListBox):
return text
def keypress(self, size, key):
- key = common.shortcuts(key)
- if key == "q":
- signals.pop_view_state.send(self)
- return None
- elif key == "?":
- key = None
- elif key == "g":
+ if key == "m_start":
self.set_focus(0)
- elif key == "G":
+ elif key == "m_end":
self.set_focus(len(self.body.contents))
return urwid.ListBox.keypress(self, size, key)
diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py
index e3d28cf4..62e2dcfb 100644
--- a/mitmproxy/tools/console/keymap.py
+++ b/mitmproxy/tools/console/keymap.py
@@ -1,8 +1,10 @@
import typing
+import collections
from mitmproxy.tools.console import commandeditor
-contexts = {
+SupportedContexts = {
+ "chooser",
"commands",
"flowlist",
"flowview",
@@ -13,20 +15,34 @@ contexts = {
}
+Binding = collections.namedtuple("Binding", ["key", "command", "contexts"])
+
+
class Keymap:
def __init__(self, master):
self.executor = commandeditor.CommandExecutor(master)
self.keys = {}
+ self.bindings = []
- def add(self, key: str, command: str, context: str = "global") -> None:
+ def add(self, key: str, command: str, contexts: typing.Sequence[str]) -> None:
"""
Add a key to the key map. If context is empty, it's considered to be
a global binding.
"""
- if context not in contexts:
- raise ValueError("Unsupported context: %s" % context)
- d = self.keys.setdefault(context, {})
- d[key] = command
+ if not contexts:
+ raise ValueError("Must specify at least one context.")
+ for c in contexts:
+ if c not in SupportedContexts:
+ raise ValueError("Unsupported context: %s" % c)
+
+ b = Binding(key=key, command=command, contexts=contexts)
+ self.bindings.append(b)
+ self.bind(b)
+
+ def bind(self, binding):
+ for c in binding.contexts:
+ d = self.keys.setdefault(c, {})
+ d[binding.key] = binding.command
def get(self, context: str, key: str) -> typing.Optional[str]:
if context in self.keys:
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index 5b6d9bcb..b88a0354 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -16,24 +16,19 @@ import urwid
from mitmproxy import ctx
from mitmproxy import addons
from mitmproxy import command
+from mitmproxy import exceptions
from mitmproxy import master
from mitmproxy import log
from mitmproxy import flow
from mitmproxy.addons import intercept
from mitmproxy.addons import readfile
from mitmproxy.addons import view
-from mitmproxy.tools.console import flowlist
-from mitmproxy.tools.console import flowview
-from mitmproxy.tools.console import grideditor
-from mitmproxy.tools.console import help
from mitmproxy.tools.console import keymap
-from mitmproxy.tools.console import options
-from mitmproxy.tools.console import commands
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import palettes
from mitmproxy.tools.console import signals
-from mitmproxy.tools.console import statusbar
from mitmproxy.tools.console import window
+from mitmproxy import contentviews
from mitmproxy.utils import strutils
EVENTLOG_SIZE = 10000
@@ -80,14 +75,102 @@ class UnsupportedLog:
class ConsoleAddon:
"""
- An addon that exposes console-specific commands.
+ An addon that exposes console-specific commands, and hooks into required
+ events.
"""
def __init__(self, master):
self.master = master
self.started = False
+ @command.command("console.options.reset.current")
+ def options_reset_current(self) -> None:
+ """
+ Reset the current option in the options editor.
+ """
+ if self.master.window.focus.keyctx != "options":
+ raise exceptions.CommandError("Not viewing options.")
+ name = self.master.window.windows["options"].current_name()
+ self.master.commands.call("options.reset.one %s" % name)
+
+ @command.command("console.nav.start")
+ def nav_start(self) -> None:
+ """
+ Go to the start of a list or scrollable.
+ """
+ self.master.inject_key("m_start")
+
+ @command.command("console.nav.end")
+ def nav_end(self) -> None:
+ """
+ Go to the end of a list or scrollable.
+ """
+ self.master.inject_key("m_end")
+
+ @command.command("console.nav.up")
+ def nav_up(self) -> None:
+ """
+ Go up.
+ """
+ self.master.inject_key("up")
+
+ @command.command("console.nav.down")
+ def nav_down(self) -> None:
+ """
+ Go down.
+ """
+ self.master.inject_key("down")
+
+ @command.command("console.nav.pageup")
+ def nav_pageup(self) -> None:
+ """
+ Go up.
+ """
+ self.master.inject_key("page up")
+
+ @command.command("console.nav.pagedown")
+ def nav_pagedown(self) -> None:
+ """
+ Go down.
+ """
+ self.master.inject_key("page down")
+
+ @command.command("console.nav.left")
+ def nav_left(self) -> None:
+ """
+ Go left.
+ """
+ self.master.inject_key("left")
+
+ @command.command("console.nav.right")
+ def nav_right(self) -> None:
+ """
+ Go right.
+ """
+ self.master.inject_key("right")
+
@command.command("console.choose")
def console_choose(
+ self, prompt: str, choices: typing.Sequence[str], *cmd: typing.Sequence[str]
+ ) -> None:
+ """
+ Prompt the user to choose from a specified list of strings, then
+ invoke another command with all occurances of {choice} replaced by
+ the choice the user made.
+ """
+ def callback(opt):
+ # We're now outside of the call context...
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ try:
+ self.master.commands.call(repl)
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
+
+ self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback))
+ ctx.log.info(choices)
+
+ @command.command("console.choose.cmd")
+ def console_choose_cmd(
self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
) -> None:
"""
@@ -98,11 +181,15 @@ class ConsoleAddon:
choices = ctx.master.commands.call_args(choicecmd, [])
def callback(opt):
+ # We're now outside of the call context...
repl = " ".join(cmd)
repl = repl.replace("{choice}", opt)
- self.master.commands.call(repl)
+ try:
+ self.master.commands.call(repl)
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
- self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
+ self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback))
ctx.log.info(choices)
@command.command("console.command")
@@ -115,24 +202,24 @@ class ConsoleAddon:
@command.command("console.view.commands")
def view_commands(self) -> None:
"""View the commands list."""
- self.master.view_commands()
+ self.master.switch_view("commands")
@command.command("console.view.options")
def view_options(self) -> None:
"""View the options editor."""
- self.master.view_options()
+ self.master.switch_view("options")
@command.command("console.view.help")
def view_help(self) -> None:
"""View help."""
- self.master.view_help()
+ self.master.switch_view("help")
@command.command("console.view.flow")
def view_flow(self, flow: flow.Flow) -> None:
"""View a flow."""
if hasattr(flow, "request"):
# FIME: Also set focus?
- self.master.view_flow(flow)
+ self.master.switch_view("flowview")
@command.command("console.exit")
def exit(self) -> None:
@@ -147,71 +234,219 @@ class ConsoleAddon:
"""
signals.pop_view_state.send(self)
+ @command.command("console.bodyview")
+ def bodyview(self, f: flow.Flow, part: str) -> None:
+ """
+ Spawn an external viewer for a flow request or response body based
+ on the detected MIME type. We use the mailcap system to find the
+ correct viewier, and fall back to the programs in $PAGER or $EDITOR
+ if necessary.
+ """
+ fpart = getattr(f, part)
+ if not fpart:
+ raise exceptions.CommandError("Could not view part %s." % part)
+ t = fpart.headers.get("content-type")
+ content = fpart.get_content(strict=False)
+ if not content:
+ raise exceptions.CommandError("No content to view.")
+ self.master.spawn_external_viewer(content, t)
+
+ @command.command("console.edit.focus.options")
+ def edit_focus_options(self) -> typing.Sequence[str]:
+ return [
+ "cookies",
+ "form",
+ "path",
+ "method",
+ "query",
+ "reason",
+ "request-headers",
+ "response-headers",
+ "status_code",
+ "set-cookies",
+ "url",
+ ]
+
+ @command.command("console.edit.focus")
+ def edit_focus(self, part: str) -> None:
+ """
+ Edit the query of the current focus.
+ """
+ if part == "cookies":
+ self.master.switch_view("edit_focus_cookies")
+ elif part == "form":
+ self.master.switch_view("edit_focus_form")
+ elif part == "path":
+ self.master.switch_view("edit_focus_path")
+ elif part == "query":
+ self.master.switch_view("edit_focus_query")
+ elif part == "request-headers":
+ self.master.switch_view("edit_focus_request_headers")
+ elif part == "response-headers":
+ self.master.switch_view("edit_focus_response_headers")
+ elif part == "set-cookies":
+ self.master.switch_view("edit_focus_setcookies")
+ elif part in ["url", "method", "status_code", "reason"]:
+ self.master.commands.call(
+ "console.command flow.set @focus %s " % part
+ )
+
+ @command.command("console.flowview.mode.set")
+ def flowview_mode_set(self) -> None:
+ """
+ Set the display mode for the current flow view.
+ """
+ if self.master.window.focus.keyctx != "flowview":
+ raise exceptions.CommandError("Not viewing a flow.")
+ fv = self.master.window.windows["flowview"]
+ idx = fv.body.tab_offset
+
+ def callback(opt):
+ try:
+ self.master.commands.call_args(
+ "view.setval",
+ ["@focus", "flowview_mode_%s" % idx, opt]
+ )
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
+
+ opts = [i.name.lower() for i in contentviews.views]
+ self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
+
+ @command.command("console.flowview.mode")
+ def flowview_mode(self) -> str:
+ """
+ Get the display mode for the current flow view.
+ """
+ if self.master.window.focus.keyctx != "flowview":
+ raise exceptions.CommandError("Not viewing a flow.")
+ fv = self.master.window.windows["flowview"]
+ idx = fv.body.tab_offset
+ return self.master.commands.call_args(
+ "view.getval",
+ [
+ "@focus",
+ "flowview_mode_%s" % idx,
+ self.master.options.default_contentview,
+ ]
+ )
+
def running(self):
self.started = True
def update(self, flows):
if not flows:
signals.update_settings.send(self)
+ for f in flows:
+ signals.flow_change.send(self, flow=f)
def configure(self, updated):
if self.started:
if "console_eventlog" in updated:
- self.master.refresh_view()
+ pass
def default_keymap(km):
- km.add(":", "console.command ''")
- km.add("?", "console.view.help")
- km.add("C", "console.view.commands")
- km.add("O", "console.view.options")
- km.add("Q", "console.exit")
- km.add("q", "console.view.pop")
- km.add("i", "console.command set intercept=")
- km.add("W", "console.command set save_stream_file=")
-
- km.add("A", "flow.resume @all", context="flowlist")
- km.add("a", "flow.resume @focus", context="flowlist")
- km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
- km.add("d", "view.remove @focus", context="flowlist")
- km.add("D", "view.duplicate @focus", context="flowlist")
- km.add("e", "set console_eventlog=toggle", context="flowlist")
+ km.add(":", "console.command ''", ["global"])
+ km.add("?", "console.view.help", ["global"])
+ km.add("C", "console.view.commands", ["global"])
+ km.add("O", "console.view.options", ["global"])
+ km.add("Q", "console.exit", ["global"])
+ km.add("q", "console.view.pop", ["global"])
+
+ km.add("g", "console.nav.start", ["global"])
+ km.add("G", "console.nav.end", ["global"])
+ km.add("k", "console.nav.up", ["global"])
+ km.add("j", "console.nav.down", ["global"])
+ km.add("l", "console.nav.right", ["global"])
+ km.add("h", "console.nav.left", ["global"])
+ km.add(" ", "console.nav.pagedown", ["global"])
+ km.add("ctrl f", "console.nav.pagedown", ["global"])
+ km.add("ctrl b", "console.nav.pageup", ["global"])
+
+ km.add("i", "console.command set intercept=", ["global"])
+ km.add("W", "console.command set save_stream_file=", ["global"])
+
+ km.add("A", "flow.resume @all", ["flowlist", "flowview"])
+ km.add("a", "flow.resume @focus", ["flowlist", "flowview"])
+ km.add(
+ "b", "console.command cut.save s.content|@focus ''",
+ ["flowlist", "flowview"]
+ )
+ km.add("d", "view.remove @focus", ["flowlist", "flowview"])
+ km.add("D", "view.duplicate @focus", ["flowlist", "flowview"])
+ km.add("e", "set console_eventlog=toggle", ["flowlist"])
km.add(
"E",
- "console.choose Format export.formats "
+ "console.choose.cmd Format export.formats "
"console.command export.file {choice} @focus ''",
- context="flowlist"
+ ["flowlist", "flowview"]
)
- km.add("f", "console.command 'set view_filter='", context="flowlist")
- km.add("F", "set console_focus_follow=toggle", context="flowlist")
- km.add("g", "view.go 0", context="flowlist")
- km.add("G", "view.go -1", context="flowlist")
- km.add("l", "console.command cut.clip ", context="flowlist")
- km.add("L", "console.command view.load ", context="flowlist")
- km.add("m", "flow.mark.toggle @focus", context="flowlist")
- km.add("M", "view.marked.toggle", context="flowlist")
+ km.add("f", "console.command set view_filter=", ["flowlist"])
+ km.add("F", "set console_focus_follow=toggle", ["flowlist"])
+ km.add("ctrl l", "console.command cut.clip ", ["flowlist", "flowview"])
+ km.add("L", "console.command view.load ", ["flowlist"])
+ km.add("m", "flow.mark.toggle @focus", ["flowlist"])
+ km.add("M", "view.marked.toggle", ["flowlist"])
km.add(
"n",
"console.command view.create get https://google.com",
- context="flowlist"
+ ["flowlist"]
)
km.add(
"o",
- "console.choose Order view.order.options "
+ "console.choose.cmd Order view.order.options "
"set console_order={choice}",
- context="flowlist"
+ ["flowlist"]
)
- km.add("r", "replay.client @focus", context="flowlist")
- km.add("S", "console.command 'replay.server '")
- km.add("v", "set console_order_reversed=toggle", context="flowlist")
- km.add("U", "flow.mark @all false", context="flowlist")
- km.add("w", "console.command 'save.file @shown '", context="flowlist")
- km.add("V", "flow.revert @focus", context="flowlist")
- km.add("X", "flow.kill @focus", context="flowlist")
- km.add("z", "view.remove @all", context="flowlist")
- km.add("Z", "view.remove @hidden", context="flowlist")
- km.add("|", "console.command 'script.run @focus '", context="flowlist")
- km.add("enter", "console.view.flow @focus", context="flowlist")
+ km.add("r", "replay.client @focus", ["flowlist", "flowview"])
+ km.add("S", "console.command replay.server ", ["flowlist"])
+ km.add("v", "set console_order_reversed=toggle", ["flowlist"])
+ km.add("U", "flow.mark @all false", ["flowlist"])
+ km.add("w", "console.command save.file @shown ", ["flowlist"])
+ km.add("V", "flow.revert @focus", ["flowlist", "flowview"])
+ km.add("X", "flow.kill @focus", ["flowlist"])
+ km.add("z", "view.remove @all", ["flowlist"])
+ km.add("Z", "view.remove @hidden", ["flowlist"])
+ km.add("|", "console.command script.run @focus ", ["flowlist", "flowview"])
+ km.add("enter", "console.view.flow @focus", ["flowlist"])
+
+ km.add(
+ "e",
+ "console.choose.cmd Part console.edit.focus.options "
+ "console.edit.focus {choice}",
+ ["flowview"]
+ )
+ km.add("f", "view.setval.toggle @focus fullcontents", ["flowview"])
+ km.add("w", "console.command save.file @focus ", ["flowview"])
+ km.add(" ", "view.focus.next", ["flowview"])
+ km.add(
+ "o",
+ "console.choose.cmd Order view.order.options "
+ "set console_order={choice}",
+ ["flowlist"]
+ )
+
+ km.add(
+ "v",
+ "console.choose \"View Part\" request,response "
+ "console.bodyview @focus {choice}",
+ ["flowview"]
+ )
+ km.add("p", "view.focus.prev", ["flowview"])
+ km.add("m", "console.flowview.mode.set", ["flowview"])
+ km.add("tab", "console.nav.right", ["flowview"])
+ km.add(
+ "z",
+ "console.choose \"Part\" request,response "
+ "flow.encode.toggle @focus {choice}",
+ ["flowview"]
+ )
+
+ km.add("L", "console.command options.load ", ["options"])
+ km.add("S", "console.command options.save ", ["options"])
+ km.add("D", "options.reset", ["options"])
+ km.add("d", "console.options.reset.current", ["options"])
class ConsoleMaster(master.Master):
@@ -219,7 +454,6 @@ class ConsoleMaster(master.Master):
def __init__(self, options, server):
super().__init__(options, server)
self.view = view.View() # type: view.View
- self.view.sig_view_update.connect(signals.flow_change.send)
self.stream_path = None
# This line is just for type hinting
self.options = self.options # type: Options
@@ -232,9 +466,6 @@ class ConsoleMaster(master.Master):
self.view_stack = []
signals.call_in.connect(self.sig_call_in)
- signals.pop_view_state.connect(self.sig_pop_view_state)
- signals.replace_view_state.connect(self.sig_replace_view_state)
- signals.push_view_state.connect(self.sig_push_view_state)
signals.sig_add_log.connect(self.sig_add_log)
self.addons.add(Logger())
self.addons.add(*addons.default_addons())
@@ -251,6 +482,8 @@ class ConsoleMaster(master.Master):
signal.signal(signal.SIGINT, sigint_handler)
+ self.window = None
+
def __setattr__(self, name, value):
self.__dict__[name] = value
signals.update_settings.send(self)
@@ -294,37 +527,6 @@ class ConsoleMaster(master.Master):
return callback(*args)
self.loop.set_alarm_in(seconds, cb)
- def sig_replace_view_state(self, sender):
- """
- A view has been pushed onto the stack, and is intended to replace
- the current view rather than creating a new stack entry.
- """
- if len(self.view_stack) > 1:
- del self.view_stack[1]
-
- def sig_pop_view_state(self, sender):
- """
- Pop the top view off the view stack. If no more views will be left
- after this, prompt for exit.
- """
- if len(self.view_stack) > 1:
- self.view_stack.pop()
- self.loop.widget = self.view_stack[-1]
- else:
- self.prompt_for_exit()
-
- def sig_push_view_state(self, sender, window):
- """
- Push a new view onto the view stack.
- """
- self.view_stack.append(window)
- self.loop.widget = window
- self.loop.draw_screen()
-
- def refresh_view(self):
- self.view_flowlist()
- signals.replace_view_state.send(self)
-
def spawn_editor(self, data):
text = not isinstance(data, bytes)
fd, name = tempfile.mkstemp('', "mproxy", text=text)
@@ -400,6 +602,9 @@ class ConsoleMaster(master.Master):
self.loop.draw_screen()
self.loop.set_alarm_in(0.01, self.ticker)
+ def inject_key(self, key):
+ self.loop.process_input([key])
+
def run(self):
self.ui = urwid.raw_display.Screen()
self.ui.set_terminal_properties(256)
@@ -413,12 +618,14 @@ class ConsoleMaster(master.Master):
screen = self.ui,
handle_mouse = self.options.console_mouse,
)
- self.ab = statusbar.ActionBar(self)
+
+ self.window = window.Window(self)
+ self.loop.widget = self.window
self.loop.set_alarm_in(0.01, self.ticker)
self.loop.set_alarm_in(
0.0001,
- lambda *args: self.view_flowlist()
+ lambda *args: self.switch_view("flowlist")
)
self.start()
@@ -439,111 +646,16 @@ class ConsoleMaster(master.Master):
def shutdown(self):
raise urwid.ExitMainLoop
- def overlay(self, widget, **kwargs):
- signals.push_view_state.send(
- self,
- window = overlay.SimpleOverlay(
- self,
- widget,
- self.loop.widget,
- widget.width,
- **kwargs
- )
- )
-
- def view_help(self):
- hc = self.view_stack[-1].helpctx
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- help.HelpView(hc),
- None,
- statusbar.StatusBar(self, help.footer),
- None,
- "help"
- )
- )
-
- def view_options(self):
- for i in self.view_stack:
- if isinstance(i["body"], options.Options):
- return
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- options.Options(self),
- None,
- statusbar.StatusBar(self, options.footer),
- options.help_context,
- "options"
- )
- )
-
- def view_commands(self):
- for i in self.view_stack:
- if isinstance(i["body"], commands.Commands):
- return
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- commands.Commands(self),
- None,
- statusbar.StatusBar(self, commands.footer),
- commands.help_context,
- "commands"
- )
- )
+ def sig_exit_overlay(self, *args, **kwargs):
+ self.loop.widget = self.window
- def view_grideditor(self, ge):
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- ge,
- None,
- statusbar.StatusBar(self, grideditor.base.FOOTER),
- ge.make_help(),
- "grideditor"
- )
- )
-
- def view_flowlist(self):
- if self.ui.started:
- self.ui.clear()
-
- if self.options.console_eventlog:
- body = flowlist.BodyPile(self)
- else:
- body = flowlist.FlowListBox(self)
-
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- body,
- None,
- statusbar.StatusBar(self, flowlist.footer),
- flowlist.help_context,
- "flowlist"
- )
+ def overlay(self, widget, **kwargs):
+ self.loop.widget = overlay.SimpleOverlay(
+ self, widget, self.loop.widget, widget.width, **kwargs
)
- def view_flow(self, flow, tab_offset=0):
- self.view.focus.flow = flow
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- flowview.FlowView(self, self.view, flow, tab_offset),
- flowview.FlowViewHeader(self, flow),
- statusbar.StatusBar(self, flowview.footer),
- flowview.help_context,
- "flowview"
- )
- )
+ def switch_view(self, name):
+ self.window.push(name)
def quit(self, a):
if a != "n":
diff --git a/mitmproxy/tools/console/options.py b/mitmproxy/tools/console/options.py
index 64203f2b..fee61fe5 100644
--- a/mitmproxy/tools/console/options.py
+++ b/mitmproxy/tools/console/options.py
@@ -187,12 +187,6 @@ class OptionsList(urwid.ListBox):
except exceptions.OptionsError as e:
signals.status_message.send(message=str(e))
- def load_config(self, path):
- try:
- optmanager.load_paths(self.master.options, path)
- except exceptions.OptionsError as e:
- signals.status_message.send(message=str(e))
-
def keypress(self, size, key):
if self.walker.editing:
if key == "enter":
@@ -207,29 +201,12 @@ class OptionsList(urwid.ListBox):
elif key == "esc":
self.walker.stop_editing()
else:
- if key == "d":
- foc, idx = self.get_focus()
- setattr(
- self.master.options,
- foc.opt.name,
- self.master.options.default(foc.opt.name)
- )
- elif key == "g":
+ if key == "m_start":
self.set_focus(0)
self.walker._modified()
- elif key == "G":
+ elif key == "m_end":
self.set_focus(len(self.walker.opts) - 1)
self.walker._modified()
- elif key == "l":
- signals.status_prompt_path.send(
- prompt = "Load config from",
- callback = self.load_config
- )
- elif key == "w":
- signals.status_prompt_path.send(
- prompt = "Save config to",
- callback = self.save_config
- )
elif key == "enter":
foc, idx = self.get_focus()
if foc.opt.typespec == bool:
@@ -242,6 +219,7 @@ class OptionsList(urwid.ListBox):
elif foc.opt.choices:
self.master.overlay(
overlay.Chooser(
+ self.master,
foc.opt.name,
foc.opt.choices,
foc.opt.current(),
@@ -286,27 +264,30 @@ class OptionHelp(urwid.Frame):
class Options(urwid.Pile):
+ keyctx = "options"
+
def __init__(self, master):
oh = OptionHelp(master)
+ self.optionslist = OptionsList(master)
super().__init__(
[
- OptionsList(master),
+ self.optionslist,
(HELP_HEIGHT, oh),
]
)
self.master = master
+ def current_name(self):
+ foc, idx = self.optionslist.get_focus()
+ return foc.opt.name
+
def keypress(self, size, key):
- key = common.shortcuts(key)
if key == "tab":
self.focus_position = (
self.focus_position + 1
) % len(self.widget_list)
self.widget_list[1].set_active(self.focus_position == 1)
key = None
- elif key == "D":
- self.master.options.reset()
- key = None
# This is essentially a copypasta from urwid.Pile's keypress handler.
# So much for "closed for modification, but open for extension".
diff --git a/mitmproxy/tools/console/overlay.py b/mitmproxy/tools/console/overlay.py
index e874da69..2fa6aa46 100644
--- a/mitmproxy/tools/console/overlay.py
+++ b/mitmproxy/tools/console/overlay.py
@@ -80,11 +80,12 @@ class ChooserListWalker(urwid.ListWalker):
class Chooser(urwid.WidgetWrap):
- def __init__(self, title, choices, current, callback):
+ def __init__(self, master, title, choices, current, callback):
+ self.master = master
self.choices = choices
self.callback = callback
choicewidth = max([len(i) for i in choices])
- self.width = max(choicewidth, len(title) + 5)
+ self.width = max(choicewidth, len(title)) + 5
self.walker = ChooserListWalker(choices, current)
super().__init__(
urwid.AttrWrap(
@@ -103,7 +104,7 @@ class Chooser(urwid.WidgetWrap):
return True
def keypress(self, size, key):
- key = common.shortcuts(key)
+ key = self.master.keymap.handle("chooser", key)
if key == "enter":
self.callback(self.choices[self.walker.index])
signals.pop_view_state.send(self)
diff --git a/mitmproxy/tools/console/searchable.py b/mitmproxy/tools/console/searchable.py
index 55c5218a..f2bb5612 100644
--- a/mitmproxy/tools/console/searchable.py
+++ b/mitmproxy/tools/console/searchable.py
@@ -16,10 +16,9 @@ class Highlight(urwid.AttrMap):
class Searchable(urwid.ListBox):
- def __init__(self, view, contents):
+ def __init__(self, contents):
self.walker = urwid.SimpleFocusListWalker(contents)
urwid.ListBox.__init__(self, self.walker)
- self.view = view
self.search_offset = 0
self.current_highlight = None
self.search_term = None
@@ -36,10 +35,10 @@ class Searchable(urwid.ListBox):
self.find_next(False)
elif key == "N":
self.find_next(True)
- elif key == "g":
+ elif key == "m_start":
self.set_focus(0)
self.walker._modified()
- elif key == "G":
+ elif key == "m_end":
self.set_focus(len(self.walker) - 1)
self.walker._modified()
else:
diff --git a/mitmproxy/tools/console/select.py b/mitmproxy/tools/console/select.py
index a990dff8..f7e5d950 100644
--- a/mitmproxy/tools/console/select.py
+++ b/mitmproxy/tools/console/select.py
@@ -113,7 +113,6 @@ class Select(urwid.ListBox):
if key == "enter" or key == " ":
self.get_focus()[0].option.activate()
return None
- key = common.shortcuts(key)
if key in self.keymap:
self.keymap[key].activate()
self.set_focus(self.options.index(self.keymap[key]))
diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py
index 91cb63b3..885cdbfb 100644
--- a/mitmproxy/tools/console/signals.py
+++ b/mitmproxy/tools/console/signals.py
@@ -48,4 +48,6 @@ flowlist_change = blinker.Signal()
# Pop and push view state onto a stack
pop_view_state = blinker.Signal()
push_view_state = blinker.Signal()
-replace_view_state = blinker.Signal()
+
+# Exits overlay if there is one
+exit_overlay = blinker.Signal()
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 8ded0cda..a5db0f4a 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -143,6 +143,7 @@ class ActionBar(urwid.WidgetWrap):
class StatusBar(urwid.WidgetWrap):
+ keyctx = ""
def __init__(
self, master: "mitmproxy.tools.console.master.ConsoleMaster", helptext
@@ -150,7 +151,8 @@ class StatusBar(urwid.WidgetWrap):
self.master = master
self.helptext = helptext
self.ib = urwid.WidgetWrap(urwid.Text(""))
- super().__init__(urwid.Pile([self.ib, self.master.ab]))
+ self.ab = ActionBar(self)
+ super().__init__(urwid.Pile([self.ib, self.ab]))
signals.update_settings.connect(self.sig_update)
signals.flowlist_change.connect(self.sig_update)
signals.footer_help.connect(self.sig_footer_help)
@@ -166,7 +168,7 @@ class StatusBar(urwid.WidgetWrap):
self.redraw()
def keypress(self, *args, **kwargs):
- return self.master.ab.keypress(*args, **kwargs)
+ return self.ab.keypress(*args, **kwargs)
def get_status(self):
r = []
diff --git a/mitmproxy/tools/console/tabs.py b/mitmproxy/tools/console/tabs.py
index a2d5e719..93d6909e 100644
--- a/mitmproxy/tools/console/tabs.py
+++ b/mitmproxy/tools/console/tabs.py
@@ -27,6 +27,7 @@ class Tabs(urwid.WidgetWrap):
self.tab_offset = tab_offset
self.tabs = tabs
self.show()
+ self._w = urwid.Pile([])
def change_tab(self, offset):
self.tab_offset = offset
@@ -34,13 +35,16 @@ class Tabs(urwid.WidgetWrap):
def keypress(self, size, key):
n = len(self.tabs)
- if key in ["tab", "l"]:
+ if key == "right":
self.change_tab((self.tab_offset + 1) % n)
- elif key == "h":
+ elif key == "left":
self.change_tab((self.tab_offset - 1) % n)
return self._w.keypress(size, key)
def show(self):
+ if not self.tabs:
+ return
+
headers = []
for i in range(len(self.tabs)):
txt = self.tabs[i][0]()
diff --git a/mitmproxy/tools/console/window.py b/mitmproxy/tools/console/window.py
index ad972a66..d7038da0 100644
--- a/mitmproxy/tools/console/window.py
+++ b/mitmproxy/tools/console/window.py
@@ -1,22 +1,107 @@
import urwid
-
from mitmproxy.tools.console import signals
+from mitmproxy.tools.console import statusbar
+from mitmproxy.tools.console import flowlist
+from mitmproxy.tools.console import flowview
+from mitmproxy.tools.console import commands
+from mitmproxy.tools.console import options
+from mitmproxy.tools.console import overlay
+from mitmproxy.tools.console import help
+from mitmproxy.tools.console import grideditor
class Window(urwid.Frame):
-
- def __init__(self, master, body, header, footer, helpctx, keyctx):
- urwid.Frame.__init__(
- self,
- urwid.AttrWrap(body, "background"),
- header = urwid.AttrWrap(header, "background") if header else None,
- footer = urwid.AttrWrap(footer, "background") if footer else None
+ def __init__(self, master):
+ self.statusbar = statusbar.StatusBar(master, "")
+ super().__init__(
+ None,
+ header = None,
+ footer = urwid.AttrWrap(self.statusbar, "background")
)
self.master = master
- self.helpctx = helpctx
- self.keyctx = keyctx
+ self.primary_stack = []
+ self.master.view.sig_view_refresh.connect(self.view_changed)
+ self.master.view.sig_view_add.connect(self.view_changed)
+ self.master.view.sig_view_remove.connect(self.view_changed)
+ self.master.view.sig_view_update.connect(self.view_changed)
+ self.master.view.focus.sig_change.connect(self.view_changed)
signals.focus.connect(self.sig_focus)
+ self.master.view.focus.sig_change.connect(self.focus_changed)
+ signals.flow_change.connect(self.flow_changed)
+
+ signals.pop_view_state.connect(self.pop)
+ signals.push_view_state.connect(self.push)
+ self.windows = dict(
+ flowlist = flowlist.FlowListBox(self.master),
+ flowview = flowview.FlowView(self.master),
+ commands = commands.Commands(self.master),
+ options = options.Options(self.master),
+ help = help.HelpView(None),
+ edit_focus_query = grideditor.QueryEditor(self.master),
+ edit_focus_cookies = grideditor.CookieEditor(self.master),
+ edit_focus_setcookies = grideditor.SetCookieEditor(self.master),
+ edit_focus_form = grideditor.RequestFormEditor(self.master),
+ edit_focus_path = grideditor.PathEditor(self.master),
+ edit_focus_request_headers = grideditor.RequestHeaderEditor(self.master),
+ edit_focus_response_headers = grideditor.ResponseHeaderEditor(self.master),
+ )
+
+ def call(self, v, name, *args, **kwargs):
+ f = getattr(v, name, None)
+ if f:
+ f(*args, **kwargs)
+
+ def flow_changed(self, sender, flow):
+ if self.master.view.focus.flow:
+ if flow.id == self.master.view.focus.flow.id:
+ self.focus_changed()
+
+ def focus_changed(self, *args, **kwargs):
+ """
+ Triggered when the focus changes - either when it's modified, or
+ when it changes to a different flow altogether.
+ """
+ self.call(self.focus, "focus_changed")
+
+ def view_changed(self, *args, **kwargs):
+ """
+ Triggered when the view list has changed.
+ """
+ self.call(self.focus, "view_changed")
+
+ def view_popping(self, *args, **kwargs):
+ """
+ Triggered when the view list has changed.
+ """
+ self.call(self.focus, "view_popping")
+
+ def push(self, wname):
+ if self.primary_stack and self.primary_stack[-1] == wname:
+ return
+ self.primary_stack.append(wname)
+ self.body = urwid.AttrWrap(
+ self.windows[wname], "background"
+ )
+ self.view_changed()
+ self.focus_changed()
+
+ def pop(self, *args, **kwargs):
+ if isinstance(self.master.loop.widget, overlay.SimpleOverlay):
+ self.master.loop.widget = self
+ else:
+ if len(self.primary_stack) > 1:
+ self.view_popping()
+ self.primary_stack.pop()
+ self.body = urwid.AttrWrap(
+ self.windows[self.primary_stack[-1]],
+ "background",
+ )
+ self.view_changed()
+ self.focus_changed()
+ else:
+ self.master.prompt_for_exit()
+
def sig_focus(self, sender, section):
self.focus_position = section
@@ -37,50 +122,8 @@ class Window(urwid.Frame):
return False
return True
- def handle_replay(self, k):
- if k == "c":
- creplay = self.master.addons.get("clientplayback")
- if self.master.options.client_replay and creplay.count():
- def stop_client_playback_prompt(a):
- if a != "n":
- self.master.options.client_replay = None
- signals.status_prompt_onekey.send(
- self,
- prompt = "Stop current client replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = stop_client_playback_prompt
- )
- else:
- signals.status_prompt_path.send(
- self,
- prompt = "Client replay path",
- callback = lambda x: self.master.options.setter("client_replay")([x])
- )
- elif k == "s":
- a = self.master.addons.get("serverplayback")
- if a.count():
- def stop_server_playback(response):
- if response == "y":
- self.master.options.server_replay = []
- signals.status_prompt_onekey.send(
- self,
- prompt = "Stop current server replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = stop_server_playback
- )
- else:
- signals.status_prompt_path.send(
- self,
- prompt = "Server playback path",
- callback = lambda x: self.master.options.setter("server_replay")([x])
- )
-
def keypress(self, size, k):
- k = super().keypress(size, k)
- return self.master.keymap.handle(self.keyctx, k)
+ if self.focus.keyctx:
+ k = self.master.keymap.handle(self.focus.keyctx, k)
+ if k:
+ return super().keypress(size, k)
diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py
index 9748f3cf..d8fac077 100644
--- a/mitmproxy/tools/main.py
+++ b/mitmproxy/tools/main.py
@@ -99,7 +99,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
except exceptions.OptionsError as e:
print("%s: %s" % (sys.argv[0], e), file=sys.stderr)
sys.exit(1)
- except (KeyboardInterrupt, RuntimeError):
+ except (KeyboardInterrupt, RuntimeError) as e:
pass
return master
diff --git a/pathod/language/actions.py b/pathod/language/actions.py
index e85affac..fc57a18b 100644
--- a/pathod/language/actions.py
+++ b/pathod/language/actions.py
@@ -2,9 +2,7 @@ import abc
import copy
import random
from functools import total_ordering
-
import pyparsing as pp
-
from . import base
@@ -52,7 +50,7 @@ class _Action(base.Token):
class PauseAt(_Action):
- unique_name = None
+ unique_name = None # type: ignore
def __init__(self, offset, seconds):
_Action.__init__(self, offset)
@@ -103,7 +101,7 @@ class DisconnectAt(_Action):
class InjectAt(_Action):
- unique_name = None
+ unique_name = None # type: ignore
def __init__(self, offset, value):
_Action.__init__(self, offset)
diff --git a/pathod/language/base.py b/pathod/language/base.py
index 3a810ef0..c8892748 100644
--- a/pathod/language/base.py
+++ b/pathod/language/base.py
@@ -3,10 +3,9 @@ import os
import abc
import functools
import pyparsing as pp
-
from mitmproxy.utils import strutils
from mitmproxy.utils import human
-
+import typing # noqa
from . import generators, exceptions
@@ -84,7 +83,7 @@ class Token:
return None
@property
- def unique_name(self):
+ def unique_name(self) -> typing.Optional[str]:
"""
Controls uniqueness constraints for tokens. No two tokens with the
same name will be allowed. If no uniquness should be applied, this
@@ -334,7 +333,7 @@ class OptionsOrValue(_Component):
Can be any of a specified set of options, or a value specifier.
"""
preamble = ""
- options = []
+ options = [] # type: typing.List[str]
def __init__(self, value):
# If it's a string, we were passed one of the options, so we lower-case
@@ -376,7 +375,7 @@ class OptionsOrValue(_Component):
class Integer(_Component):
- bounds = (None, None)
+ bounds = (None, None) # type: typing.Tuple[typing.Union[int, None], typing.Union[int , None]]
preamble = ""
def __init__(self, value):
@@ -442,7 +441,7 @@ class FixedLengthValue(Value):
A value component lead by an optional preamble.
"""
preamble = ""
- length = None
+ length = None # type: typing.Optional[int]
def __init__(self, value):
Value.__init__(self, value)
@@ -511,7 +510,7 @@ class IntField(_Component):
"""
An integer field, where values can optionally specified by name.
"""
- names = {}
+ names = {} # type: typing.Dict[str, int]
max = 16
preamble = ""
@@ -546,7 +545,7 @@ class NestedMessage(Token):
A nested message, as an escaped string with a preamble.
"""
preamble = ""
- nest_type = None
+ nest_type = None # type: ignore
def __init__(self, value):
Token.__init__(self)
diff --git a/pathod/language/http.py b/pathod/language/http.py
index 8fcf9edc..5cd717a9 100644
--- a/pathod/language/http.py
+++ b/pathod/language/http.py
@@ -54,7 +54,7 @@ class Method(base.OptionsOrValue):
class _HeaderMixin:
- unique_name = None
+ unique_name = None # type: ignore
def format_header(self, key, value):
return [key, b": ", value, b"\r\n"]
@@ -143,7 +143,7 @@ class _HTTPMessage(message.Message):
class Response(_HTTPMessage):
- unique_name = None
+ unique_name = None # type: ignore
comps = (
Header,
ShortcutContentType,
diff --git a/pathod/language/http2.py b/pathod/language/http2.py
index 08c5f6d7..47d6e370 100644
--- a/pathod/language/http2.py
+++ b/pathod/language/http2.py
@@ -1,9 +1,9 @@
import pyparsing as pp
-
from mitmproxy.net import http
from mitmproxy.net.http import user_agents, Headers
from . import base, message
+
"""
Normal HTTP requests:
<method>:<path>:<header>:<body>
@@ -41,7 +41,7 @@ def get_header(val, headers):
class _HeaderMixin:
- unique_name = None
+ unique_name = None # type: ignore
def values(self, settings):
return (
@@ -146,7 +146,7 @@ class Times(base.Integer):
class Response(_HTTP2Message):
- unique_name = None
+ unique_name = None # type: ignore
comps = (
Header,
Body,
diff --git a/pathod/language/message.py b/pathod/language/message.py
index 6cdaaa0b..6b4c5021 100644
--- a/pathod/language/message.py
+++ b/pathod/language/message.py
@@ -1,13 +1,14 @@
import abc
from . import actions, exceptions
from mitmproxy.utils import strutils
+import typing # noqa
LOG_TRUNCATE = 1024
class Message:
__metaclass__ = abc.ABCMeta
- logattrs = []
+ logattrs = [] # type: typing.List[str]
def __init__(self, tokens):
track = set([])
diff --git a/pathod/language/websockets.py b/pathod/language/websockets.py
index a237381c..b4faf59b 100644
--- a/pathod/language/websockets.py
+++ b/pathod/language/websockets.py
@@ -4,6 +4,7 @@ import mitmproxy.net.websockets
from mitmproxy.utils import strutils
import pyparsing as pp
from . import base, generators, actions, message
+import typing # noqa
NESTED_LEADER = b"pathod!"
@@ -20,7 +21,7 @@ class OpCode(base.IntField):
"close": mitmproxy.net.websockets.OPCODE.CLOSE,
"ping": mitmproxy.net.websockets.OPCODE.PING,
"pong": mitmproxy.net.websockets.OPCODE.PONG,
- }
+ } # type: typing.Dict[str, int]
max = 15
preamble = "c"
@@ -239,7 +240,14 @@ class NestedFrame(base.NestedMessage):
nest_type = WebsocketFrame
+COMP = typing.Tuple[
+ typing.Type[OpCode], typing.Type[Length], typing.Type[Fin], typing.Type[RSV1], typing.Type[RSV2], typing.Type[RSV3], typing.Type[Mask],
+ typing.Type[actions.PauseAt], typing.Type[actions.DisconnectAt], typing.Type[actions.InjectAt], typing.Type[KeyNone], typing.Type[Key],
+ typing.Type[Times], typing.Type[Body], typing.Type[RawBody]
+]
+
+
class WebsocketClientFrame(WebsocketFrame):
- components = COMPONENTS + (
+ components = typing.cast(COMP, COMPONENTS + (
NestedFrame,
- )
+ ))
diff --git a/pathod/pathod.py b/pathod/pathod.py
index 7416d325..7c773c3b 100644
--- a/pathod/pathod.py
+++ b/pathod/pathod.py
@@ -3,19 +3,17 @@ import logging
import os
import sys
import threading
-
from mitmproxy.net import tcp
from mitmproxy import certs as mcerts
from mitmproxy.net import websockets
from mitmproxy import version
-
import urllib
from mitmproxy import exceptions
-
from pathod import language
from pathod import utils
from pathod import log
from pathod import protocols
+import typing # noqa
DEFAULT_CERT_DOMAIN = b"pathod.net"
@@ -71,7 +69,7 @@ class SSLOptions:
class PathodHandler(tcp.BaseHandler):
wbufsize = 0
- sni = None
+ sni = None # type: typing.Union[str, None, bool]
def __init__(
self,
diff --git a/pathod/test.py b/pathod/test.py
index 81f5805f..52f3ba02 100644
--- a/pathod/test.py
+++ b/pathod/test.py
@@ -1,16 +1,16 @@
import io
import time
import queue
-
from . import pathod
from mitmproxy.types import basethread
+import typing # noqa
class Daemon:
IFACE = "127.0.0.1"
- def __init__(self, ssl=None, **daemonargs):
- self.q = queue.Queue()
+ def __init__(self, ssl=None, **daemonargs) -> None:
+ self.q = queue.Queue() # type: queue.Queue
self.logfp = io.StringIO()
daemonargs["logfp"] = self.logfp
self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs)
@@ -25,18 +25,18 @@ class Daemon:
def __enter__(self):
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, type, value, traceback) -> bool:
self.logfp.truncate(0)
self.shutdown()
return False
- def p(self, spec):
+ def p(self, spec: str) -> str:
"""
Return a URL that will render the response in spec.
"""
return "%s/p/%s" % (self.urlbase, spec)
- def text_log(self):
+ def text_log(self) -> str:
return self.logfp.getvalue()
def wait_for_silence(self, timeout=5):
@@ -62,7 +62,7 @@ class Daemon:
return None
return l[-1]
- def log(self):
+ def log(self) -> typing.List[typing.Dict]:
"""
Return the log buffer as a list of dictionaries.
"""
diff --git a/pathod/utils.py b/pathod/utils.py
index 44ad1f87..11b1dccd 100644
--- a/pathod/utils.py
+++ b/pathod/utils.py
@@ -1,6 +1,7 @@
import os
import sys
from mitmproxy.utils import data as mdata
+import typing # noqa
class MemBool:
@@ -9,10 +10,10 @@ class MemBool:
Truth-checking with a memory, for use in chained if statements.
"""
- def __init__(self):
- self.v = None
+ def __init__(self) -> None:
+ self.v = None # type: typing.Optional[bool]
- def __call__(self, v):
+ def __call__(self, v: bool) -> bool:
self.v = v
return bool(v)
diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py
index 64d0fa19..c132d80a 100644
--- a/test/mitmproxy/addons/test_core.py
+++ b/test/mitmproxy/addons/test_core.py
@@ -61,3 +61,105 @@ def test_revert():
assert f.modified()
sa.revert([f])
assert not f.modified()
+
+
+def test_flow_set():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow(resp=True)
+ assert sa.flow_set_options()
+
+ with pytest.raises(exceptions.CommandError):
+ sa.flow_set([f], "flibble", "post")
+
+ assert f.request.method != "post"
+ sa.flow_set([f], "method", "post")
+ assert f.request.method == "POST"
+
+ assert f.request.host != "testhost"
+ sa.flow_set([f], "host", "testhost")
+ assert f.request.host == "testhost"
+
+ assert f.request.path != "/test/path"
+ sa.flow_set([f], "path", "/test/path")
+ assert f.request.path == "/test/path"
+
+ assert f.request.url != "http://foo.com/bar"
+ sa.flow_set([f], "url", "http://foo.com/bar")
+ assert f.request.url == "http://foo.com/bar"
+ with pytest.raises(exceptions.CommandError):
+ sa.flow_set([f], "url", "oink")
+
+ assert f.response.status_code != 404
+ sa.flow_set([f], "status_code", "404")
+ assert f.response.status_code == 404
+ assert f.response.reason == "Not Found"
+ with pytest.raises(exceptions.CommandError):
+ sa.flow_set([f], "status_code", "oink")
+
+ assert f.response.reason != "foo"
+ sa.flow_set([f], "reason", "foo")
+ assert f.response.reason == "foo"
+
+
+def test_encoding():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ assert sa.encode_options()
+ sa.encode([f], "request", "deflate")
+ assert f.request.headers["content-encoding"] == "deflate"
+
+ sa.encode([f], "request", "br")
+ assert f.request.headers["content-encoding"] == "deflate"
+
+ sa.decode([f], "request")
+ assert "content-encoding" not in f.request.headers
+
+ sa.encode([f], "request", "br")
+ assert f.request.headers["content-encoding"] == "br"
+
+ sa.encode_toggle([f], "request")
+ assert "content-encoding" not in f.request.headers
+ sa.encode_toggle([f], "request")
+ assert f.request.headers["content-encoding"] == "deflate"
+ sa.encode_toggle([f], "request")
+ assert "content-encoding" not in f.request.headers
+
+ with pytest.raises(exceptions.CommandError):
+ sa.encode([f], "request", "invalid")
+
+
+def test_options(tmpdir):
+ p = str(tmpdir.join("path"))
+ sa = core.Core()
+ with taddons.context() as tctx:
+ tctx.options.stickycookie = "foo"
+ assert tctx.options.stickycookie == "foo"
+ sa.options_reset()
+ assert tctx.options.stickycookie is None
+
+ tctx.options.stickycookie = "foo"
+ tctx.options.stickyauth = "bar"
+ sa.options_reset_one("stickycookie")
+ assert tctx.options.stickycookie is None
+ assert tctx.options.stickyauth == "bar"
+
+ with pytest.raises(exceptions.CommandError):
+ sa.options_reset_one("unknown")
+
+ sa.options_save(p)
+ with pytest.raises(exceptions.CommandError):
+ sa.options_save("/")
+
+ sa.options_reset()
+ assert tctx.options.stickyauth is None
+ sa.options_load(p)
+ assert tctx.options.stickyauth == "bar"
+
+ sa.options_load("/nonexistent")
+
+ with open(p, 'a') as f:
+ f.write("'''")
+ with pytest.raises(exceptions.CommandError):
+ sa.options_load(p)
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index a3df1fcf..dd5349cb 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -152,7 +152,7 @@ class TestScriptLoader:
sc = script.ScriptLoader()
with taddons.context():
with pytest.raises(exceptions.CommandError):
- sc.script_run([tflow.tflow(resp=True)], "/nonexistent")
+ sc.script_run([tflow.tflow(resp=True)], "/")
def test_simple(self):
sc = script.ScriptLoader()
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 1724da49..6da13650 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -218,9 +218,10 @@ def test_resolve():
tctx.command(v.resolve, "~")
-def test_go():
+def test_movement():
v = view.View()
with taddons.context():
+ v.go(0)
v.add([
tflow.tflow(),
tflow.tflow(),
@@ -240,6 +241,11 @@ def test_go():
v.go(-999)
assert v.focus.index == 0
+ v.focus_next()
+ assert v.focus.index == 1
+ v.focus_prev()
+ assert v.focus.index == 0
+
def test_duplicate():
v = view.View()
@@ -255,6 +261,21 @@ def test_duplicate():
assert v.focus.index == 2
+def test_setgetval():
+ v = view.View()
+ with taddons.context():
+ f = tflow.tflow()
+ v.add([f])
+ v.setvalue([f], "key", "value")
+ assert v.getvalue(f, "key", "default") == "value"
+ assert v.getvalue(f, "unknow", "default") == "default"
+
+ v.setvalue_toggle([f], "key")
+ assert v.getvalue(f, "key", "default") == "true"
+ v.setvalue_toggle([f], "key")
+ assert v.getvalue(f, "key", "default") == "false"
+
+
def test_order():
v = view.View()
with taddons.context() as tctx:
diff --git a/test/mitmproxy/tools/console/test_help.py b/test/mitmproxy/tools/console/test_help.py
index ac3011e6..0ebc2d6a 100644
--- a/test/mitmproxy/tools/console/test_help.py
+++ b/test/mitmproxy/tools/console/test_help.py
@@ -9,9 +9,3 @@ class TestHelp:
def test_helptext(self):
h = help.HelpView(None)
assert h.helptext()
-
- def test_keypress(self):
- h = help.HelpView([1, 2, 3])
- assert not h.keypress((0, 0), "q")
- assert not h.keypress((0, 0), "?")
- assert h.keypress((0, 0), "o") == "o"
diff --git a/test/mitmproxy/tools/console/test_keymap.py b/test/mitmproxy/tools/console/test_keymap.py
new file mode 100644
index 00000000..6a75800e
--- /dev/null
+++ b/test/mitmproxy/tools/console/test_keymap.py
@@ -0,0 +1,29 @@
+from mitmproxy.tools.console import keymap
+from mitmproxy.test import taddons
+from unittest import mock
+import pytest
+
+
+def test_bind():
+ with taddons.context() as tctx:
+ km = keymap.Keymap(tctx.master)
+ km.executor = mock.Mock()
+
+ with pytest.raises(ValueError):
+ km.add("foo", "bar", ["unsupported"])
+
+ km.add("key", "str", ["options", "commands"])
+ assert km.get("options", "key")
+ assert km.get("commands", "key")
+ assert not km.get("flowlist", "key")
+
+ km.handle("unknown", "unknown")
+ assert not km.executor.called
+
+ km.handle("options", "key")
+ assert km.executor.called
+
+ km.add("glob", "str", ["global"])
+ km.executor = mock.Mock()
+ km.handle("options", "glob")
+ assert km.executor.called
diff --git a/tox.ini b/tox.ini
index fafb455e..3f3240ae 100644
--- a/tox.ini
+++ b/tox.ini
@@ -28,6 +28,8 @@ commands =
python3 test/filename_matching.py
rstcheck README.rst
mypy --ignore-missing-imports ./mitmproxy
+ mypy --ignore-missing-imports ./pathod
+ mypy --ignore-missing-imports --follow-imports=skip ./examples/simple/
[testenv:individual_coverage]
deps =