diff options
40 files changed, 1115 insertions, 1063 deletions
diff --git a/examples/simple/custom_contentview.py b/examples/simple/custom_contentview.py index 289e1efe..71f92575 100644 --- a/examples/simple/custom_contentview.py +++ b/examples/simple/custom_contentview.py @@ -3,7 +3,10 @@ This example shows how one can add a custom contentview to mitmproxy. The content view API is explained in the mitmproxy.contentviews module. """ from mitmproxy import contentviews -from typing import Tuple, Iterable, AnyStr, List +import typing + + +CVIEWSWAPCASE = typing.Tuple[str, typing.Iterable[typing.List[typing.Tuple[str, typing.AnyStr]]]] class ViewSwapCase(contentviews.View): @@ -14,7 +17,7 @@ class ViewSwapCase(contentviews.View): prompt = ("swap case text", "z") content_types = ["text/plain"] - def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterable[List[Tuple[str, AnyStr]]]]: + def __call__(self, data: typing.AnyStr, **metadata) -> CVIEWSWAPCASE: return "case-swapped text", contentviews.format_text(data.swapcase()) diff --git a/examples/simple/io_read_dumpfile.py b/examples/simple/io_read_dumpfile.py index 87d37c0f..ea544cc4 100644 --- a/examples/simple/io_read_dumpfile.py +++ b/examples/simple/io_read_dumpfile.py @@ -1,8 +1,9 @@ #!/usr/bin/env python + +# type: ignore # # Simple script showing how to read a mitmproxy dump file # - from mitmproxy import io from mitmproxy.exceptions import FlowReadException import pprint diff --git a/examples/simple/io_write_dumpfile.py b/examples/simple/io_write_dumpfile.py index 7c4c6a7a..cf7c4f52 100644 --- a/examples/simple/io_write_dumpfile.py +++ b/examples/simple/io_write_dumpfile.py @@ -8,12 +8,13 @@ to multiple files in parallel. import random import sys from mitmproxy import io, http +import typing # noqa class Writer: def __init__(self, path: str) -> None: if path == "-": - f = sys.stdout # type: io.TextIO + f = sys.stdout # type: typing.IO[typing.Any] else: f = open(path, "wb") self.w = io.FlowWriter(f) diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py index 5d63b1b3..0bbe6287 100644 --- a/mitmproxy/addonmanager.py +++ b/mitmproxy/addonmanager.py @@ -6,6 +6,7 @@ import sys from mitmproxy import exceptions from mitmproxy import eventsequence from mitmproxy import controller +from mitmproxy import flow from . import ctx import pprint @@ -215,6 +216,9 @@ class AddonManager: if isinstance(message.reply, controller.DummyReply): message.reply.mark_reset() + if isinstance(message, flow.Flow): + self.trigger("update", [message]) + def invoke_addon(self, addon, name, *args, **kwargs): """ Invoke an event on an addon and all its children. This method must diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index b482edbb..426c47ad 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -4,6 +4,8 @@ from mitmproxy import ctx from mitmproxy import exceptions from mitmproxy import command from mitmproxy import flow +from mitmproxy import optmanager +from mitmproxy.net.http import status_codes class Core: @@ -79,3 +81,179 @@ class Core: updated.append(f) ctx.log.alert("Reverted %s flows." % len(updated)) ctx.master.addons.trigger("update", updated) + + @command.command("flow.set.options") + def flow_set_options(self) -> typing.Sequence[str]: + return [ + "host", + "status_code", + "method", + "path", + "url", + "reason", + ] + + @command.command("flow.set") + def flow_set( + self, + flows: typing.Sequence[flow.Flow], spec: str, sval: str + ) -> None: + """ + Quickly set a number of common values on flows. + """ + opts = self.flow_set_options() + if spec not in opts: + raise exceptions.CommandError( + "Set spec must be one of: %s." % ", ".join(opts) + ) + + val = sval # type: typing.Union[int, str] + if spec == "status_code": + try: + val = int(val) + except ValueError as v: + raise exceptions.CommandError( + "Status code is not an integer: %s" % val + ) from v + + updated = [] + for f in flows: + req = getattr(f, "request", None) + rupdate = True + if req: + if spec == "method": + req.method = val + elif spec == "host": + req.host = val + elif spec == "path": + req.path = val + elif spec == "url": + try: + req.url = val + except ValueError as e: + raise exceptions.CommandError( + "URL %s is invalid: %s" % (repr(val), e) + ) from e + else: + self.rupdate = False + + resp = getattr(f, "response", None) + supdate = True + if resp: + if spec == "status_code": + resp.status_code = val + if val in status_codes.RESPONSES: + resp.reason = status_codes.RESPONSES[int(val)] + elif spec == "reason": + resp.reason = val + else: + supdate = False + + if rupdate or supdate: + updated.append(f) + + ctx.master.addons.trigger("update", updated) + ctx.log.alert("Set %s on %s flows." % (spec, len(updated))) + + @command.command("flow.decode") + def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None: + """ + Decode flows. + """ + updated = [] + for f in flows: + p = getattr(f, part, None) + if p: + p.decode() + updated.append(f) + ctx.master.addons.trigger("update", updated) + ctx.log.alert("Decoded %s flows." % len(updated)) + + @command.command("flow.encode.toggle") + def encode_toggle(self, flows: typing.Sequence[flow.Flow], part: str) -> None: + """ + Toggle flow encoding on and off, using deflate for encoding. + """ + updated = [] + for f in flows: + p = getattr(f, part, None) + if p: + current_enc = p.headers.get("content-encoding", "identity") + if current_enc == "identity": + p.encode("deflate") + else: + p.decode() + updated.append(f) + ctx.master.addons.trigger("update", updated) + ctx.log.alert("Toggled encoding on %s flows." % len(updated)) + + @command.command("flow.encode") + def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None: + """ + Encode flows with a specified encoding. + """ + if enc not in self.encode_options(): + raise exceptions.CommandError("Invalid encoding format: %s" % enc) + + updated = [] + for f in flows: + p = getattr(f, part, None) + if p: + current_enc = p.headers.get("content-encoding", "identity") + if current_enc == "identity": + p.encode(enc) + updated.append(f) + ctx.master.addons.trigger("update", updated) + ctx.log.alert("Encoded %s flows." % len(updated)) + + @command.command("flow.encode.options") + def encode_options(self) -> typing.Sequence[str]: + """ + The possible values for an encoding specification. + + """ + return ["gzip", "deflate", "br"] + + @command.command("options.load") + def options_load(self, path: str) -> None: + """ + Load options from a file. + """ + try: + optmanager.load_paths(ctx.options, path) + except (OSError, exceptions.OptionsError) as e: + raise exceptions.CommandError( + "Could not load options - %s" % e + ) from e + + @command.command("options.save") + def options_save(self, path: str) -> None: + """ + Save options to a file. + """ + try: + optmanager.save(ctx.options, path) + except OSError as e: + raise exceptions.CommandError( + "Could not save options - %s" % e + ) from e + + @command.command("options.reset") + def options_reset(self) -> None: + """ + Reset all options to defaults. + """ + ctx.options.reset() + + @command.command("options.reset.one") + def options_reset_one(self, name: str) -> None: + """ + Reset one option to its default value. + """ + if name not in ctx.options: + raise exceptions.CommandError("No such option: %s" % name) + setattr( + ctx.options, + name, + ctx.options.default(name), + ) diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index c9c9cbed..dd579585 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -202,6 +202,24 @@ class View(collections.Sequence): self.sig_view_refresh.send(self) # API + @command.command("view.focus.next") + def focus_next(self) -> None: + """ + Set focus to the next flow. + """ + idx = self.focus.index + 1 + if self.inbounds(idx): + self.focus.flow = self[idx] + + @command.command("view.focus.prev") + def focus_prev(self) -> None: + """ + Set focus to the previous flow. + """ + idx = self.focus.index - 1 + if self.inbounds(idx): + self.focus.flow = self[idx] + @command.command("view.order.options") def order_options(self) -> typing.Sequence[str]: """ @@ -278,6 +296,45 @@ class View(collections.Sequence): """ return self._store.get(flow_id) + @command.command("view.getval") + def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str: + """ + Get a value from the settings store for the specified flow. + """ + return self.settings[f].get(key, default) + + @command.command("view.setval.toggle") + def setvalue_toggle( + self, + flows: typing.Sequence[mitmproxy.flow.Flow], + key: str + ) -> None: + """ + Toggle a boolean value in the settings store, seting the value to + the string "true" or "false". + """ + updated = [] + for f in flows: + current = self.settings[f].get("key", "false") + self.settings[f][key] = "false" if current == "true" else "true" + updated.append(f) + ctx.master.addons.trigger("update", updated) + + @command.command("view.setval") + def setvalue( + self, + flows: typing.Sequence[mitmproxy.flow.Flow], + key: str, value: str + ) -> None: + """ + Set a value in the settings store for the specified flows. + """ + updated = [] + for f in flows: + self.settings[f][key] = value + updated.append(f) + ctx.master.addons.trigger("update", updated) + @command.command("view.load") def load_file(self, path: str) -> None: """ @@ -296,6 +353,8 @@ class View(collections.Sequence): the view, negative from the end of the view, so that 0 is the first flow, -1 is the last flow. """ + if len(self) == 0: + return if dst < 0: dst = len(self) + dst if dst < 0: @@ -314,6 +373,7 @@ class View(collections.Sequence): if dups: self.add(dups) self.focus.flow = dups[0] + ctx.log.alert("Duplicated %s flows" % len(dups)) @command.command("view.remove") def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None: diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 689aa637..76827a99 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -120,6 +120,12 @@ class CommandsList(urwid.ListBox): if key == "enter": foc, idx = self.get_focus() signals.status_prompt_command.send(partial=foc.cmd.path + " ") + elif key == "m_start": + self.set_focus(0) + self.walker._modified() + elif key == "m_end": + self.set_focus(len(self.walker.cmds) - 1) + self.walker._modified() return super().keypress(size, key) @@ -146,6 +152,8 @@ class CommandHelp(urwid.Frame): class Commands(urwid.Pile): + keyctx = "commands" + def __init__(self, master): oh = CommandHelp(master) super().__init__( @@ -157,7 +165,6 @@ class Commands(urwid.Pile): self.master = master def keypress(self, size, key): - key = common.shortcuts(key) if key == "tab": self.focus_position = ( self.focus_position + 1 diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py index 812ca7a8..de024d1a 100644 --- a/mitmproxy/tools/console/common.py +++ b/mitmproxy/tools/console/common.py @@ -1,24 +1,9 @@ -# -*- coding: utf-8 -*- - - -import os - import urwid import urwid.util -import mitmproxy.net from functools import lru_cache -from mitmproxy.tools.console import signals from mitmproxy.utils import human -try: - import pyperclip -except: - pyperclip = False - - -VIEW_FLOW_REQUEST = 0 -VIEW_FLOW_RESPONSE = 1 METHOD_OPTIONS = [ ("get", "g"), @@ -92,20 +77,6 @@ def format_keyvals(lst, key="key", val="text", indent=0): return ret -def shortcuts(k): - if k == " ": - k = "page down" - elif k == "ctrl f": - k = "page down" - elif k == "ctrl b": - k = "page up" - elif k == "j": - k = "down" - elif k == "k": - k = "up" - return k - - def fcol(s, attr): s = str(s) return ( @@ -133,178 +104,6 @@ else: SYMBOL_DOWN = " " -# Save file to disk -def save_data(path, data): - if not path: - return - try: - if isinstance(data, bytes): - mode = "wb" - else: - mode = "w" - with open(path, mode) as f: - f.write(data) - except IOError as v: - signals.status_message.send(message=v.strerror) - - -def ask_save_overwrite(path, data): - if os.path.exists(path): - def save_overwrite(k): - if k == "y": - save_data(path, data) - - signals.status_prompt_onekey.send( - prompt = "'" + path + "' already exists. Overwrite?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = save_overwrite - ) - else: - save_data(path, data) - - -def ask_save_path(data, prompt="File path"): - signals.status_prompt_path.send( - prompt = prompt, - callback = ask_save_overwrite, - args = (data, ) - ) - - -def ask_scope_and_callback(flow, cb, *args): - request_has_content = flow.request and flow.request.raw_content - response_has_content = flow.response and flow.response.raw_content - - if request_has_content and response_has_content: - signals.status_prompt_onekey.send( - prompt = "Save", - keys = ( - ("request", "q"), - ("response", "s"), - ("both", "b"), - ), - callback = cb, - args = (flow,) + args - ) - elif response_has_content: - cb("s", flow, *args) - else: - cb("q", flow, *args) - - -def copy_to_clipboard_or_prompt(data): - # pyperclip calls encode('utf-8') on data to be copied without checking. - # if data are already encoded that way UnicodeDecodeError is thrown. - if isinstance(data, bytes): - toclip = data.decode("utf8", "replace") - else: - toclip = data - - try: - pyperclip.copy(toclip) - except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError): - def save(k): - if k == "y": - ask_save_path(data, "Save data") - signals.status_prompt_onekey.send( - prompt = "Cannot copy data to clipboard. Save as file?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = save - ) - - -def format_flow_data(key, scope, flow): - data = b"" - if scope in ("q", "b"): - request = flow.request.copy() - request.decode(strict=False) - if request.content is None: - return None, "Request content is missing" - if key == "h": - data += mitmproxy.net.http.http1.assemble_request(request) - elif key == "c": - data += request.get_content(strict=False) - else: - raise ValueError("Unknown key: {}".format(key)) - if scope == "b" and flow.request.raw_content and flow.response: - # Add padding between request and response - data += b"\r\n" * 2 - if scope in ("s", "b") and flow.response: - response = flow.response.copy() - response.decode(strict=False) - if response.content is None: - return None, "Response content is missing" - if key == "h": - data += mitmproxy.net.http.http1.assemble_response(response) - elif key == "c": - data += response.get_content(strict=False) - else: - raise ValueError("Unknown key: {}".format(key)) - return data, False - - -def handle_flow_data(scope, flow, key, writer): - """ - key: _c_ontent, _h_eaders+content, _u_rl - scope: re_q_uest, re_s_ponse, _b_oth - writer: copy_to_clipboard_or_prompt, ask_save_path - """ - data, err = format_flow_data(key, scope, flow) - - if err: - signals.status_message.send(message=err) - return - - if not data: - if scope == "q": - signals.status_message.send(message="No request content.") - elif scope == "s": - signals.status_message.send(message="No response content.") - else: - signals.status_message.send(message="No content.") - return - - writer(data) - - -def ask_save_body(scope, flow): - """ - Save either the request or the response body to disk. - - scope: re_q_uest, re_s_ponse, _b_oth, None (ask user if necessary) - """ - - request_has_content = flow.request and flow.request.raw_content - response_has_content = flow.response and flow.response.raw_content - - if scope is None: - ask_scope_and_callback(flow, ask_save_body) - elif scope == "q" and request_has_content: - ask_save_path( - flow.request.get_content(strict=False), - "Save request content to" - ) - elif scope == "s" and response_has_content: - ask_save_path( - flow.response.get_content(strict=False), - "Save response content to" - ) - elif scope == "b" and request_has_content and response_has_content: - ask_save_path( - (flow.request.get_content(strict=False) + b"\n" + - flow.response.get_content(strict=False)), - "Save request & response content to" - ) - else: - signals.status_message.send(message="No content.") - - @lru_cache(maxsize=800) def raw_format_flow(f, flow): f = dict(f) diff --git a/mitmproxy/tools/console/flowdetailview.py b/mitmproxy/tools/console/flowdetailview.py index 9ed063bc..28fe1fbc 100644 --- a/mitmproxy/tools/console/flowdetailview.py +++ b/mitmproxy/tools/console/flowdetailview.py @@ -27,7 +27,7 @@ def flowdetails(state, flow: http.HTTPFlow): text.append(urwid.Text([("head", "Metadata:")])) text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) - if sc is not None: + if sc is not None and sc.ip_address: text.append(urwid.Text([("head", "Server Connection:")])) parts = [ ["Address", human.format_address(sc.address)], @@ -183,4 +183,4 @@ def flowdetails(state, flow: http.HTTPFlow): text.append(urwid.Text([("head", "Timing:")])) text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) - return searchable.Searchable(state, text) + return searchable.Searchable(text) diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index 7400c16c..8e28ff0f 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -1,7 +1,6 @@ import urwid from mitmproxy.tools.console import common -from mitmproxy.tools.console import signals import mitmproxy.tools.console.master # noqa @@ -59,13 +58,12 @@ class LogBufferBox(urwid.ListBox): super().set_focus(index) def keypress(self, size, key): - key = common.shortcuts(key) if key == "z": self.master.clear_events() key = None - elif key == "G": + elif key == "m_end": self.set_focus(len(self.master.logbuffer) - 1) - elif key == "g": + elif key == "m_start": self.set_focus(0) return urwid.ListBox.keypress(self, size, key) @@ -137,22 +135,15 @@ class FlowItem(urwid.WidgetWrap): return True def keypress(self, xxx_todo_changeme, key): - (maxcol,) = xxx_todo_changeme - return common.shortcuts(key) + return key class FlowListWalker(urwid.ListWalker): def __init__(self, master): self.master = master - self.master.view.sig_view_refresh.connect(self.sig_mod) - self.master.view.sig_view_add.connect(self.sig_mod) - self.master.view.sig_view_remove.connect(self.sig_mod) - self.master.view.sig_view_update.connect(self.sig_mod) - self.master.view.focus.sig_change.connect(self.sig_mod) - signals.flowlist_change.connect(self.sig_mod) - - def sig_mod(self, *args, **kwargs): + + def view_changed(self): self._modified() def get_focus(self): @@ -164,7 +155,6 @@ class FlowListWalker(urwid.ListWalker): def set_focus(self, index): if self.master.view.inbounds(index): self.master.view.focus.index = index - signals.flowlist_change.send(self) def get_next(self, pos): pos = pos + 1 @@ -182,6 +172,7 @@ class FlowListWalker(urwid.ListWalker): class FlowListBox(urwid.ListBox): + keyctx = "flowlist" def __init__( self, master: "mitmproxy.tools.console.master.ConsoleMaster" @@ -190,5 +181,11 @@ class FlowListBox(urwid.ListBox): super().__init__(FlowListWalker(master)) def keypress(self, size, key): - key = common.shortcuts(key) + if key == "m_start": + self.master.commands.call("view.go 0") + elif key == "m_end": + self.master.commands.call("view.go -1") return urwid.ListBox.keypress(self, size, key) + + def view_changed(self): + self.body.view_changed() diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py index b7b7053f..00951610 100644 --- a/mitmproxy/tools/console/flowview.py +++ b/mitmproxy/tools/console/flowview.py @@ -1,5 +1,4 @@ import math -import os import sys from functools import lru_cache from typing import Optional, Union # noqa @@ -7,14 +6,9 @@ from typing import Optional, Union # noqa import urwid from mitmproxy import contentviews -from mitmproxy import exceptions from mitmproxy import http -from mitmproxy.net.http import Headers -from mitmproxy.net.http import status_codes from mitmproxy.tools.console import common from mitmproxy.tools.console import flowdetailview -from mitmproxy.tools.console import grideditor -from mitmproxy.tools.console import overlay from mitmproxy.tools.console import searchable from mitmproxy.tools.console import signals from mitmproxy.tools.console import tabs @@ -106,49 +100,45 @@ class FlowViewHeader(urwid.WidgetWrap): def __init__( self, master: "mitmproxy.tools.console.master.ConsoleMaster", - f: http.HTTPFlow ) -> None: self.master = master - self.flow = f - self._w = common.format_flow( - f, - False, - extended=True, - hostheader=self.master.options.showhost - ) - signals.flow_change.connect(self.sig_flow_change) + self.focus_changed() - def sig_flow_change(self, sender, flow): - if flow == self.flow: + def focus_changed(self): + if self.master.view.focus.flow: self._w = common.format_flow( - flow, + self.master.view.focus.flow, False, extended=True, hostheader=self.master.options.showhost ) + else: + self._w = urwid.Pile([]) -TAB_REQ = 0 -TAB_RESP = 1 - - -class FlowView(tabs.Tabs): - highlight_color = "focusfield" +class FlowDetails(tabs.Tabs): + def __init__(self, master): + self.master = master + super().__init__([]) + self.show() + self.last_displayed_body = None - def __init__(self, master, view, flow, tab_offset): - self.master, self.view, self.flow = master, view, flow - super().__init__( - [ + def focus_changed(self): + if self.master.view.focus.flow: + self.tabs = [ (self.tab_request, self.view_request), (self.tab_response, self.view_response), (self.tab_details, self.view_details), - ], - tab_offset - ) - + ] self.show() - self.last_displayed_body = None - signals.flow_change.connect(self.sig_flow_change) + + @property + def view(self): + return self.master.view + + @property + def flow(self): + return self.master.view.focus.flow def tab_request(self): if self.flow.intercepted and not self.flow.response: @@ -174,18 +164,13 @@ class FlowView(tabs.Tabs): def view_details(self): return flowdetailview.flowdetails(self.view, self.flow) - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self.show() - def content_view(self, viewmode, message): if message.raw_content is None: msg, body = "", [urwid.Text([("error", "[content missing]")])] return msg, body else: - s = self.view.settings[self.flow] - full = s.get((self.tab_offset, "fullcontents"), False) - if full: + full = self.master.commands.call("view.getval @focus fullcontents false") + if full == "true": limit = sys.maxsize else: limit = contentviews.VIEW_CUTOFF @@ -241,12 +226,6 @@ class FlowView(tabs.Tabs): return description, text_objects - def viewmode_get(self): - return self.view.settings[self.flow].get( - (self.tab_offset, "prettyview"), - self.master.options.default_contentview - ) - def conn_text(self, conn): if conn: txt = common.format_keyvals( @@ -254,7 +233,7 @@ class FlowView(tabs.Tabs): key = "header", val = "text" ) - viewmode = self.viewmode_get() + viewmode = self.master.commands.call("console.flowview.mode") msg, body = self.content_view(viewmode, conn) cols = [ @@ -288,404 +267,23 @@ class FlowView(tabs.Tabs): ] ) ] - return searchable.Searchable(self.view, txt) - - def set_method_raw(self, m): - if m: - self.flow.request.method = m - signals.flow_change.send(self, flow = self.flow) - - def edit_method(self, m): - if m == "e": - signals.status_prompt.send( - prompt = "Method", - text = self.flow.request.method, - callback = self.set_method_raw - ) - else: - for i in common.METHOD_OPTIONS: - if i[1] == m: - self.flow.request.method = i[0].upper() - signals.flow_change.send(self, flow = self.flow) - - def set_url(self, url): - request = self.flow.request - try: - request.url = str(url) - except ValueError: - return "Invalid URL." - signals.flow_change.send(self, flow = self.flow) - - def set_resp_status_code(self, status_code): - try: - status_code = int(status_code) - except ValueError: - return None - self.flow.response.status_code = status_code - if status_code in status_codes.RESPONSES: - self.flow.response.reason = status_codes.RESPONSES[status_code] - signals.flow_change.send(self, flow = self.flow) - - def set_resp_reason(self, reason): - self.flow.response.reason = reason - signals.flow_change.send(self, flow = self.flow) - - def set_headers(self, fields, conn): - conn.headers = Headers(fields) - signals.flow_change.send(self, flow = self.flow) - - def set_query(self, lst, conn): - conn.query = lst - signals.flow_change.send(self, flow = self.flow) - - def set_path_components(self, lst, conn): - conn.path_components = lst - signals.flow_change.send(self, flow = self.flow) - - def set_form(self, lst, conn): - conn.urlencoded_form = lst - signals.flow_change.send(self, flow = self.flow) - - def edit_form(self, conn): - self.master.view_grideditor( - grideditor.URLEncodedFormEditor( - self.master, - conn.urlencoded_form.items(multi=True), - self.set_form, - conn - ) - ) - - def edit_form_confirm(self, key, conn): - if key == "y": - self.edit_form(conn) - - def set_cookies(self, lst, conn): - conn.cookies = lst - signals.flow_change.send(self, flow = self.flow) - - def set_setcookies(self, data, conn): - conn.cookies = data - signals.flow_change.send(self, flow = self.flow) - - def edit(self, part): - if self.tab_offset == TAB_REQ: - message = self.flow.request - else: - if not self.flow.response: - self.flow.response = http.HTTPResponse.make(200, b"") - message = self.flow.response - - self.flow.backup() - if message == self.flow.request and part == "c": - self.master.view_grideditor( - grideditor.CookieEditor( - self.master, - message.cookies.items(multi=True), - self.set_cookies, - message - ) - ) - if message == self.flow.response and part == "c": - self.master.view_grideditor( - grideditor.SetCookieEditor( - self.master, - message.cookies.items(multi=True), - self.set_setcookies, - message - ) - ) - if part == "r": - # Fix an issue caused by some editors when editing a - # request/response body. Many editors make it hard to save a - # file without a terminating newline on the last line. When - # editing message bodies, this can cause problems. For now, I - # just strip the newlines off the end of the body when we return - # from an editor. - c = self.master.spawn_editor(message.get_content(strict=False) or b"") - message.content = c.rstrip(b"\n") - elif part == "f": - if not message.urlencoded_form and message.raw_content: - signals.status_prompt_onekey.send( - prompt = "Existing body is not a URL-encoded form. Clear and edit?", - keys = [ - ("yes", "y"), - ("no", "n"), - ], - callback = self.edit_form_confirm, - args = (message,) - ) - else: - self.edit_form(message) - elif part == "h": - self.master.view_grideditor( - grideditor.HeaderEditor( - self.master, - message.headers.fields, - self.set_headers, - message - ) - ) - elif part == "p": - p = message.path_components - self.master.view_grideditor( - grideditor.PathEditor( - self.master, - p, - self.set_path_components, - message - ) - ) - elif part == "q": - self.master.view_grideditor( - grideditor.QueryEditor( - self.master, - message.query.items(multi=True), - self.set_query, message - ) - ) - elif part == "u": - signals.status_prompt.send( - prompt = "URL", - text = message.url, - callback = self.set_url - ) - elif part == "m" and message == self.flow.request: - signals.status_prompt_onekey.send( - prompt = "Method", - keys = common.METHOD_OPTIONS, - callback = self.edit_method - ) - elif part == "o": - signals.status_prompt.send( - prompt = "Code", - text = str(message.status_code), - callback = self.set_resp_status_code - ) - elif part == "m" and message == self.flow.response: - signals.status_prompt.send( - prompt = "Message", - text = message.reason, - callback = self.set_resp_reason - ) - signals.flow_change.send(self, flow = self.flow) + return searchable.Searchable(txt) - def view_flow(self, flow): - signals.pop_view_state.send(self) - self.master.view_flow(flow, self.tab_offset) - - def _view_nextprev_flow(self, idx, flow): - if not self.view.inbounds(idx): - signals.status_message.send(message="No more flows") - return - self.view_flow(self.view[idx]) - - def view_next_flow(self, flow): - return self._view_nextprev_flow(self.view.index(flow) + 1, flow) - - def view_prev_flow(self, flow): - return self._view_nextprev_flow(self.view.index(flow) - 1, flow) + def keypress(self, size, key): + key = super().keypress(size, key) + return self._w.keypress(size, key) - def change_this_display_mode(self, t): - view = contentviews.get(t) - self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower() - signals.flow_change.send(self, flow=self.flow) - def keypress(self, size, key): - conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]] - if self.tab_offset == TAB_REQ: - conn = self.flow.request - elif self.tab_offset == TAB_RESP: - conn = self.flow.response +class FlowView(urwid.Frame): + keyctx = "flowview" - key = super().keypress(size, key) + def __init__(self, master): + super().__init__( + FlowDetails(master), + header = FlowViewHeader(master), + ) + self.master = master - # Special case: Space moves over to the next flow. - # We need to catch that before applying common.shortcuts() - if key == " ": - self.view_next_flow(self.flow) - return - - key = common.shortcuts(key) - if key in ("up", "down", "page up", "page down"): - # Pass scroll events to the wrapped widget - self._w.keypress(size, key) - elif key == "a": - self.flow.resume() - self.master.view.update(self.flow) - elif key == "A": - for f in self.view: - if f.intercepted: - f.resume() - self.master.view.update(self.flow) - elif key == "d": - if self.flow.killable: - self.flow.kill() - self.view.remove(self.flow) - if not self.view.focus.flow: - self.master.view_flowlist() - else: - self.view_flow(self.view.focus.flow) - elif key == "D": - cp = self.flow.copy() - self.master.view.add(cp) - self.master.view.focus.flow = cp - self.view_flow(cp) - signals.status_message.send(message="Duplicated.") - elif key == "p": - self.view_prev_flow(self.flow) - elif key == "r": - try: - self.master.replay_request(self.flow) - except exceptions.ReplayException as e: - signals.add_log("Replay error: %s" % e, "warn") - signals.flow_change.send(self, flow = self.flow) - elif key == "V": - if self.flow.modified(): - self.flow.revert() - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="Reverted.") - else: - signals.status_message.send(message="Flow not modified.") - elif key == "W": - signals.status_prompt_path.send( - prompt = "Save this flow", - callback = self.master.save_one_flow, - args = (self.flow,) - ) - elif key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - elif key == "e": - if self.tab_offset == TAB_REQ: - signals.status_prompt_onekey.send( - prompt="Edit request", - keys=( - ("cookies", "c"), - ("query", "q"), - ("path", "p"), - ("url", "u"), - ("header", "h"), - ("form", "f"), - ("raw body", "r"), - ("method", "m"), - ), - callback=self.edit - ) - elif self.tab_offset == TAB_RESP: - signals.status_prompt_onekey.send( - prompt="Edit response", - keys=( - ("cookies", "c"), - ("code", "o"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - callback=self.edit - ) - else: - signals.status_message.send( - message="Tab to the request or response", - expire=1 - ) - elif key in set("bfgmxvzEC") and not conn: - signals.status_message.send( - message = "Tab to the request or response", - expire = 1 - ) - return - elif key == "b": - if self.tab_offset == TAB_REQ: - common.ask_save_body("q", self.flow) - else: - common.ask_save_body("s", self.flow) - elif key == "f": - self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="Loading all body data...") - elif key == "m": - opts = [i.name.lower() for i in contentviews.views] - self.master.overlay( - overlay.Chooser( - "display mode", - opts, - self.viewmode_get(), - self.change_this_display_mode - ) - ) - elif key == "E": - pass - # if self.tab_offset == TAB_REQ: - # scope = "q" - # else: - # scope = "s" - # signals.status_prompt_onekey.send( - # self, - # prompt = "Export to file", - # keys = [(e[0], e[1]) for e in export.EXPORTERS], - # callback = common.export_to_clip_or_file, - # args = (scope, self.flow, common.ask_save_path) - # ) - elif key == "C": - pass - # if self.tab_offset == TAB_REQ: - # scope = "q" - # else: - # scope = "s" - # signals.status_prompt_onekey.send( - # self, - # prompt = "Export to clipboard", - # keys = [(e[0], e[1]) for e in export.EXPORTERS], - # callback = common.export_to_clip_or_file, - # args = (scope, self.flow, common.copy_to_clipboard_or_prompt) - # ) - elif key == "x": - conn.content = None - signals.flow_change.send(self, flow=self.flow) - elif key == "v": - if conn.raw_content: - t = conn.headers.get("content-type") - if "EDITOR" in os.environ or "PAGER" in os.environ: - self.master.spawn_external_viewer(conn.get_content(strict=False), t) - else: - signals.status_message.send( - message = "Error! Set $EDITOR or $PAGER." - ) - elif key == "z": - self.flow.backup() - enc = conn.headers.get("content-encoding", "identity") - if enc != "identity": - try: - conn.decode() - except ValueError: - signals.status_message.send( - message = "Could not decode - invalid data?" - ) - else: - signals.status_prompt_onekey.send( - prompt = "Select encoding: ", - keys = ( - ("gzip", "z"), - ("deflate", "d"), - ("brotli", "b"), - ), - callback = self.encode_callback, - args = (conn,) - ) - signals.flow_change.send(self, flow = self.flow) - else: - # Key is not handled here. - return key - - def encode_callback(self, key, conn): - encoding_map = { - "z": "gzip", - "d": "deflate", - "b": "br", - } - conn.encode(encoding_map[key]) - signals.flow_change.send(self, flow = self.flow) + def focus_changed(self, *args, **kwargs): + self.body.focus_changed() + self.header.focus_changed() diff --git a/mitmproxy/tools/console/grideditor/base.py b/mitmproxy/tools/console/grideditor/base.py index 151479a4..35ae655f 100644 --- a/mitmproxy/tools/console/grideditor/base.py +++ b/mitmproxy/tools/console/grideditor/base.py @@ -252,13 +252,13 @@ FIRST_WIDTH_MAX = 40 FIRST_WIDTH_MIN = 20 -class GridEditor(urwid.WidgetWrap): - title = None # type: str - columns = None # type: Sequence[Column] +class BaseGridEditor(urwid.WidgetWrap): def __init__( self, master: "mitmproxy.tools.console.master.ConsoleMaster", + title, + columns, value: Any, callback: Callable[..., None], *cb_args, @@ -266,6 +266,8 @@ class GridEditor(urwid.WidgetWrap): ) -> None: value = self.data_in(copy.deepcopy(value)) self.master = master + self.title = title + self.columns = columns self.value = value self.callback = callback self.cb_args = cb_args @@ -307,6 +309,13 @@ class GridEditor(urwid.WidgetWrap): signals.footer_help.send(self, helptext="") self.show_empty_msg() + def view_popping(self): + res = [] + for i in self.walker.lst: + if not i[1] and any([x for x in i[0]]): + res.append(i[0]) + self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) + def show_empty_msg(self): if self.walker.lst: self._w.set_footer(None) @@ -337,22 +346,14 @@ class GridEditor(urwid.WidgetWrap): self._w.keypress(size, key) return None - key = common.shortcuts(key) column = self.columns[self.walker.focus_col] - if key in ["q", "esc"]: - res = [] - for i in self.walker.lst: - if not i[1] and any([x for x in i[0]]): - res.append(i[0]) - self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) - signals.pop_view_state.send(self) - elif key == "g": + if key == "m_start": self.walker.set_focus(0) - elif key == "G": + elif key == "m_end": self.walker.set_focus(len(self.walker.lst) - 1) - elif key in ["h", "left"]: + elif key == "left": self.walker.left() - elif key in ["l", "right"]: + elif key == "right": self.walker.right() elif key == "tab": self.walker.tab_next() @@ -415,3 +416,74 @@ class GridEditor(urwid.WidgetWrap): ) ) return text + + +class GridEditor(urwid.WidgetWrap): + title = None # type: str + columns = None # type: Sequence[Column] + + def __init__( + self, + master: "mitmproxy.tools.console.master.ConsoleMaster", + value: Any, + callback: Callable[..., None], + *cb_args, + **cb_kwargs + ) -> None: + super().__init__( + master, + value, + self.title, + self.columns, + callback, + *cb_args, + **cb_kwargs + ) + + +class FocusEditor(urwid.WidgetWrap): + """ + A specialised GridEditor that edits the current focused flow. + """ + keyctx = "grideditor" + + def __init__(self, master): + self.master = master + self.focus_changed() + + def focus_changed(self): + if self.master.view.focus.flow: + self._w = BaseGridEditor( + self.master.view.focus.flow, + self.title, + self.columns, + self.get_data(self.master.view.focus.flow), + self.set_data_update, + self.master.view.focus.flow, + ) + else: + self._w = urwid.Pile([]) + + def call(self, v, name, *args, **kwargs): + f = getattr(v, name, None) + if f: + f(*args, **kwargs) + + def view_popping(self): + self.call(self._w, "view_popping") + + def get_data(self, flow): + """ + Retrieve the data to edit from the current flow. + """ + raise NotImplementedError + + def set_data(self, vals, flow): + """ + Set the current data on the flow. + """ + raise NotImplementedError + + def set_data_update(self, vals, flow): + self.set_data(vals, flow) + signals.flow_change.send(self, flow = flow) diff --git a/mitmproxy/tools/console/grideditor/editors.py b/mitmproxy/tools/console/grideditor/editors.py index e069fe2f..671e91fb 100644 --- a/mitmproxy/tools/console/grideditor/editors.py +++ b/mitmproxy/tools/console/grideditor/editors.py @@ -1,4 +1,3 @@ -import os import re import urwid @@ -13,18 +12,24 @@ from mitmproxy.tools.console.grideditor import col_bytes from mitmproxy.tools.console.grideditor import col_subgrid from mitmproxy.tools.console import signals from mitmproxy.net.http import user_agents +from mitmproxy.net.http import Headers -class QueryEditor(base.GridEditor): +class QueryEditor(base.FocusEditor): title = "Editing query" columns = [ col_text.Column("Key"), col_text.Column("Value") ] + def get_data(self, flow): + return flow.request.query.items(multi=True) -class HeaderEditor(base.GridEditor): - title = "Editing headers" + def set_data(self, vals, flow): + flow.request.query = vals + + +class HeaderEditor(base.FocusEditor): columns = [ col_bytes.Column("Key"), col_bytes.Column("Value") @@ -65,35 +70,38 @@ class HeaderEditor(base.GridEditor): return True -class URLEncodedFormEditor(base.GridEditor): +class RequestHeaderEditor(HeaderEditor): + title = "Editing request headers" + + def get_data(self, flow): + return flow.request.headers.fields + + def set_data(self, vals, flow): + flow.request.headers = Headers(vals) + + +class ResponseHeaderEditor(HeaderEditor): + title = "Editing response headers" + + def get_data(self, flow): + return flow.response.headers.fields + + def set_data(self, vals, flow): + flow.response.headers = Headers(vals) + + +class RequestFormEditor(base.FocusEditor): title = "Editing URL-encoded form" columns = [ col_text.Column("Key"), col_text.Column("Value") ] + def get_data(self, flow): + return flow.request.urlencoded_form.items(multi=True) -class ReplaceEditor(base.GridEditor): - title = "Editing replacement patterns" - columns = [ - col_text.Column("Filter"), - col_text.Column("Regex"), - col_text.Column("Replacement"), - ] - - def is_error(self, col, val): - if col == 0: - if not flowfilter.parse(val): - return "Invalid filter specification." - elif col == 1: - try: - re.compile(val) - except re.error: - return "Invalid regular expression." - elif col == 2: - if val.startswith("@") and not os.path.isfile(os.path.expanduser(val[1:])): - return "Invalid file path" - return False + def set_data(self, vals, flow): + flow.request.urlencoded_form = vals class SetHeadersEditor(base.GridEditor): @@ -146,7 +154,7 @@ class SetHeadersEditor(base.GridEditor): return True -class PathEditor(base.GridEditor): +class PathEditor(base.FocusEditor): # TODO: Next row on enter? title = "Editing URL path components" @@ -160,6 +168,12 @@ class PathEditor(base.GridEditor): def data_out(self, data): return [i[0] for i in data] + def get_data(self, flow): + return self.data_in(flow.request.path_components) + + def set_data(self, vals, flow): + flow.request.path_components = self.data_out(vals) + class ScriptEditor(base.GridEditor): title = "Editing scripts" @@ -193,13 +207,19 @@ class HostPatternEditor(base.GridEditor): return [i[0] for i in data] -class CookieEditor(base.GridEditor): +class CookieEditor(base.FocusEditor): title = "Editing request Cookie header" columns = [ col_text.Column("Name"), col_text.Column("Value"), ] + def get_data(self, flow): + return flow.request.cookies.items(multi=True) + + def set_data(self, vals, flow): + flow.request.cookies = vals + class CookieAttributeEditor(base.GridEditor): title = "Editing Set-Cookie attributes" @@ -221,7 +241,7 @@ class CookieAttributeEditor(base.GridEditor): return ret -class SetCookieEditor(base.GridEditor): +class SetCookieEditor(base.FocusEditor): title = "Editing response SetCookie header" columns = [ col_text.Column("Name"), @@ -246,6 +266,12 @@ class SetCookieEditor(base.GridEditor): ) return vals + def get_data(self, flow): + return self.data_in(flow.response.cookies.items(multi=True)) + + def set_data(self, vals, flow): + flow.response.cookies = self.data_out(vals) + class OptionsEditor(base.GridEditor): title = None # type: str diff --git a/mitmproxy/tools/console/help.py b/mitmproxy/tools/console/help.py index 282f374d..ec0c95d9 100644 --- a/mitmproxy/tools/console/help.py +++ b/mitmproxy/tools/console/help.py @@ -4,7 +4,6 @@ import urwid from mitmproxy import flowfilter from mitmproxy.tools.console import common -from mitmproxy.tools.console import signals from mitmproxy import version @@ -15,6 +14,7 @@ footer = [ class HelpView(urwid.ListBox): + keyctx = "help" def __init__(self, help_context): self.help_context = help_context or [] @@ -84,14 +84,8 @@ class HelpView(urwid.ListBox): return text def keypress(self, size, key): - key = common.shortcuts(key) - if key == "q": - signals.pop_view_state.send(self) - return None - elif key == "?": - key = None - elif key == "g": + if key == "m_start": self.set_focus(0) - elif key == "G": + elif key == "m_end": self.set_focus(len(self.body.contents)) return urwid.ListBox.keypress(self, size, key) diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py index e3d28cf4..62e2dcfb 100644 --- a/mitmproxy/tools/console/keymap.py +++ b/mitmproxy/tools/console/keymap.py @@ -1,8 +1,10 @@ import typing +import collections from mitmproxy.tools.console import commandeditor -contexts = { +SupportedContexts = { + "chooser", "commands", "flowlist", "flowview", @@ -13,20 +15,34 @@ contexts = { } +Binding = collections.namedtuple("Binding", ["key", "command", "contexts"]) + + class Keymap: def __init__(self, master): self.executor = commandeditor.CommandExecutor(master) self.keys = {} + self.bindings = [] - def add(self, key: str, command: str, context: str = "global") -> None: + def add(self, key: str, command: str, contexts: typing.Sequence[str]) -> None: """ Add a key to the key map. If context is empty, it's considered to be a global binding. """ - if context not in contexts: - raise ValueError("Unsupported context: %s" % context) - d = self.keys.setdefault(context, {}) - d[key] = command + if not contexts: + raise ValueError("Must specify at least one context.") + for c in contexts: + if c not in SupportedContexts: + raise ValueError("Unsupported context: %s" % c) + + b = Binding(key=key, command=command, contexts=contexts) + self.bindings.append(b) + self.bind(b) + + def bind(self, binding): + for c in binding.contexts: + d = self.keys.setdefault(c, {}) + d[binding.key] = binding.command def get(self, context: str, key: str) -> typing.Optional[str]: if context in self.keys: diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index 5b6d9bcb..b88a0354 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -16,24 +16,19 @@ import urwid from mitmproxy import ctx from mitmproxy import addons from mitmproxy import command +from mitmproxy import exceptions from mitmproxy import master from mitmproxy import log from mitmproxy import flow from mitmproxy.addons import intercept from mitmproxy.addons import readfile from mitmproxy.addons import view -from mitmproxy.tools.console import flowlist -from mitmproxy.tools.console import flowview -from mitmproxy.tools.console import grideditor -from mitmproxy.tools.console import help from mitmproxy.tools.console import keymap -from mitmproxy.tools.console import options -from mitmproxy.tools.console import commands from mitmproxy.tools.console import overlay from mitmproxy.tools.console import palettes from mitmproxy.tools.console import signals -from mitmproxy.tools.console import statusbar from mitmproxy.tools.console import window +from mitmproxy import contentviews from mitmproxy.utils import strutils EVENTLOG_SIZE = 10000 @@ -80,14 +75,102 @@ class UnsupportedLog: class ConsoleAddon: """ - An addon that exposes console-specific commands. + An addon that exposes console-specific commands, and hooks into required + events. """ def __init__(self, master): self.master = master self.started = False + @command.command("console.options.reset.current") + def options_reset_current(self) -> None: + """ + Reset the current option in the options editor. + """ + if self.master.window.focus.keyctx != "options": + raise exceptions.CommandError("Not viewing options.") + name = self.master.window.windows["options"].current_name() + self.master.commands.call("options.reset.one %s" % name) + + @command.command("console.nav.start") + def nav_start(self) -> None: + """ + Go to the start of a list or scrollable. + """ + self.master.inject_key("m_start") + + @command.command("console.nav.end") + def nav_end(self) -> None: + """ + Go to the end of a list or scrollable. + """ + self.master.inject_key("m_end") + + @command.command("console.nav.up") + def nav_up(self) -> None: + """ + Go up. + """ + self.master.inject_key("up") + + @command.command("console.nav.down") + def nav_down(self) -> None: + """ + Go down. + """ + self.master.inject_key("down") + + @command.command("console.nav.pageup") + def nav_pageup(self) -> None: + """ + Go up. + """ + self.master.inject_key("page up") + + @command.command("console.nav.pagedown") + def nav_pagedown(self) -> None: + """ + Go down. + """ + self.master.inject_key("page down") + + @command.command("console.nav.left") + def nav_left(self) -> None: + """ + Go left. + """ + self.master.inject_key("left") + + @command.command("console.nav.right") + def nav_right(self) -> None: + """ + Go right. + """ + self.master.inject_key("right") + @command.command("console.choose") def console_choose( + self, prompt: str, choices: typing.Sequence[str], *cmd: typing.Sequence[str] + ) -> None: + """ + Prompt the user to choose from a specified list of strings, then + invoke another command with all occurances of {choice} replaced by + the choice the user made. + """ + def callback(opt): + # We're now outside of the call context... + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback)) + ctx.log.info(choices) + + @command.command("console.choose.cmd") + def console_choose_cmd( self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str] ) -> None: """ @@ -98,11 +181,15 @@ class ConsoleAddon: choices = ctx.master.commands.call_args(choicecmd, []) def callback(opt): + # We're now outside of the call context... repl = " ".join(cmd) repl = repl.replace("{choice}", opt) - self.master.commands.call(repl) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) - self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback)) + self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback)) ctx.log.info(choices) @command.command("console.command") @@ -115,24 +202,24 @@ class ConsoleAddon: @command.command("console.view.commands") def view_commands(self) -> None: """View the commands list.""" - self.master.view_commands() + self.master.switch_view("commands") @command.command("console.view.options") def view_options(self) -> None: """View the options editor.""" - self.master.view_options() + self.master.switch_view("options") @command.command("console.view.help") def view_help(self) -> None: """View help.""" - self.master.view_help() + self.master.switch_view("help") @command.command("console.view.flow") def view_flow(self, flow: flow.Flow) -> None: """View a flow.""" if hasattr(flow, "request"): # FIME: Also set focus? - self.master.view_flow(flow) + self.master.switch_view("flowview") @command.command("console.exit") def exit(self) -> None: @@ -147,71 +234,219 @@ class ConsoleAddon: """ signals.pop_view_state.send(self) + @command.command("console.bodyview") + def bodyview(self, f: flow.Flow, part: str) -> None: + """ + Spawn an external viewer for a flow request or response body based + on the detected MIME type. We use the mailcap system to find the + correct viewier, and fall back to the programs in $PAGER or $EDITOR + if necessary. + """ + fpart = getattr(f, part) + if not fpart: + raise exceptions.CommandError("Could not view part %s." % part) + t = fpart.headers.get("content-type") + content = fpart.get_content(strict=False) + if not content: + raise exceptions.CommandError("No content to view.") + self.master.spawn_external_viewer(content, t) + + @command.command("console.edit.focus.options") + def edit_focus_options(self) -> typing.Sequence[str]: + return [ + "cookies", + "form", + "path", + "method", + "query", + "reason", + "request-headers", + "response-headers", + "status_code", + "set-cookies", + "url", + ] + + @command.command("console.edit.focus") + def edit_focus(self, part: str) -> None: + """ + Edit the query of the current focus. + """ + if part == "cookies": + self.master.switch_view("edit_focus_cookies") + elif part == "form": + self.master.switch_view("edit_focus_form") + elif part == "path": + self.master.switch_view("edit_focus_path") + elif part == "query": + self.master.switch_view("edit_focus_query") + elif part == "request-headers": + self.master.switch_view("edit_focus_request_headers") + elif part == "response-headers": + self.master.switch_view("edit_focus_response_headers") + elif part == "set-cookies": + self.master.switch_view("edit_focus_setcookies") + elif part in ["url", "method", "status_code", "reason"]: + self.master.commands.call( + "console.command flow.set @focus %s " % part + ) + + @command.command("console.flowview.mode.set") + def flowview_mode_set(self) -> None: + """ + Set the display mode for the current flow view. + """ + if self.master.window.focus.keyctx != "flowview": + raise exceptions.CommandError("Not viewing a flow.") + fv = self.master.window.windows["flowview"] + idx = fv.body.tab_offset + + def callback(opt): + try: + self.master.commands.call_args( + "view.setval", + ["@focus", "flowview_mode_%s" % idx, opt] + ) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + opts = [i.name.lower() for i in contentviews.views] + self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) + + @command.command("console.flowview.mode") + def flowview_mode(self) -> str: + """ + Get the display mode for the current flow view. + """ + if self.master.window.focus.keyctx != "flowview": + raise exceptions.CommandError("Not viewing a flow.") + fv = self.master.window.windows["flowview"] + idx = fv.body.tab_offset + return self.master.commands.call_args( + "view.getval", + [ + "@focus", + "flowview_mode_%s" % idx, + self.master.options.default_contentview, + ] + ) + def running(self): self.started = True def update(self, flows): if not flows: signals.update_settings.send(self) + for f in flows: + signals.flow_change.send(self, flow=f) def configure(self, updated): if self.started: if "console_eventlog" in updated: - self.master.refresh_view() + pass def default_keymap(km): - km.add(":", "console.command ''") - km.add("?", "console.view.help") - km.add("C", "console.view.commands") - km.add("O", "console.view.options") - km.add("Q", "console.exit") - km.add("q", "console.view.pop") - km.add("i", "console.command set intercept=") - km.add("W", "console.command set save_stream_file=") - - km.add("A", "flow.resume @all", context="flowlist") - km.add("a", "flow.resume @focus", context="flowlist") - km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist") - km.add("d", "view.remove @focus", context="flowlist") - km.add("D", "view.duplicate @focus", context="flowlist") - km.add("e", "set console_eventlog=toggle", context="flowlist") + km.add(":", "console.command ''", ["global"]) + km.add("?", "console.view.help", ["global"]) + km.add("C", "console.view.commands", ["global"]) + km.add("O", "console.view.options", ["global"]) + km.add("Q", "console.exit", ["global"]) + km.add("q", "console.view.pop", ["global"]) + + km.add("g", "console.nav.start", ["global"]) + km.add("G", "console.nav.end", ["global"]) + km.add("k", "console.nav.up", ["global"]) + km.add("j", "console.nav.down", ["global"]) + km.add("l", "console.nav.right", ["global"]) + km.add("h", "console.nav.left", ["global"]) + km.add(" ", "console.nav.pagedown", ["global"]) + km.add("ctrl f", "console.nav.pagedown", ["global"]) + km.add("ctrl b", "console.nav.pageup", ["global"]) + + km.add("i", "console.command set intercept=", ["global"]) + km.add("W", "console.command set save_stream_file=", ["global"]) + + km.add("A", "flow.resume @all", ["flowlist", "flowview"]) + km.add("a", "flow.resume @focus", ["flowlist", "flowview"]) + km.add( + "b", "console.command cut.save s.content|@focus ''", + ["flowlist", "flowview"] + ) + km.add("d", "view.remove @focus", ["flowlist", "flowview"]) + km.add("D", "view.duplicate @focus", ["flowlist", "flowview"]) + km.add("e", "set console_eventlog=toggle", ["flowlist"]) km.add( "E", - "console.choose Format export.formats " + "console.choose.cmd Format export.formats " "console.command export.file {choice} @focus ''", - context="flowlist" + ["flowlist", "flowview"] ) - km.add("f", "console.command 'set view_filter='", context="flowlist") - km.add("F", "set console_focus_follow=toggle", context="flowlist") - km.add("g", "view.go 0", context="flowlist") - km.add("G", "view.go -1", context="flowlist") - km.add("l", "console.command cut.clip ", context="flowlist") - km.add("L", "console.command view.load ", context="flowlist") - km.add("m", "flow.mark.toggle @focus", context="flowlist") - km.add("M", "view.marked.toggle", context="flowlist") + km.add("f", "console.command set view_filter=", ["flowlist"]) + km.add("F", "set console_focus_follow=toggle", ["flowlist"]) + km.add("ctrl l", "console.command cut.clip ", ["flowlist", "flowview"]) + km.add("L", "console.command view.load ", ["flowlist"]) + km.add("m", "flow.mark.toggle @focus", ["flowlist"]) + km.add("M", "view.marked.toggle", ["flowlist"]) km.add( "n", "console.command view.create get https://google.com", - context="flowlist" + ["flowlist"] ) km.add( "o", - "console.choose Order view.order.options " + "console.choose.cmd Order view.order.options " "set console_order={choice}", - context="flowlist" + ["flowlist"] ) - km.add("r", "replay.client @focus", context="flowlist") - km.add("S", "console.command 'replay.server '") - km.add("v", "set console_order_reversed=toggle", context="flowlist") - km.add("U", "flow.mark @all false", context="flowlist") - km.add("w", "console.command 'save.file @shown '", context="flowlist") - km.add("V", "flow.revert @focus", context="flowlist") - km.add("X", "flow.kill @focus", context="flowlist") - km.add("z", "view.remove @all", context="flowlist") - km.add("Z", "view.remove @hidden", context="flowlist") - km.add("|", "console.command 'script.run @focus '", context="flowlist") - km.add("enter", "console.view.flow @focus", context="flowlist") + km.add("r", "replay.client @focus", ["flowlist", "flowview"]) + km.add("S", "console.command replay.server ", ["flowlist"]) + km.add("v", "set console_order_reversed=toggle", ["flowlist"]) + km.add("U", "flow.mark @all false", ["flowlist"]) + km.add("w", "console.command save.file @shown ", ["flowlist"]) + km.add("V", "flow.revert @focus", ["flowlist", "flowview"]) + km.add("X", "flow.kill @focus", ["flowlist"]) + km.add("z", "view.remove @all", ["flowlist"]) + km.add("Z", "view.remove @hidden", ["flowlist"]) + km.add("|", "console.command script.run @focus ", ["flowlist", "flowview"]) + km.add("enter", "console.view.flow @focus", ["flowlist"]) + + km.add( + "e", + "console.choose.cmd Part console.edit.focus.options " + "console.edit.focus {choice}", + ["flowview"] + ) + km.add("f", "view.setval.toggle @focus fullcontents", ["flowview"]) + km.add("w", "console.command save.file @focus ", ["flowview"]) + km.add(" ", "view.focus.next", ["flowview"]) + km.add( + "o", + "console.choose.cmd Order view.order.options " + "set console_order={choice}", + ["flowlist"] + ) + + km.add( + "v", + "console.choose \"View Part\" request,response " + "console.bodyview @focus {choice}", + ["flowview"] + ) + km.add("p", "view.focus.prev", ["flowview"]) + km.add("m", "console.flowview.mode.set", ["flowview"]) + km.add("tab", "console.nav.right", ["flowview"]) + km.add( + "z", + "console.choose \"Part\" request,response " + "flow.encode.toggle @focus {choice}", + ["flowview"] + ) + + km.add("L", "console.command options.load ", ["options"]) + km.add("S", "console.command options.save ", ["options"]) + km.add("D", "options.reset", ["options"]) + km.add("d", "console.options.reset.current", ["options"]) class ConsoleMaster(master.Master): @@ -219,7 +454,6 @@ class ConsoleMaster(master.Master): def __init__(self, options, server): super().__init__(options, server) self.view = view.View() # type: view.View - self.view.sig_view_update.connect(signals.flow_change.send) self.stream_path = None # This line is just for type hinting self.options = self.options # type: Options @@ -232,9 +466,6 @@ class ConsoleMaster(master.Master): self.view_stack = [] signals.call_in.connect(self.sig_call_in) - signals.pop_view_state.connect(self.sig_pop_view_state) - signals.replace_view_state.connect(self.sig_replace_view_state) - signals.push_view_state.connect(self.sig_push_view_state) signals.sig_add_log.connect(self.sig_add_log) self.addons.add(Logger()) self.addons.add(*addons.default_addons()) @@ -251,6 +482,8 @@ class ConsoleMaster(master.Master): signal.signal(signal.SIGINT, sigint_handler) + self.window = None + def __setattr__(self, name, value): self.__dict__[name] = value signals.update_settings.send(self) @@ -294,37 +527,6 @@ class ConsoleMaster(master.Master): return callback(*args) self.loop.set_alarm_in(seconds, cb) - def sig_replace_view_state(self, sender): - """ - A view has been pushed onto the stack, and is intended to replace - the current view rather than creating a new stack entry. - """ - if len(self.view_stack) > 1: - del self.view_stack[1] - - def sig_pop_view_state(self, sender): - """ - Pop the top view off the view stack. If no more views will be left - after this, prompt for exit. - """ - if len(self.view_stack) > 1: - self.view_stack.pop() - self.loop.widget = self.view_stack[-1] - else: - self.prompt_for_exit() - - def sig_push_view_state(self, sender, window): - """ - Push a new view onto the view stack. - """ - self.view_stack.append(window) - self.loop.widget = window - self.loop.draw_screen() - - def refresh_view(self): - self.view_flowlist() - signals.replace_view_state.send(self) - def spawn_editor(self, data): text = not isinstance(data, bytes) fd, name = tempfile.mkstemp('', "mproxy", text=text) @@ -400,6 +602,9 @@ class ConsoleMaster(master.Master): self.loop.draw_screen() self.loop.set_alarm_in(0.01, self.ticker) + def inject_key(self, key): + self.loop.process_input([key]) + def run(self): self.ui = urwid.raw_display.Screen() self.ui.set_terminal_properties(256) @@ -413,12 +618,14 @@ class ConsoleMaster(master.Master): screen = self.ui, handle_mouse = self.options.console_mouse, ) - self.ab = statusbar.ActionBar(self) + + self.window = window.Window(self) + self.loop.widget = self.window self.loop.set_alarm_in(0.01, self.ticker) self.loop.set_alarm_in( 0.0001, - lambda *args: self.view_flowlist() + lambda *args: self.switch_view("flowlist") ) self.start() @@ -439,111 +646,16 @@ class ConsoleMaster(master.Master): def shutdown(self): raise urwid.ExitMainLoop - def overlay(self, widget, **kwargs): - signals.push_view_state.send( - self, - window = overlay.SimpleOverlay( - self, - widget, - self.loop.widget, - widget.width, - **kwargs - ) - ) - - def view_help(self): - hc = self.view_stack[-1].helpctx - signals.push_view_state.send( - self, - window = window.Window( - self, - help.HelpView(hc), - None, - statusbar.StatusBar(self, help.footer), - None, - "help" - ) - ) - - def view_options(self): - for i in self.view_stack: - if isinstance(i["body"], options.Options): - return - signals.push_view_state.send( - self, - window = window.Window( - self, - options.Options(self), - None, - statusbar.StatusBar(self, options.footer), - options.help_context, - "options" - ) - ) - - def view_commands(self): - for i in self.view_stack: - if isinstance(i["body"], commands.Commands): - return - signals.push_view_state.send( - self, - window = window.Window( - self, - commands.Commands(self), - None, - statusbar.StatusBar(self, commands.footer), - commands.help_context, - "commands" - ) - ) + def sig_exit_overlay(self, *args, **kwargs): + self.loop.widget = self.window - def view_grideditor(self, ge): - signals.push_view_state.send( - self, - window = window.Window( - self, - ge, - None, - statusbar.StatusBar(self, grideditor.base.FOOTER), - ge.make_help(), - "grideditor" - ) - ) - - def view_flowlist(self): - if self.ui.started: - self.ui.clear() - - if self.options.console_eventlog: - body = flowlist.BodyPile(self) - else: - body = flowlist.FlowListBox(self) - - signals.push_view_state.send( - self, - window = window.Window( - self, - body, - None, - statusbar.StatusBar(self, flowlist.footer), - flowlist.help_context, - "flowlist" - ) + def overlay(self, widget, **kwargs): + self.loop.widget = overlay.SimpleOverlay( + self, widget, self.loop.widget, widget.width, **kwargs ) - def view_flow(self, flow, tab_offset=0): - self.view.focus.flow = flow - signals.push_view_state.send( - self, - window = window.Window( - self, - flowview.FlowView(self, self.view, flow, tab_offset), - flowview.FlowViewHeader(self, flow), - statusbar.StatusBar(self, flowview.footer), - flowview.help_context, - "flowview" - ) - ) + def switch_view(self, name): + self.window.push(name) def quit(self, a): if a != "n": diff --git a/mitmproxy/tools/console/options.py b/mitmproxy/tools/console/options.py index 64203f2b..fee61fe5 100644 --- a/mitmproxy/tools/console/options.py +++ b/mitmproxy/tools/console/options.py @@ -187,12 +187,6 @@ class OptionsList(urwid.ListBox): except exceptions.OptionsError as e: signals.status_message.send(message=str(e)) - def load_config(self, path): - try: - optmanager.load_paths(self.master.options, path) - except exceptions.OptionsError as e: - signals.status_message.send(message=str(e)) - def keypress(self, size, key): if self.walker.editing: if key == "enter": @@ -207,29 +201,12 @@ class OptionsList(urwid.ListBox): elif key == "esc": self.walker.stop_editing() else: - if key == "d": - foc, idx = self.get_focus() - setattr( - self.master.options, - foc.opt.name, - self.master.options.default(foc.opt.name) - ) - elif key == "g": + if key == "m_start": self.set_focus(0) self.walker._modified() - elif key == "G": + elif key == "m_end": self.set_focus(len(self.walker.opts) - 1) self.walker._modified() - elif key == "l": - signals.status_prompt_path.send( - prompt = "Load config from", - callback = self.load_config - ) - elif key == "w": - signals.status_prompt_path.send( - prompt = "Save config to", - callback = self.save_config - ) elif key == "enter": foc, idx = self.get_focus() if foc.opt.typespec == bool: @@ -242,6 +219,7 @@ class OptionsList(urwid.ListBox): elif foc.opt.choices: self.master.overlay( overlay.Chooser( + self.master, foc.opt.name, foc.opt.choices, foc.opt.current(), @@ -286,27 +264,30 @@ class OptionHelp(urwid.Frame): class Options(urwid.Pile): + keyctx = "options" + def __init__(self, master): oh = OptionHelp(master) + self.optionslist = OptionsList(master) super().__init__( [ - OptionsList(master), + self.optionslist, (HELP_HEIGHT, oh), ] ) self.master = master + def current_name(self): + foc, idx = self.optionslist.get_focus() + return foc.opt.name + def keypress(self, size, key): - key = common.shortcuts(key) if key == "tab": self.focus_position = ( self.focus_position + 1 ) % len(self.widget_list) self.widget_list[1].set_active(self.focus_position == 1) key = None - elif key == "D": - self.master.options.reset() - key = None # This is essentially a copypasta from urwid.Pile's keypress handler. # So much for "closed for modification, but open for extension". diff --git a/mitmproxy/tools/console/overlay.py b/mitmproxy/tools/console/overlay.py index e874da69..2fa6aa46 100644 --- a/mitmproxy/tools/console/overlay.py +++ b/mitmproxy/tools/console/overlay.py @@ -80,11 +80,12 @@ class ChooserListWalker(urwid.ListWalker): class Chooser(urwid.WidgetWrap): - def __init__(self, title, choices, current, callback): + def __init__(self, master, title, choices, current, callback): + self.master = master self.choices = choices self.callback = callback choicewidth = max([len(i) for i in choices]) - self.width = max(choicewidth, len(title) + 5) + self.width = max(choicewidth, len(title)) + 5 self.walker = ChooserListWalker(choices, current) super().__init__( urwid.AttrWrap( @@ -103,7 +104,7 @@ class Chooser(urwid.WidgetWrap): return True def keypress(self, size, key): - key = common.shortcuts(key) + key = self.master.keymap.handle("chooser", key) if key == "enter": self.callback(self.choices[self.walker.index]) signals.pop_view_state.send(self) diff --git a/mitmproxy/tools/console/searchable.py b/mitmproxy/tools/console/searchable.py index 55c5218a..f2bb5612 100644 --- a/mitmproxy/tools/console/searchable.py +++ b/mitmproxy/tools/console/searchable.py @@ -16,10 +16,9 @@ class Highlight(urwid.AttrMap): class Searchable(urwid.ListBox): - def __init__(self, view, contents): + def __init__(self, contents): self.walker = urwid.SimpleFocusListWalker(contents) urwid.ListBox.__init__(self, self.walker) - self.view = view self.search_offset = 0 self.current_highlight = None self.search_term = None @@ -36,10 +35,10 @@ class Searchable(urwid.ListBox): self.find_next(False) elif key == "N": self.find_next(True) - elif key == "g": + elif key == "m_start": self.set_focus(0) self.walker._modified() - elif key == "G": + elif key == "m_end": self.set_focus(len(self.walker) - 1) self.walker._modified() else: diff --git a/mitmproxy/tools/console/select.py b/mitmproxy/tools/console/select.py index a990dff8..f7e5d950 100644 --- a/mitmproxy/tools/console/select.py +++ b/mitmproxy/tools/console/select.py @@ -113,7 +113,6 @@ class Select(urwid.ListBox): if key == "enter" or key == " ": self.get_focus()[0].option.activate() return None - key = common.shortcuts(key) if key in self.keymap: self.keymap[key].activate() self.set_focus(self.options.index(self.keymap[key])) diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py index 91cb63b3..885cdbfb 100644 --- a/mitmproxy/tools/console/signals.py +++ b/mitmproxy/tools/console/signals.py @@ -48,4 +48,6 @@ flowlist_change = blinker.Signal() # Pop and push view state onto a stack pop_view_state = blinker.Signal() push_view_state = blinker.Signal() -replace_view_state = blinker.Signal() + +# Exits overlay if there is one +exit_overlay = blinker.Signal() diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 8ded0cda..a5db0f4a 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -143,6 +143,7 @@ class ActionBar(urwid.WidgetWrap): class StatusBar(urwid.WidgetWrap): + keyctx = "" def __init__( self, master: "mitmproxy.tools.console.master.ConsoleMaster", helptext @@ -150,7 +151,8 @@ class StatusBar(urwid.WidgetWrap): self.master = master self.helptext = helptext self.ib = urwid.WidgetWrap(urwid.Text("")) - super().__init__(urwid.Pile([self.ib, self.master.ab])) + self.ab = ActionBar(self) + super().__init__(urwid.Pile([self.ib, self.ab])) signals.update_settings.connect(self.sig_update) signals.flowlist_change.connect(self.sig_update) signals.footer_help.connect(self.sig_footer_help) @@ -166,7 +168,7 @@ class StatusBar(urwid.WidgetWrap): self.redraw() def keypress(self, *args, **kwargs): - return self.master.ab.keypress(*args, **kwargs) + return self.ab.keypress(*args, **kwargs) def get_status(self): r = [] diff --git a/mitmproxy/tools/console/tabs.py b/mitmproxy/tools/console/tabs.py index a2d5e719..93d6909e 100644 --- a/mitmproxy/tools/console/tabs.py +++ b/mitmproxy/tools/console/tabs.py @@ -27,6 +27,7 @@ class Tabs(urwid.WidgetWrap): self.tab_offset = tab_offset self.tabs = tabs self.show() + self._w = urwid.Pile([]) def change_tab(self, offset): self.tab_offset = offset @@ -34,13 +35,16 @@ class Tabs(urwid.WidgetWrap): def keypress(self, size, key): n = len(self.tabs) - if key in ["tab", "l"]: + if key == "right": self.change_tab((self.tab_offset + 1) % n) - elif key == "h": + elif key == "left": self.change_tab((self.tab_offset - 1) % n) return self._w.keypress(size, key) def show(self): + if not self.tabs: + return + headers = [] for i in range(len(self.tabs)): txt = self.tabs[i][0]() diff --git a/mitmproxy/tools/console/window.py b/mitmproxy/tools/console/window.py index ad972a66..d7038da0 100644 --- a/mitmproxy/tools/console/window.py +++ b/mitmproxy/tools/console/window.py @@ -1,22 +1,107 @@ import urwid - from mitmproxy.tools.console import signals +from mitmproxy.tools.console import statusbar +from mitmproxy.tools.console import flowlist +from mitmproxy.tools.console import flowview +from mitmproxy.tools.console import commands +from mitmproxy.tools.console import options +from mitmproxy.tools.console import overlay +from mitmproxy.tools.console import help +from mitmproxy.tools.console import grideditor class Window(urwid.Frame): - - def __init__(self, master, body, header, footer, helpctx, keyctx): - urwid.Frame.__init__( - self, - urwid.AttrWrap(body, "background"), - header = urwid.AttrWrap(header, "background") if header else None, - footer = urwid.AttrWrap(footer, "background") if footer else None + def __init__(self, master): + self.statusbar = statusbar.StatusBar(master, "") + super().__init__( + None, + header = None, + footer = urwid.AttrWrap(self.statusbar, "background") ) self.master = master - self.helpctx = helpctx - self.keyctx = keyctx + self.primary_stack = [] + self.master.view.sig_view_refresh.connect(self.view_changed) + self.master.view.sig_view_add.connect(self.view_changed) + self.master.view.sig_view_remove.connect(self.view_changed) + self.master.view.sig_view_update.connect(self.view_changed) + self.master.view.focus.sig_change.connect(self.view_changed) signals.focus.connect(self.sig_focus) + self.master.view.focus.sig_change.connect(self.focus_changed) + signals.flow_change.connect(self.flow_changed) + + signals.pop_view_state.connect(self.pop) + signals.push_view_state.connect(self.push) + self.windows = dict( + flowlist = flowlist.FlowListBox(self.master), + flowview = flowview.FlowView(self.master), + commands = commands.Commands(self.master), + options = options.Options(self.master), + help = help.HelpView(None), + edit_focus_query = grideditor.QueryEditor(self.master), + edit_focus_cookies = grideditor.CookieEditor(self.master), + edit_focus_setcookies = grideditor.SetCookieEditor(self.master), + edit_focus_form = grideditor.RequestFormEditor(self.master), + edit_focus_path = grideditor.PathEditor(self.master), + edit_focus_request_headers = grideditor.RequestHeaderEditor(self.master), + edit_focus_response_headers = grideditor.ResponseHeaderEditor(self.master), + ) + + def call(self, v, name, *args, **kwargs): + f = getattr(v, name, None) + if f: + f(*args, **kwargs) + + def flow_changed(self, sender, flow): + if self.master.view.focus.flow: + if flow.id == self.master.view.focus.flow.id: + self.focus_changed() + + def focus_changed(self, *args, **kwargs): + """ + Triggered when the focus changes - either when it's modified, or + when it changes to a different flow altogether. + """ + self.call(self.focus, "focus_changed") + + def view_changed(self, *args, **kwargs): + """ + Triggered when the view list has changed. + """ + self.call(self.focus, "view_changed") + + def view_popping(self, *args, **kwargs): + """ + Triggered when the view list has changed. + """ + self.call(self.focus, "view_popping") + + def push(self, wname): + if self.primary_stack and self.primary_stack[-1] == wname: + return + self.primary_stack.append(wname) + self.body = urwid.AttrWrap( + self.windows[wname], "background" + ) + self.view_changed() + self.focus_changed() + + def pop(self, *args, **kwargs): + if isinstance(self.master.loop.widget, overlay.SimpleOverlay): + self.master.loop.widget = self + else: + if len(self.primary_stack) > 1: + self.view_popping() + self.primary_stack.pop() + self.body = urwid.AttrWrap( + self.windows[self.primary_stack[-1]], + "background", + ) + self.view_changed() + self.focus_changed() + else: + self.master.prompt_for_exit() + def sig_focus(self, sender, section): self.focus_position = section @@ -37,50 +122,8 @@ class Window(urwid.Frame): return False return True - def handle_replay(self, k): - if k == "c": - creplay = self.master.addons.get("clientplayback") - if self.master.options.client_replay and creplay.count(): - def stop_client_playback_prompt(a): - if a != "n": - self.master.options.client_replay = None - signals.status_prompt_onekey.send( - self, - prompt = "Stop current client replay?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = stop_client_playback_prompt - ) - else: - signals.status_prompt_path.send( - self, - prompt = "Client replay path", - callback = lambda x: self.master.options.setter("client_replay")([x]) - ) - elif k == "s": - a = self.master.addons.get("serverplayback") - if a.count(): - def stop_server_playback(response): - if response == "y": - self.master.options.server_replay = [] - signals.status_prompt_onekey.send( - self, - prompt = "Stop current server replay?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = stop_server_playback - ) - else: - signals.status_prompt_path.send( - self, - prompt = "Server playback path", - callback = lambda x: self.master.options.setter("server_replay")([x]) - ) - def keypress(self, size, k): - k = super().keypress(size, k) - return self.master.keymap.handle(self.keyctx, k) + if self.focus.keyctx: + k = self.master.keymap.handle(self.focus.keyctx, k) + if k: + return super().keypress(size, k) diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py index 9748f3cf..d8fac077 100644 --- a/mitmproxy/tools/main.py +++ b/mitmproxy/tools/main.py @@ -99,7 +99,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover except exceptions.OptionsError as e: print("%s: %s" % (sys.argv[0], e), file=sys.stderr) sys.exit(1) - except (KeyboardInterrupt, RuntimeError): + except (KeyboardInterrupt, RuntimeError) as e: pass return master diff --git a/pathod/language/actions.py b/pathod/language/actions.py index e85affac..fc57a18b 100644 --- a/pathod/language/actions.py +++ b/pathod/language/actions.py @@ -2,9 +2,7 @@ import abc import copy import random from functools import total_ordering - import pyparsing as pp - from . import base @@ -52,7 +50,7 @@ class _Action(base.Token): class PauseAt(_Action): - unique_name = None + unique_name = None # type: ignore def __init__(self, offset, seconds): _Action.__init__(self, offset) @@ -103,7 +101,7 @@ class DisconnectAt(_Action): class InjectAt(_Action): - unique_name = None + unique_name = None # type: ignore def __init__(self, offset, value): _Action.__init__(self, offset) diff --git a/pathod/language/base.py b/pathod/language/base.py index 3a810ef0..c8892748 100644 --- a/pathod/language/base.py +++ b/pathod/language/base.py @@ -3,10 +3,9 @@ import os import abc import functools import pyparsing as pp - from mitmproxy.utils import strutils from mitmproxy.utils import human - +import typing # noqa from . import generators, exceptions @@ -84,7 +83,7 @@ class Token: return None @property - def unique_name(self): + def unique_name(self) -> typing.Optional[str]: """ Controls uniqueness constraints for tokens. No two tokens with the same name will be allowed. If no uniquness should be applied, this @@ -334,7 +333,7 @@ class OptionsOrValue(_Component): Can be any of a specified set of options, or a value specifier. """ preamble = "" - options = [] + options = [] # type: typing.List[str] def __init__(self, value): # If it's a string, we were passed one of the options, so we lower-case @@ -376,7 +375,7 @@ class OptionsOrValue(_Component): class Integer(_Component): - bounds = (None, None) + bounds = (None, None) # type: typing.Tuple[typing.Union[int, None], typing.Union[int , None]] preamble = "" def __init__(self, value): @@ -442,7 +441,7 @@ class FixedLengthValue(Value): A value component lead by an optional preamble. """ preamble = "" - length = None + length = None # type: typing.Optional[int] def __init__(self, value): Value.__init__(self, value) @@ -511,7 +510,7 @@ class IntField(_Component): """ An integer field, where values can optionally specified by name. """ - names = {} + names = {} # type: typing.Dict[str, int] max = 16 preamble = "" @@ -546,7 +545,7 @@ class NestedMessage(Token): A nested message, as an escaped string with a preamble. """ preamble = "" - nest_type = None + nest_type = None # type: ignore def __init__(self, value): Token.__init__(self) diff --git a/pathod/language/http.py b/pathod/language/http.py index 8fcf9edc..5cd717a9 100644 --- a/pathod/language/http.py +++ b/pathod/language/http.py @@ -54,7 +54,7 @@ class Method(base.OptionsOrValue): class _HeaderMixin: - unique_name = None + unique_name = None # type: ignore def format_header(self, key, value): return [key, b": ", value, b"\r\n"] @@ -143,7 +143,7 @@ class _HTTPMessage(message.Message): class Response(_HTTPMessage): - unique_name = None + unique_name = None # type: ignore comps = ( Header, ShortcutContentType, diff --git a/pathod/language/http2.py b/pathod/language/http2.py index 08c5f6d7..47d6e370 100644 --- a/pathod/language/http2.py +++ b/pathod/language/http2.py @@ -1,9 +1,9 @@ import pyparsing as pp - from mitmproxy.net import http from mitmproxy.net.http import user_agents, Headers from . import base, message + """ Normal HTTP requests: <method>:<path>:<header>:<body> @@ -41,7 +41,7 @@ def get_header(val, headers): class _HeaderMixin: - unique_name = None + unique_name = None # type: ignore def values(self, settings): return ( @@ -146,7 +146,7 @@ class Times(base.Integer): class Response(_HTTP2Message): - unique_name = None + unique_name = None # type: ignore comps = ( Header, Body, diff --git a/pathod/language/message.py b/pathod/language/message.py index 6cdaaa0b..6b4c5021 100644 --- a/pathod/language/message.py +++ b/pathod/language/message.py @@ -1,13 +1,14 @@ import abc from . import actions, exceptions from mitmproxy.utils import strutils +import typing # noqa LOG_TRUNCATE = 1024 class Message: __metaclass__ = abc.ABCMeta - logattrs = [] + logattrs = [] # type: typing.List[str] def __init__(self, tokens): track = set([]) diff --git a/pathod/language/websockets.py b/pathod/language/websockets.py index a237381c..b4faf59b 100644 --- a/pathod/language/websockets.py +++ b/pathod/language/websockets.py @@ -4,6 +4,7 @@ import mitmproxy.net.websockets from mitmproxy.utils import strutils import pyparsing as pp from . import base, generators, actions, message +import typing # noqa NESTED_LEADER = b"pathod!" @@ -20,7 +21,7 @@ class OpCode(base.IntField): "close": mitmproxy.net.websockets.OPCODE.CLOSE, "ping": mitmproxy.net.websockets.OPCODE.PING, "pong": mitmproxy.net.websockets.OPCODE.PONG, - } + } # type: typing.Dict[str, int] max = 15 preamble = "c" @@ -239,7 +240,14 @@ class NestedFrame(base.NestedMessage): nest_type = WebsocketFrame +COMP = typing.Tuple[ + typing.Type[OpCode], typing.Type[Length], typing.Type[Fin], typing.Type[RSV1], typing.Type[RSV2], typing.Type[RSV3], typing.Type[Mask], + typing.Type[actions.PauseAt], typing.Type[actions.DisconnectAt], typing.Type[actions.InjectAt], typing.Type[KeyNone], typing.Type[Key], + typing.Type[Times], typing.Type[Body], typing.Type[RawBody] +] + + class WebsocketClientFrame(WebsocketFrame): - components = COMPONENTS + ( + components = typing.cast(COMP, COMPONENTS + ( NestedFrame, - ) + )) diff --git a/pathod/pathod.py b/pathod/pathod.py index 7416d325..7c773c3b 100644 --- a/pathod/pathod.py +++ b/pathod/pathod.py @@ -3,19 +3,17 @@ import logging import os import sys import threading - from mitmproxy.net import tcp from mitmproxy import certs as mcerts from mitmproxy.net import websockets from mitmproxy import version - import urllib from mitmproxy import exceptions - from pathod import language from pathod import utils from pathod import log from pathod import protocols +import typing # noqa DEFAULT_CERT_DOMAIN = b"pathod.net" @@ -71,7 +69,7 @@ class SSLOptions: class PathodHandler(tcp.BaseHandler): wbufsize = 0 - sni = None + sni = None # type: typing.Union[str, None, bool] def __init__( self, diff --git a/pathod/test.py b/pathod/test.py index 81f5805f..52f3ba02 100644 --- a/pathod/test.py +++ b/pathod/test.py @@ -1,16 +1,16 @@ import io import time import queue - from . import pathod from mitmproxy.types import basethread +import typing # noqa class Daemon: IFACE = "127.0.0.1" - def __init__(self, ssl=None, **daemonargs): - self.q = queue.Queue() + def __init__(self, ssl=None, **daemonargs) -> None: + self.q = queue.Queue() # type: queue.Queue self.logfp = io.StringIO() daemonargs["logfp"] = self.logfp self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs) @@ -25,18 +25,18 @@ class Daemon: def __enter__(self): return self - def __exit__(self, type, value, traceback): + def __exit__(self, type, value, traceback) -> bool: self.logfp.truncate(0) self.shutdown() return False - def p(self, spec): + def p(self, spec: str) -> str: """ Return a URL that will render the response in spec. """ return "%s/p/%s" % (self.urlbase, spec) - def text_log(self): + def text_log(self) -> str: return self.logfp.getvalue() def wait_for_silence(self, timeout=5): @@ -62,7 +62,7 @@ class Daemon: return None return l[-1] - def log(self): + def log(self) -> typing.List[typing.Dict]: """ Return the log buffer as a list of dictionaries. """ diff --git a/pathod/utils.py b/pathod/utils.py index 44ad1f87..11b1dccd 100644 --- a/pathod/utils.py +++ b/pathod/utils.py @@ -1,6 +1,7 @@ import os import sys from mitmproxy.utils import data as mdata +import typing # noqa class MemBool: @@ -9,10 +10,10 @@ class MemBool: Truth-checking with a memory, for use in chained if statements. """ - def __init__(self): - self.v = None + def __init__(self) -> None: + self.v = None # type: typing.Optional[bool] - def __call__(self, v): + def __call__(self, v: bool) -> bool: self.v = v return bool(v) diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py index 64d0fa19..c132d80a 100644 --- a/test/mitmproxy/addons/test_core.py +++ b/test/mitmproxy/addons/test_core.py @@ -61,3 +61,105 @@ def test_revert(): assert f.modified() sa.revert([f]) assert not f.modified() + + +def test_flow_set(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow(resp=True) + assert sa.flow_set_options() + + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "flibble", "post") + + assert f.request.method != "post" + sa.flow_set([f], "method", "post") + assert f.request.method == "POST" + + assert f.request.host != "testhost" + sa.flow_set([f], "host", "testhost") + assert f.request.host == "testhost" + + assert f.request.path != "/test/path" + sa.flow_set([f], "path", "/test/path") + assert f.request.path == "/test/path" + + assert f.request.url != "http://foo.com/bar" + sa.flow_set([f], "url", "http://foo.com/bar") + assert f.request.url == "http://foo.com/bar" + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "url", "oink") + + assert f.response.status_code != 404 + sa.flow_set([f], "status_code", "404") + assert f.response.status_code == 404 + assert f.response.reason == "Not Found" + with pytest.raises(exceptions.CommandError): + sa.flow_set([f], "status_code", "oink") + + assert f.response.reason != "foo" + sa.flow_set([f], "reason", "foo") + assert f.response.reason == "foo" + + +def test_encoding(): + sa = core.Core() + with taddons.context(): + f = tflow.tflow() + assert sa.encode_options() + sa.encode([f], "request", "deflate") + assert f.request.headers["content-encoding"] == "deflate" + + sa.encode([f], "request", "br") + assert f.request.headers["content-encoding"] == "deflate" + + sa.decode([f], "request") + assert "content-encoding" not in f.request.headers + + sa.encode([f], "request", "br") + assert f.request.headers["content-encoding"] == "br" + + sa.encode_toggle([f], "request") + assert "content-encoding" not in f.request.headers + sa.encode_toggle([f], "request") + assert f.request.headers["content-encoding"] == "deflate" + sa.encode_toggle([f], "request") + assert "content-encoding" not in f.request.headers + + with pytest.raises(exceptions.CommandError): + sa.encode([f], "request", "invalid") + + +def test_options(tmpdir): + p = str(tmpdir.join("path")) + sa = core.Core() + with taddons.context() as tctx: + tctx.options.stickycookie = "foo" + assert tctx.options.stickycookie == "foo" + sa.options_reset() + assert tctx.options.stickycookie is None + + tctx.options.stickycookie = "foo" + tctx.options.stickyauth = "bar" + sa.options_reset_one("stickycookie") + assert tctx.options.stickycookie is None + assert tctx.options.stickyauth == "bar" + + with pytest.raises(exceptions.CommandError): + sa.options_reset_one("unknown") + + sa.options_save(p) + with pytest.raises(exceptions.CommandError): + sa.options_save("/") + + sa.options_reset() + assert tctx.options.stickyauth is None + sa.options_load(p) + assert tctx.options.stickyauth == "bar" + + sa.options_load("/nonexistent") + + with open(p, 'a') as f: + f.write("'''") + with pytest.raises(exceptions.CommandError): + sa.options_load(p) diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index a3df1fcf..dd5349cb 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -152,7 +152,7 @@ class TestScriptLoader: sc = script.ScriptLoader() with taddons.context(): with pytest.raises(exceptions.CommandError): - sc.script_run([tflow.tflow(resp=True)], "/nonexistent") + sc.script_run([tflow.tflow(resp=True)], "/") def test_simple(self): sc = script.ScriptLoader() diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 1724da49..6da13650 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -218,9 +218,10 @@ def test_resolve(): tctx.command(v.resolve, "~") -def test_go(): +def test_movement(): v = view.View() with taddons.context(): + v.go(0) v.add([ tflow.tflow(), tflow.tflow(), @@ -240,6 +241,11 @@ def test_go(): v.go(-999) assert v.focus.index == 0 + v.focus_next() + assert v.focus.index == 1 + v.focus_prev() + assert v.focus.index == 0 + def test_duplicate(): v = view.View() @@ -255,6 +261,21 @@ def test_duplicate(): assert v.focus.index == 2 +def test_setgetval(): + v = view.View() + with taddons.context(): + f = tflow.tflow() + v.add([f]) + v.setvalue([f], "key", "value") + assert v.getvalue(f, "key", "default") == "value" + assert v.getvalue(f, "unknow", "default") == "default" + + v.setvalue_toggle([f], "key") + assert v.getvalue(f, "key", "default") == "true" + v.setvalue_toggle([f], "key") + assert v.getvalue(f, "key", "default") == "false" + + def test_order(): v = view.View() with taddons.context() as tctx: diff --git a/test/mitmproxy/tools/console/test_help.py b/test/mitmproxy/tools/console/test_help.py index ac3011e6..0ebc2d6a 100644 --- a/test/mitmproxy/tools/console/test_help.py +++ b/test/mitmproxy/tools/console/test_help.py @@ -9,9 +9,3 @@ class TestHelp: def test_helptext(self): h = help.HelpView(None) assert h.helptext() - - def test_keypress(self): - h = help.HelpView([1, 2, 3]) - assert not h.keypress((0, 0), "q") - assert not h.keypress((0, 0), "?") - assert h.keypress((0, 0), "o") == "o" diff --git a/test/mitmproxy/tools/console/test_keymap.py b/test/mitmproxy/tools/console/test_keymap.py new file mode 100644 index 00000000..6a75800e --- /dev/null +++ b/test/mitmproxy/tools/console/test_keymap.py @@ -0,0 +1,29 @@ +from mitmproxy.tools.console import keymap +from mitmproxy.test import taddons +from unittest import mock +import pytest + + +def test_bind(): + with taddons.context() as tctx: + km = keymap.Keymap(tctx.master) + km.executor = mock.Mock() + + with pytest.raises(ValueError): + km.add("foo", "bar", ["unsupported"]) + + km.add("key", "str", ["options", "commands"]) + assert km.get("options", "key") + assert km.get("commands", "key") + assert not km.get("flowlist", "key") + + km.handle("unknown", "unknown") + assert not km.executor.called + + km.handle("options", "key") + assert km.executor.called + + km.add("glob", "str", ["global"]) + km.executor = mock.Mock() + km.handle("options", "glob") + assert km.executor.called @@ -28,6 +28,8 @@ commands = python3 test/filename_matching.py rstcheck README.rst mypy --ignore-missing-imports ./mitmproxy + mypy --ignore-missing-imports ./pathod + mypy --ignore-missing-imports --follow-imports=skip ./examples/simple/ [testenv:individual_coverage] deps = |