diff options
Diffstat (limited to 'libmproxy/console')
-rw-r--r-- | libmproxy/console/__init__.py | 854 | ||||
-rw-r--r-- | libmproxy/console/common.py | 194 | ||||
-rw-r--r-- | libmproxy/console/contentview.py | 62 | ||||
-rw-r--r-- | libmproxy/console/flowdetailview.py | 233 | ||||
-rw-r--r-- | libmproxy/console/flowlist.py | 184 | ||||
-rw-r--r-- | libmproxy/console/flowview.py | 982 | ||||
-rw-r--r-- | libmproxy/console/grideditor.py | 354 | ||||
-rw-r--r-- | libmproxy/console/help.py | 125 | ||||
-rw-r--r-- | libmproxy/console/options.py | 269 | ||||
-rw-r--r-- | libmproxy/console/palettepicker.py | 81 | ||||
-rw-r--r-- | libmproxy/console/palettes.py | 119 | ||||
-rw-r--r-- | libmproxy/console/pathedit.py | 69 | ||||
-rw-r--r-- | libmproxy/console/searchable.py | 91 | ||||
-rw-r--r-- | libmproxy/console/select.py | 115 | ||||
-rw-r--r-- | libmproxy/console/signals.py | 41 | ||||
-rw-r--r-- | libmproxy/console/statusbar.py | 254 | ||||
-rw-r--r-- | libmproxy/console/tabs.py | 69 | ||||
-rw-r--r-- | libmproxy/console/window.py | 89 |
18 files changed, 2491 insertions, 1694 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index 198b7bbe..3d20947b 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -1,281 +1,38 @@ from __future__ import absolute_import -import glob import mailcap import mimetypes import tempfile import os import os.path import shlex +import signal import stat import subprocess import sys -import time import traceback import urwid import weakref -from .. import controller, utils, flow, script -from . import flowlist, flowview, help, common -from . import grideditor, palettes, contentview, flowdetailview +from .. import controller, flow, script +from . import flowlist, flowview, help, window, signals, options +from . import grideditor, palettes, contentview, statusbar, palettepicker EVENTLOG_SIZE = 500 -class _PathCompleter: - def __init__(self, _testing=False): - """ - _testing: disables reloading of the lookup table to make testing possible. - """ - self.lookup, self.offset = None, None - self.final = None - self._testing = _testing - - def reset(self): - self.lookup = None - self.offset = -1 - - def complete(self, txt): - """ - Returns the next completion for txt, or None if there is no completion. - """ - path = os.path.expanduser(txt) - if not self.lookup: - if not self._testing: - # Lookup is a set of (display value, actual value) tuples. - self.lookup = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = txt - else: - files = glob.glob(path+"*") - prefix = os.path.dirname(txt) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.basename(f)) - if os.path.isdir(f): - display += "/" - self.lookup.append((display, f)) - if not self.lookup: - self.final = path - return path - self.lookup.sort() - self.offset = -1 - self.lookup.append((txt, txt)) - self.offset += 1 - if self.offset >= len(self.lookup): - self.offset = 0 - ret = self.lookup[self.offset] - self.final = ret[1] - return ret[0] - - -class PathEdit(urwid.Edit, _PathCompleter): - def __init__(self, *args, **kwargs): - urwid.Edit.__init__(self, *args, **kwargs) - _PathCompleter.__init__(self) - - def keypress(self, size, key): - if key == "tab": - comp = self.complete(self.get_edit_text()) - self.set_edit_text(comp) - self.set_edit_pos(len(comp)) - else: - self.reset() - return urwid.Edit.keypress(self, size, key) - - -class ActionBar(urwid.WidgetWrap): - def __init__(self): - self.message("") - - def selectable(self): - return True - - def path_prompt(self, prompt, text): - self.expire = None - self._w = PathEdit(prompt, text) - - def prompt(self, prompt, text = ""): - self.expire = None - # A (partial) workaround for this Urwid issue: - # https://github.com/Nic0/tyrs/issues/115 - # We can remove it once veryone is beyond 1.0.1 - if isinstance(prompt, basestring): - prompt = unicode(prompt) - self._w = urwid.Edit(prompt, text or "") - - def message(self, message, expire=None): - self.expire = expire - self._w = urwid.Text(message) - - -class StatusBar(urwid.WidgetWrap): - def __init__(self, master, helptext): - self.master, self.helptext = master, helptext - self.ab = ActionBar() - self.ib = urwid.WidgetWrap(urwid.Text("")) - self._w = urwid.Pile([self.ib, self.ab]) - - def get_status(self): - r = [] - - if self.master.setheaders.count(): - r.append("[") - r.append(("heading_key", "H")) - r.append("eaders]") - if self.master.replacehooks.count(): - r.append("[") - r.append(("heading_key", "R")) - r.append("eplacing]") - if self.master.client_playback: - r.append("[") - r.append(("heading_key", "cplayback")) - r.append(":%s to go]"%self.master.client_playback.count()) - if self.master.server_playback: - r.append("[") - r.append(("heading_key", "splayback")) - if self.master.nopop: - r.append(":%s in file]"%self.master.server_playback.count()) - else: - r.append(":%s to go]"%self.master.server_playback.count()) - if self.master.get_ignore_filter(): - r.append("[") - r.append(("heading_key", "I")) - r.append("gnore:%d]" % len(self.master.get_ignore_filter())) - if self.master.get_tcp_filter(): - r.append("[") - r.append(("heading_key", "T")) - r.append("CP:%d]" % len(self.master.get_tcp_filter())) - if self.master.state.intercept_txt: - r.append("[") - r.append(("heading_key", "i")) - r.append(":%s]"%self.master.state.intercept_txt) - if self.master.state.limit_txt: - r.append("[") - r.append(("heading_key", "l")) - r.append(":%s]"%self.master.state.limit_txt) - if self.master.stickycookie_txt: - r.append("[") - r.append(("heading_key", "t")) - r.append(":%s]"%self.master.stickycookie_txt) - if self.master.stickyauth_txt: - r.append("[") - r.append(("heading_key", "u")) - r.append(":%s]"%self.master.stickyauth_txt) - if self.master.state.default_body_view.name != "Auto": - r.append("[") - r.append(("heading_key", "M")) - r.append(":%s]"%self.master.state.default_body_view.name) - - opts = [] - if self.master.anticache: - opts.append("anticache") - if self.master.anticomp: - opts.append("anticomp") - if self.master.showhost: - opts.append("showhost") - if not self.master.refresh_server_playback: - opts.append("norefresh") - if self.master.killextra: - opts.append("killextra") - if self.master.server.config.no_upstream_cert: - opts.append("no-upstream-cert") - if self.master.state.follow_focus: - opts.append("following") - if self.master.stream_large_bodies: - opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size)) - - if opts: - r.append("[%s]"%(":".join(opts))) - - if self.master.server.config.mode in ["reverse", "upstream"]: - dst = self.master.server.config.mode.dst - scheme = "https" if dst[0] else "http" - if dst[1] != dst[0]: - scheme += "2https" if dst[1] else "http" - r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:])) - if self.master.scripts: - r.append("[") - r.append(("heading_key", "s")) - r.append("cripts:%s]"%len(self.master.scripts)) - # r.append("[lt:%0.3f]"%self.master.looptime) - - if self.master.stream: - r.append("[W:%s]"%self.master.stream_path) - - return r - - def redraw(self): - if self.ab.expire and time.time() > self.ab.expire: - self.message("") - - fc = self.master.state.flow_count() - if self.master.state.focus is None: - offset = 0 - else: - offset = min(self.master.state.focus + 1, fc) - t = [ - ('heading', ("[%s/%s]"%(offset, fc)).ljust(9)) - ] - - if self.master.server.bound: - host = self.master.server.address.host - if host == "0.0.0.0": - host = "*" - boundaddr = "[%s:%s]"%(host, self.master.server.address.port) - else: - boundaddr = "" - t.extend(self.get_status()) - status = urwid.AttrWrap(urwid.Columns([ - urwid.Text(t), - urwid.Text( - [ - self.helptext, - boundaddr - ], - align="right" - ), - ]), "heading") - self.ib._w = status - - def update(self, text): - self.helptext = text - self.redraw() - self.master.loop.draw_screen() - - def selectable(self): - return True - - def get_edit_text(self): - return self.ab._w.get_edit_text() - - def path_prompt(self, prompt, text): - return self.ab.path_prompt(prompt, text) - - def prompt(self, prompt, text = ""): - self.ab.prompt(prompt, text) - - def message(self, msg, expire=None): - if expire: - expire = time.time() + float(expire)/1000 - self.ab.message(msg, expire) - self.master.loop.draw_screen() - - class ConsoleState(flow.State): def __init__(self): flow.State.__init__(self) self.focus = None self.follow_focus = None self.default_body_view = contentview.get("Auto") - - self.view_mode = common.VIEW_LIST - self.view_flow_mode = common.VIEW_FLOW_REQUEST - - self.last_script = "" - self.last_saveload = "" self.flowsettings = weakref.WeakKeyDictionary() + self.last_search = None + + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) def add_flow_setting(self, flow, key, value): d = self.flowsettings.setdefault(flow, {}) @@ -291,6 +48,7 @@ class ConsoleState(flow.State): self.set_focus(0) elif self.follow_focus: self.set_focus(len(self.view) - 1) + self.set_flow_marked(f, False) return f def update_flow(self, f): @@ -316,6 +74,8 @@ class ConsoleState(flow.State): elif idx < 0: idx = 0 self.focus = idx + else: + self.focus = None def set_focus_flow(self, f): self.set_focus(self.view.index(f)) @@ -326,10 +86,10 @@ class ConsoleState(flow.State): return self.view[pos], pos def get_next(self, pos): - return self.get_from_pos(pos+1) + return self.get_from_pos(pos + 1) def get_prev(self, pos): - return self.get_from_pos(pos-1) + return self.get_from_pos(pos - 1) def delete_flow(self, f): if f in self.view and self.view.index(f) <= self.focus: @@ -341,9 +101,29 @@ class ConsoleState(flow.State): return ret def clear(self): - self.focus = None + marked_flows = [] + for f in self.flows: + if self.flow_marked(f): + marked_flows.append(f) + super(ConsoleState, self).clear() - + + for f in marked_flows: + self.add_flow(f) + self.set_flow_marked(f, True) + + if len(self.flows.views) == 0: + self.focus = None + else: + self.focus = 0 + self.set_focus(self.focus) + + def flow_marked(self, flow): + return self.get_flow_setting(flow, "marked", False) + + def set_flow_marked(self, flow, marked): + self.add_flow_setting(flow, "marked", marked) + class Options(object): attributes = [ @@ -357,6 +137,7 @@ class Options(object): "keepserving", "kill", "intercept", + "limit", "no_server", "refresh_server_playback", "rfile", @@ -373,6 +154,7 @@ class Options(object): "wfile", "nopop", "palette", + "palette_transparent" ] def __init__(self, **kwargs): @@ -388,7 +170,6 @@ class ConsoleMaster(flow.FlowMaster): def __init__(self, server, options): flow.FlowMaster.__init__(self, server, ConsoleState()) - self.looptime = 0 self.stream_path = None self.options = options @@ -398,14 +179,14 @@ class ConsoleMaster(flow.FlowMaster): for i in options.setheaders: self.setheaders.add(*i) - self.flow_list_walker = None - self.set_palette(options.palette) - r = self.set_intercept(options.intercept) if r: print >> sys.stderr, "Intercept error:", r sys.exit(1) + if options.limit: + self.set_limit(options.limit) + r = self.set_stickycookie(options.stickycookie) if r: print >> sys.stderr, "Sticky cookies error:", r @@ -425,12 +206,12 @@ class ConsoleMaster(flow.FlowMaster): self.rheaders = options.rheaders self.nopop = options.nopop self.showhost = options.showhost + self.palette = options.palette + self.palette_transparent = options.palette_transparent self.eventlog = options.eventlog self.eventlist = urwid.SimpleListWalker([]) - self.statusbar = None - if options.client_replay: self.client_playback_path(options.client_replay) @@ -453,15 +234,67 @@ class ConsoleMaster(flow.FlowMaster): print >> sys.stderr, "Stream file error:", err sys.exit(1) + self.view_stack = [] + if options.app: self.start_app(self.options.app_host, self.options.app_port) + signals.call_in.connect(self.sig_call_in) + signals.pop_view_state.connect(self.sig_pop_view_state) + signals.push_view_state.connect(self.sig_push_view_state) + signals.sig_add_event.connect(self.sig_add_event) + + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) + + def sig_add_event(self, sender, e, level): + needed = dict(error=0, info=1, debug=2).get(level, 1) + if self.options.verbosity < needed: + return + + if level == "error": + e = urwid.Text(("error", str(e))) + else: + e = urwid.Text(str(e)) + self.eventlist.append(e) + if len(self.eventlist) > EVENTLOG_SIZE: + self.eventlist.pop(0) + self.eventlist.set_focus(len(self.eventlist) - 1) + + def add_event(self, e, level): + signals.add_event(e, level) + + def sig_call_in(self, sender, seconds, callback, args=()): + def cb(*_): + return callback(*args) + self.loop.set_alarm_in(seconds, cb) + + def sig_pop_view_state(self, sender): + if len(self.view_stack) > 1: + self.view_stack.pop() + self.loop.widget = self.view_stack[-1] + else: + signals.status_prompt_onekey.send( + self, + prompt = "Quit", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.quit, + ) + + def sig_push_view_state(self, sender, window): + self.view_stack.append(window) + self.loop.widget = window + self.loop.draw_screen() def start_stream_to_path(self, path, mode="wb"): path = os.path.expanduser(path) try: f = file(path, mode) self.start_stream(f, None) - except IOError, v: + except IOError as v: return str(v) self.stream_path = path @@ -469,20 +302,24 @@ class ConsoleMaster(flow.FlowMaster): status, val = s.run(method, f) if val: if status: - self.add_event("Method %s return: %s"%(method, val), "debug") + signals.add_event("Method %s return: %s" % (method, val), "debug") else: - self.add_event("Method %s error: %s"%(method, val[1]), "error") + signals.add_event( + "Method %s error: %s" % + (method, val[1]), "error") def run_script_once(self, command, f): if not command: return - self.add_event("Running script on flow: %s"%command, "debug") + signals.add_event("Running script on flow: %s" % command, "debug") try: s = script.Script(command, self) - except script.ScriptError, v: - self.statusbar.message("Error loading script.") - self.add_event("Error loading script:\n%s"%v.args[0], "error") + except script.ScriptError as v: + signals.status_message.send( + message = "Error loading script." + ) + signals.add_event("Error loading script:\n%s" % v.args[0], "error") return if f.request: @@ -492,22 +329,21 @@ class ConsoleMaster(flow.FlowMaster): if f.error: self._run_script_method("error", s, f) s.unload() - self.refresh_flow(f) - self.state.last_script = command + signals.flow_change.send(self, flow = f) def set_script(self, command): if not command: return ret = self.load_script(command) if ret: - self.statusbar.message(ret) - self.state.last_script = command + signals.status_message.send(message=ret) def toggle_eventlog(self): self.eventlog = not self.eventlog + signals.pop_view_state.send(self) self.view_flowlist() - def _readflow(self, paths): + def _readflows(self, path): """ Utitility function that reads a list of flows or prints an error to the UI if that fails. @@ -516,22 +352,21 @@ class ConsoleMaster(flow.FlowMaster): - a list of flows, otherwise. """ try: - return flow.read_flows_from_paths(paths) + return flow.read_flows_from_paths(path) except flow.FlowReadError as e: - if not self.statusbar: - print >> sys.stderr, e.strerror - sys.exit(1) - else: - self.statusbar.message(e.strerror) - return None + signals.status_message.send(message=e.strerror) def client_playback_path(self, path): - flows = self._readflow(path) + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) if flows: self.start_client_playback(flows, False) def server_playback_path(self, path): - flows = self._readflow(path) + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) if flows: self.start_server_playback( flows, @@ -557,7 +392,9 @@ class ConsoleMaster(flow.FlowMaster): try: subprocess.call(cmd) except: - self.statusbar.message("Can't start editor: %s" % " ".join(c)) + signals.status_message.send( + message = "Can't start editor: %s" % " ".join(c) + ) else: data = open(name, "rb").read() self.ui.start() @@ -596,191 +433,35 @@ class ConsoleMaster(flow.FlowMaster): try: subprocess.call(cmd, shell=shell) except: - self.statusbar.message( - "Can't start external viewer: %s" % " ".join(c) + signals.status_message.send( + message="Can't start external viewer: %s" % " ".join(c) ) self.ui.start() os.unlink(name) def set_palette(self, name): - self.palette = palettes.palettes[name] - - def input_filter(self, keys, raw): - for k in keys: - if self.prompting: - if k == "esc": - self.prompt_cancel() - elif self.onekey: - if k == "enter": - self.prompt_cancel() - elif k in self.onekey: - self.prompt_execute(k) - elif k == "enter": - self.prompt_execute() - else: - self.view.keypress(self.loop.screen_size, k) - else: - k = self.view.keypress(self.loop.screen_size, k) - if k: - self.statusbar.message("") - if k == "?": - self.view_help() - elif k == "c": - if not self.client_playback: - self.path_prompt( - "Client replay: ", - self.state.last_saveload, - self.client_playback_path - ) - else: - self.prompt_onekey( - "Stop current client replay?", - ( - ("yes", "y"), - ("no", "n"), - ), - self.stop_client_playback_prompt, - ) - elif k == "H": - self.view_grideditor( - grideditor.SetHeadersEditor( - self, - self.setheaders.get_specs(), - self.setheaders.set - ) - ) - elif k == "I": - self.view_grideditor( - grideditor.HostPatternEditor( - self, - [[x] for x in self.get_ignore_filter()], - self.edit_ignore_filter - ) - ) - elif k == "T": - self.view_grideditor( - grideditor.HostPatternEditor( - self, - [[x] for x in self.get_tcp_filter()], - self.edit_tcp_filter - ) - ) - elif k == "i": - self.prompt( - "Intercept filter: ", - self.state.intercept_txt, - self.set_intercept - ) - elif k == "Q": - raise urwid.ExitMainLoop - elif k == "q": - self.prompt_onekey( - "Quit", - ( - ("yes", "y"), - ("no", "n"), - ), - self.quit, - ) - elif k == "M": - self.prompt_onekey( - "Global default display mode", - contentview.view_prompts, - self.change_default_display_mode - ) - elif k == "R": - self.view_grideditor( - grideditor.ReplaceEditor( - self, - self.replacehooks.get_specs(), - self.replacehooks.set - ) - ) - elif k == "s": - self.view_grideditor( - grideditor.ScriptEditor( - self, - [[i.command] for i in self.scripts], - self.edit_scripts - ) - ) - #if self.scripts: - # self.load_script(None) - #else: - # self.path_prompt( - # "Set script: ", - # self.state.last_script, - # self.set_script - # ) - elif k == "S": - if not self.server_playback: - self.path_prompt( - "Server replay path: ", - self.state.last_saveload, - self.server_playback_path - ) - else: - self.prompt_onekey( - "Stop current server replay?", - ( - ("yes", "y"), - ("no", "n"), - ), - self.stop_server_playback_prompt, - ) - elif k == "o": - self.prompt_onekey( - "Options", - ( - ("anticache", "a"), - ("anticomp", "c"), - ("showhost", "h"), - ("killextra", "k"), - ("norefresh", "n"), - ("no-upstream-certs", "u"), - ), - self._change_options - ) - elif k == "t": - self.prompt( - "Sticky cookie filter: ", - self.stickycookie_txt, - self.set_stickycookie - ) - elif k == "u": - self.prompt( - "Sticky auth filter: ", - self.stickyauth_txt, - self.set_stickyauth - ) - self.statusbar.redraw() + self.palette = name + self.ui.register_palette( + palettes.palettes[name].palette(self.palette_transparent) + ) + self.ui.clear() def ticker(self, *userdata): changed = self.tick(self.masterq, timeout=0) if changed: self.loop.draw_screen() - self.statusbar.redraw() + signals.update_settings.send() self.loop.set_alarm_in(0.01, self.ticker) def run(self): self.ui = urwid.raw_display.Screen() + self.ui.set_mouse_tracking() self.ui.set_terminal_properties(256) - self.ui.register_palette(self.palette.palette()) - self.flow_list_walker = flowlist.FlowListWalker(self, self.state) - self.view = None - self.statusbar = None - self.header = None - self.body = None - self.help_context = None - self.prompting = False - self.onekey = False + self.set_palette(self.palette) self.loop = urwid.MainLoop( - self.view, + urwid.SolidFill("x"), screen = self.ui, - input_filter = self.input_filter ) - self.view_flowlist() - self.statusbar.redraw() self.server.start_slave( controller.Slave, @@ -790,7 +471,7 @@ class ConsoleMaster(flow.FlowMaster): if self.options.rfile: ret = self.load_flows_path(self.options.rfile) if ret and self.state.flow_count(): - self.add_event( + signals.add_event( "File truncated or corrupted. " "Loaded as many flows as possible.", "error" @@ -801,6 +482,19 @@ class ConsoleMaster(flow.FlowMaster): sys.exit(1) self.loop.set_alarm_in(0.01, self.ticker) + + # It's not clear why we need to handle this explicitly - without this, + # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it + # out. + def exit(s, f): + raise urwid.ExitMainLoop + signal.signal(signal.SIGINT, exit) + + self.loop.set_alarm_in( + 0.0001, + lambda *args: self.view_flowlist() + ) + try: self.loop.run() except Exception: @@ -814,43 +508,56 @@ class ConsoleMaster(flow.FlowMaster): sys.stderr.flush() self.shutdown() - def make_view(self): - self.view = urwid.Frame( - self.body, - header = self.header, - footer = self.statusbar + def view_help(self, helpctx): + signals.push_view_state.send( + self, + window = window.Window( + self, + help.HelpView(helpctx), + None, + statusbar.StatusBar(self, help.footer), + None + ) ) - self.view.set_focus("body") - return self.view - def view_help(self): - h = help.HelpView( + def view_options(self): + for i in self.view_stack: + if isinstance(i["body"], options.Options): + return + signals.push_view_state.send( self, - self.help_context, - (self.statusbar, self.body, self.header) + window = window.Window( + self, + options.Options(self), + None, + statusbar.StatusBar(self, options.footer), + options.help_context, + ) ) - self.statusbar = StatusBar(self, help.footer) - self.body = h - self.header = None - self.loop.widget = self.make_view() - def view_flowdetails(self, flow): - h = flowdetailview.FlowDetailsView( + def view_palette_picker(self): + signals.push_view_state.send( self, - flow, - (self.statusbar, self.body, self.header) + window = window.Window( + self, + palettepicker.PalettePicker(self), + None, + statusbar.StatusBar(self, palettepicker.footer), + palettepicker.help_context, + ) ) - self.statusbar = StatusBar(self, flowdetailview.footer) - self.body = h - self.header = None - self.loop.widget = self.make_view() def view_grideditor(self, ge): - self.body = ge - self.header = None - self.help_context = ge.make_help() - self.statusbar = StatusBar(self, grideditor.footer) - self.loop.widget = self.make_view() + signals.push_view_state.send( + self, + window = window.Window( + self, + ge, + None, + statusbar.StatusBar(self, grideditor.FOOTER), + ge.make_help() + ) + ) def view_flowlist(self): if self.ui.started: @@ -859,27 +566,35 @@ class ConsoleMaster(flow.FlowMaster): self.state.set_focus(self.state.flow_count()) if self.eventlog: - self.body = flowlist.BodyPile(self) + body = flowlist.BodyPile(self) else: - self.body = flowlist.FlowListBox(self) - self.statusbar = StatusBar(self, flowlist.footer) - self.header = None - self.state.view_mode = common.VIEW_LIST - - self.loop.widget = self.make_view() - self.help_context = flowlist.help_context - - def view_flow(self, flow): - self.body = flowview.FlowView(self, self.state, flow) - self.header = flowview.FlowViewHeader(self, flow) - self.statusbar = StatusBar(self, flowview.footer) + body = flowlist.FlowListBox(self) + + signals.push_view_state.send( + self, + window = window.Window( + self, + body, + None, + statusbar.StatusBar(self, flowlist.footer), + flowlist.help_context + ) + ) + + def view_flow(self, flow, tab_offset=0): self.state.set_focus_flow(flow) - self.state.view_mode = common.VIEW_FLOW - self.loop.widget = self.make_view() - self.help_context = flowview.help_context + signals.push_view_state.send( + self, + window = window.Window( + self, + flowview.FlowView(self, self.state, flow, tab_offset), + flowview.FlowViewHeader(self, flow), + statusbar.StatusBar(self, flowview.footer), + flowview.help_context + ) + ) def _write_flows(self, path, flows): - self.state.last_saveload = path if not path: return path = os.path.expanduser(path) @@ -889,87 +604,43 @@ class ConsoleMaster(flow.FlowMaster): for i in flows: fw.add(i) f.close() - except IOError, v: - self.statusbar.message(v.strerror) + except IOError as v: + signals.status_message.send(message=v.strerror) def save_one_flow(self, path, flow): return self._write_flows(path, [flow]) def save_flows(self, path): return self._write_flows(path, self.state.view) + + def save_marked_flows(self, path): + marked_flows = [] + for f in self.state.view: + if self.state.flow_marked(f): + marked_flows.append(f) + return self._write_flows(path, marked_flows) def load_flows_callback(self, path): if not path: return ret = self.load_flows_path(path) - return ret or "Flows loaded from %s"%path + return ret or "Flows loaded from %s" % path def load_flows_path(self, path): - self.state.last_saveload = path reterr = None try: flow.FlowMaster.load_flows_file(self, path) - except flow.FlowReadError, v: + except flow.FlowReadError as v: reterr = str(v) - if self.flow_list_walker: - self.sync_list_view() + signals.flowlist_change.send(self) return reterr - def path_prompt(self, prompt, text, callback, *args): - self.statusbar.path_prompt(prompt, text) - self.view.set_focus("footer") - self.prompting = (callback, args) - - def prompt(self, prompt, text, callback, *args): - self.statusbar.prompt(prompt, text) - self.view.set_focus("footer") - self.prompting = (callback, args) - - def prompt_edit(self, prompt, text, callback): - self.statusbar.prompt(prompt + ": ", text) - self.view.set_focus("footer") - self.prompting = (callback, []) - - def prompt_onekey(self, prompt, keys, callback, *args): - """ - Keys are a set of (word, key) tuples. The appropriate key in the - word is highlighted. - """ - prompt = [prompt, " ("] - mkup = [] - for i, e in enumerate(keys): - mkup.extend(common.highlight_key(e[0], e[1])) - if i < len(keys)-1: - mkup.append(",") - prompt.extend(mkup) - prompt.append(")? ") - self.onekey = "".join(i[1] for i in keys) - self.prompt(prompt, "", callback, *args) - - def prompt_done(self): - self.prompting = False - self.onekey = False - self.view.set_focus("body") - self.statusbar.message("") - - def prompt_execute(self, txt=None): - if not txt: - txt = self.statusbar.get_edit_text() - p, args = self.prompting - self.prompt_done() - msg = p(txt, *args) - if msg: - self.statusbar.message(msg, 1000) - - def prompt_cancel(self): - self.prompt_done() - def accept_all(self): self.state.accept_all(self) def set_limit(self, txt): v = self.state.set_limit(txt) - self.sync_list_view() + signals.flowlist_change.send(self) return v def set_intercept(self, txt): @@ -980,12 +651,6 @@ class ConsoleMaster(flow.FlowMaster): self.state.default_body_view = v self.refresh_focus() - def pop_view(self): - if self.state.view_mode == common.VIEW_FLOW: - self.view_flow(self.state.view[self.state.focus]) - else: - self.view_flowlist() - def edit_scripts(self, scripts): commands = [x[0] for x in scripts] # remove outer array if commands == [s.command for s in self.scripts]: @@ -994,14 +659,7 @@ class ConsoleMaster(flow.FlowMaster): self.unload_scripts() for command in commands: self.load_script(command) - - def edit_ignore_filter(self, ignore): - patterns = (x[0] for x in ignore) - self.set_ignore_filter(patterns) - - def edit_tcp_filter(self, tcp): - patterns = (x[0] for x in tcp) - self.set_tcp_filter(patterns) + signals.update_settings.send(self) def stop_client_playback_prompt(self, a): if a != "n": @@ -1015,33 +673,13 @@ class ConsoleMaster(flow.FlowMaster): if a != "n": raise urwid.ExitMainLoop - def _change_options(self, a): - if a == "a": - self.anticache = not self.anticache - if a == "c": - self.anticomp = not self.anticomp - if a == "h": - self.showhost = not self.showhost - self.sync_list_view() - self.refresh_focus() - elif a == "k": - self.killextra = not self.killextra - elif a == "n": - self.refresh_server_playback = not self.refresh_server_playback - elif a == "u": - self.server.config.no_upstream_cert =\ - not self.server.config.no_upstream_cert - def shutdown(self): self.state.killall(self) flow.FlowMaster.shutdown(self) - def sync_list_view(self): - self.flow_list_walker._modified() - def clear_flows(self): self.state.clear() - self.sync_list_view() + signals.flowlist_change.send(self) def toggle_follow_flows(self): # toggle flow follow @@ -1049,49 +687,31 @@ class ConsoleMaster(flow.FlowMaster): # jump to most recent flow if follow is now on if self.state.follow_focus: self.state.set_focus(self.state.flow_count()) - self.sync_list_view() + signals.flowlist_change.send(self) def delete_flow(self, f): self.state.delete_flow(f) - self.sync_list_view() + signals.flowlist_change.send(self) def refresh_focus(self): if self.state.view: - self.refresh_flow(self.state.view[self.state.focus]) - - def refresh_flow(self, c): - if hasattr(self.header, "refresh_flow"): - self.header.refresh_flow(c) - if hasattr(self.body, "refresh_flow"): - self.body.refresh_flow(c) - if hasattr(self.statusbar, "refresh_flow"): - self.statusbar.refresh_flow(c) + signals.flow_change.send( + self, + flow = self.state.view[self.state.focus] + ) def process_flow(self, f): - if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay: + if self.state.intercept and f.match( + self.state.intercept) and not f.request.is_replay: f.intercept(self) else: f.reply() - self.sync_list_view() - self.refresh_flow(f) + signals.flowlist_change.send(self) + signals.flow_change.send(self, flow = f) def clear_events(self): self.eventlist[:] = [] - def add_event(self, e, level="info"): - needed = dict(error=0, info=1, debug=2).get(level, 1) - if self.options.verbosity < needed: - return - - if level == "error": - e = urwid.Text(("error", str(e))) - else: - e = urwid.Text(str(e)) - self.eventlist.append(e) - if len(self.eventlist) > EVENTLOG_SIZE: - self.eventlist.pop(0) - self.eventlist.set_focus(len(self.eventlist)-1) - # Handlers def handle_error(self, f): f = flow.FlowMaster.handle_error(self, f) diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py index 3a708c7c..90bccfe7 100644 --- a/libmproxy/console/common.py +++ b/libmproxy/console/common.py @@ -6,15 +6,14 @@ import os from .. import utils from ..protocol.http import CONTENT_MISSING, decoded +from . import signals +import netlib.utils try: import pyperclip except: pyperclip = False -VIEW_LIST = 0 -VIEW_FLOW = 1 - VIEW_FLOW_REQUEST = 0 VIEW_FLOW_RESPONSE = 1 @@ -31,14 +30,22 @@ METHOD_OPTIONS = [ ] -def highlight_key(s, k): +def is_keypress(k): + """ + Is this input event a keypress? + """ + if isinstance(k, basestring): + return True + + +def highlight_key(str, key, textattr="text", keyattr="key"): l = [] - parts = s.split(k, 1) + parts = str.split(key, 1) if parts[0]: - l.append(("text", parts[0])) - l.append(("key", k)) + l.append((textattr, parts[0])) + l.append((keyattr, key)) if parts[1]: - l.append(("text", parts[1])) + l.append((textattr, parts[1])) return l @@ -60,20 +67,26 @@ def format_keyvals(lst, key="key", val="text", indent=0): if kv is None: ret.append(urwid.Text("")) else: - cols = [] - # This cumbersome construction process is here for a reason: - # Urwid < 1.0 barfs if given a fixed size column of size zero. - if indent: - cols.append(("fixed", indent, urwid.Text(""))) - cols.extend([ - ( - "fixed", - maxk, - urwid.Text([(key, kv[0] or "")]) - ), - kv[1] if isinstance(kv[1], urwid.Widget) else urwid.Text([(val, kv[1])]) - ]) - ret.append(urwid.Columns(cols, dividechars = 2)) + if isinstance(kv[1], urwid.Widget): + v = kv[1] + elif kv[1] is None: + v = urwid.Text("") + else: + v = urwid.Text([(val, kv[1])]) + ret.append( + urwid.Columns( + [ + ("fixed", indent, urwid.Text("")), + ( + "fixed", + maxk, + urwid.Text([(key, kv[0] or "")]) + ), + v + ], + dividechars = 2 + ) + ) return ret @@ -102,9 +115,11 @@ def fcol(s, attr): if urwid.util.detected_encoding: SYMBOL_REPLAY = u"\u21ba" SYMBOL_RETURN = u"\u2190" + SYMBOL_MARK = u"\u25cf" else: SYMBOL_REPLAY = u"[r]" SYMBOL_RETURN = u"<-" + SYMBOL_MARK = "[m]" def raw_format_flow(f, focus, extended, padding): @@ -120,6 +135,10 @@ def raw_format_flow(f, focus, extended, padding): ) else: req.append(fcol(">>" if focus else " ", "focus")) + + if f["marked"]: + req.append(fcol(SYMBOL_MARK, "mark")) + if f["req_is_replay"]: req.append(fcol(SYMBOL_REPLAY, "replay")) req.append(fcol(f["req_method"], "method")) @@ -151,7 +170,7 @@ def raw_format_flow(f, focus, extended, padding): 4: "code_400", 5: "code_500", } - ccol = codes.get(f["resp_code"]/100, "code_other") + ccol = codes.get(f["resp_code"] / 100, "code_other") resp.append(fcol(SYMBOL_RETURN, ccol)) if f["resp_is_replay"]: resp.append(fcol(SYMBOL_REPLAY, "replay")) @@ -184,23 +203,39 @@ def raw_format_flow(f, focus, extended, padding): def save_data(path, data, master, state): if not path: return - state.last_saveload = path - path = os.path.expanduser(path) try: with file(path, "wb") as f: f.write(data) - except IOError, v: - master.statusbar.message(v.strerror) + except IOError as v: + signals.status_message.send(message=v.strerror) + + +def ask_save_overwite(path, data, master, state): + if not path: + return + path = os.path.expanduser(path) + if os.path.exists(path): + def save_overwite(k): + if k == "y": + save_data(path, data, master, state) + + signals.status_prompt_onekey.send( + prompt = "'" + path + "' already exists. Overwite?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = save_overwite + ) + else: + save_data(path, data, master, state) def ask_save_path(prompt, data, master, state): - master.path_prompt( - prompt, - state.last_saveload, - save_data, - data, - master, - state + signals.status_prompt_path.send( + prompt = prompt, + callback = ask_save_overwite, + args = (data, master, state) ) @@ -210,6 +245,8 @@ def copy_flow_format_data(part, scope, flow): else: data = "" if scope in ("q", "a"): + if flow.request.content is None or flow.request.content == CONTENT_MISSING: + return None, "Request content is missing" with decoded(flow.request): if part == "h": data += flow.request.assemble() @@ -221,6 +258,8 @@ def copy_flow_format_data(part, scope, flow): # Add padding between request and response data += "\r\n" * 2 if scope in ("s", "a") and flow.response: + if flow.response.content is None or flow.response.content == CONTENT_MISSING: + return None, "Response content is missing" with decoded(flow.response): if part == "h": data += flow.response.assemble() @@ -228,40 +267,50 @@ def copy_flow_format_data(part, scope, flow): data += flow.response.content else: raise ValueError("Unknown part: {}".format(part)) - return data + return data, False def copy_flow(part, scope, flow, master, state): """ - part: _c_ontent, _a_ll, _u_rl + part: _c_ontent, _h_eaders+content, _u_rl scope: _a_ll, re_q_uest, re_s_ponse """ - data = copy_flow_format_data(part, scope, flow) + data, err = copy_flow_format_data(part, scope, flow) + + if err: + signals.status_message.send(message=err) + return if not data: if scope == "q": - master.statusbar.message("No request content to copy.") + signals.status_message.send(message="No request content to copy.") elif scope == "s": - master.statusbar.message("No response content to copy.") + signals.status_message.send(message="No response content to copy.") else: - master.statusbar.message("No contents to copy.") + signals.status_message.send(message="No contents to copy.") return + # pyperclip calls encode('utf-8') on data to be copied without checking. + # if data are already encoded that way UnicodeDecodeError is thrown. + toclip = "" + try: + toclip = data.decode('utf-8') + except (UnicodeDecodeError): + toclip = data + try: - master.add_event(str(len(data))) - pyperclip.copy(data) - except RuntimeError: + pyperclip.copy(toclip) + except (RuntimeError, UnicodeDecodeError, AttributeError): def save(k): if k == "y": - ask_save_path("Save data: ", data, master, state) - - master.prompt_onekey( - "Cannot copy binary data to clipboard. Save as file?", - ( + ask_save_path("Save data", data, master, state) + signals.status_prompt_onekey.send( + prompt = "Cannot copy data to clipboard. Save as file?", + keys = ( ("yes", "y"), ("no", "n"), ), - save + callback = save ) @@ -273,14 +322,11 @@ def ask_copy_part(scope, flow, master, state): if scope != "s": choices.append(("url", "u")) - master.prompt_onekey( - "Copy", - choices, - copy_flow, - scope, - flow, - master, - state + signals.status_prompt_onekey.send( + prompt = "Copy", + keys = choices, + callback = copy_flow, + args = (scope, flow, master, state) ) @@ -297,16 +343,14 @@ def ask_save_body(part, master, state, flow): # We first need to determine whether we want to save the request or the # response content. if request_has_content and response_has_content: - master.prompt_onekey( - "Save", - ( + signals.status_prompt_onekey.send( + prompt = "Save", + keys = ( ("request", "q"), ("response", "s"), ), - ask_save_body, - master, - state, - flow + callback = ask_save_body, + args = (master, state, flow) ) elif response_has_content: ask_save_body("s", master, state, flow) @@ -315,30 +359,27 @@ def ask_save_body(part, master, state, flow): elif part == "q" and request_has_content: ask_save_path( - "Save request content: ", + "Save request content", flow.request.get_decoded_content(), master, state ) elif part == "s" and response_has_content: ask_save_path( - "Save response content: ", + "Save response content", flow.response.get_decoded_content(), master, state ) else: - master.statusbar.message("No content to save.") + signals.status_message.send(message="No content to save.") -class FlowCache: - @utils.LRUCache(200) - def format_flow(self, *args): - return raw_format_flow(*args) -flowcache = FlowCache() +flowcache = utils.LRUCache(800) -def format_flow(f, focus, extended=False, hostheader=False, padding=2): +def format_flow(f, focus, extended=False, hostheader=False, padding=2, + marked=False): d = dict( intercepted = f.intercepted, acked = f.reply.acked, @@ -350,10 +391,12 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2): err_msg = f.error.msg if f.error else None, resp_code = f.response.code if f.response else None, + + marked = marked, ) if f.response: if f.response.content: - contentdesc = utils.pretty_size(len(f.response.content)) + contentdesc = netlib.utils.pretty_size(len(f.response.content)) elif f.response.content == CONTENT_MISSING: contentdesc = "[content missing]" else: @@ -374,6 +417,7 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2): d["resp_ctype"] = t[0].split(";")[0] else: d["resp_ctype"] = "" - return flowcache.format_flow( + return flowcache.get( + raw_format_flow, tuple(sorted(d.items())), focus, extended, padding ) diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py index 95d908a4..e4ffcd47 100644 --- a/libmproxy/console/contentview.py +++ b/libmproxy/console/contentview.py @@ -6,27 +6,28 @@ import lxml.html import lxml.etree from PIL import Image from PIL.ExifTags import TAGS -import re import subprocess import traceback import urwid +import html2text import netlib.utils +from netlib import odict -from . import common -from .. import utils, encoding, flow -from ..contrib import jsbeautifier, html2text +from . import common, signals +from .. import utils, encoding +from ..contrib import jsbeautifier from ..contrib.wbxml.ASCommandResponse import ASCommandResponse try: import pyamf from pyamf import remoting, flex -except ImportError: # pragma nocover +except ImportError: # pragma nocover pyamf = None try: import cssutils -except ImportError: # pragma nocover +except ImportError: # pragma nocover cssutils = None else: cssutils.log.setLevel(logging.CRITICAL) @@ -36,7 +37,7 @@ else: cssutils.ser.prefs.indentClosingBrace = False cssutils.ser.prefs.validOnly = False -VIEW_CUTOFF = 1024*50 +VIEW_CUTOFF = 1024 * 50 def _view_text(content, total, limit): @@ -59,7 +60,7 @@ def trailer(clen, txt, limit): txt.append( urwid.Text( [ - ("highlight", "... %s of data not shown. Press "%utils.pretty_size(rem)), + ("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)), ("key", "f"), ("highlight", " to load all data.") ] @@ -76,7 +77,7 @@ class ViewAuto: ctype = hdrs.get_first("content-type") if ctype: ct = utils.parse_content_type(ctype) if ctype else None - ct = "%s/%s"%(ct[0], ct[1]) + ct = "%s/%s" % (ct[0], ct[1]) if ct in content_types_map: return content_types_map[ct][0](hdrs, content, limit) elif utils.isXML(content): @@ -227,7 +228,7 @@ class ViewURLEncoded: lines = utils.urldecode(content) if lines: body = common.format_keyvals( - [(k+":", v) for (k, v) in lines], + [(k + ":", v) for (k, v) in lines], key = "header", val = "text" ) @@ -240,33 +241,13 @@ class ViewMultipart: content_types = ["multipart/form-data"] def __call__(self, hdrs, content, limit): - v = hdrs.get_first("content-type") + v = utils.multipartdecode(hdrs, content) if v: - v = utils.parse_content_type(v) - if not v: - return - boundary = v[2].get("boundary") - if not boundary: - return - - rx = re.compile(r'\bname="([^"]+)"') - keys = [] - vals = [] - - for i in content.split("--" + boundary): - parts = i.splitlines() - if len(parts) > 1 and parts[0][0:2] != "--": - match = rx.search(parts[1]) - if match: - keys.append(match.group(1) + ":") - vals.append(netlib.utils.cleanBin( - "\n".join(parts[3+parts[2:].index(""):]) - )) r = [ urwid.Text(("highlight", "Form data:\n")), ] r.extend(common.format_keyvals( - zip(keys, vals), + v, key = "header", val = "text" )) @@ -324,7 +305,6 @@ if pyamf: if not envelope: return None - txt = [] for target, message in iter(envelope): if isinstance(message, pyamf.remoting.Request): @@ -335,13 +315,13 @@ if pyamf: else: txt.append(urwid.Text([ ("header", "Response: "), - ("text", "%s, code %s"%(target, message.status)), + ("text", "%s, code %s" % (target, message.status)), ])) s = json.dumps(self.unpack(message), indent=4) txt.extend(_view_text(s[:limit], len(s), limit)) - return "AMF v%s"%envelope.amfVersion, txt + return "AMF v%s" % envelope.amfVersion, txt class ViewJavaScript: @@ -395,7 +375,7 @@ class ViewImage: return None parts = [ ("Format", str(img.format_description)), - ("Size", "%s x %s px"%img.size), + ("Size", "%s x %s px" % img.size), ("Mode", str(img.mode)), ] for i in sorted(img.info.keys()): @@ -421,7 +401,7 @@ class ViewImage: key = "header", val = "text" ) - return "%s image"%img.format, fmt + return "%s image" % img.format, fmt class ViewProtobuf: @@ -528,7 +508,7 @@ def get(name): return i -def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request): +def get_content_view(viewmode, hdrItems, content, limit, is_request): """ Returns a (msg, body) tuple. """ @@ -539,21 +519,21 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request): return "No content", "" msg = [] - hdrs = flow.ODictCaseless([list(i) for i in hdrItems]) + hdrs = odict.ODictCaseless([list(i) for i in hdrItems]) enc = hdrs.get_first("content-encoding") if enc and enc != "identity": decoded = encoding.decode(enc, content) if decoded: content = decoded - msg.append("[decoded %s]"%enc) + msg.append("[decoded %s]" % enc) try: ret = viewmode(hdrs, content, limit) # Third-party viewers can fail in unexpected ways... except Exception: s = traceback.format_exc() s = "Content viewer failed: \n" + s - logfunc(s, "error") + signals.add_event(s, "error") ret = None if not ret: ret = get("Raw")(hdrs, content, limit) diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py index f351bff1..40769c95 100644 --- a/libmproxy/console/flowdetailview.py +++ b/libmproxy/console/flowdetailview.py @@ -1,113 +1,154 @@ from __future__ import absolute_import import urwid -from . import common +from . import common, searchable from .. import utils -footer = [ - ('heading_key', "q"), ":back ", -] -class FlowDetailsView(urwid.ListBox): - def __init__(self, master, flow, state): - self.master, self.flow, self.state = master, flow, state - urwid.ListBox.__init__( - self, - self.flowtext() - ) +def maybe_timestamp(base, attr): + if base and getattr(base, attr): + return utils.format_timestamp_with_milli(getattr(base, attr)) + else: + return "active" + pass - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "q": - self.master.statusbar = self.state[0] - self.master.body = self.state[1] - self.master.header = self.state[2] - self.master.loop.widget = self.master.make_view() - return None - elif key == "?": - key = None - return urwid.ListBox.keypress(self, size, key) - - def flowtext(self): - text = [] - - title = urwid.Text("Flow details") - title = urwid.Padding(title, align="left", width=("relative", 100)) - title = urwid.AttrWrap(title, "heading") - text.append(title) - - cc = self.flow.client_conn - sc = self.flow.server_conn - req = self.flow.request - resp = self.flow.response - - if sc: - text.append(urwid.Text([("head", "Server Connection:")])) - parts = [ - ["Address", "%s:%s" % sc.address()], - ] - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) - - c = sc.cert - if c: - text.append(urwid.Text([("head", "Server Certificate:")])) - parts = [ - ["Type", "%s, %s bits"%c.keyinfo], - ["SHA1 digest", c.digest("sha1")], - ["Valid to", str(c.notafter)], - ["Valid from", str(c.notbefore)], - ["Serial", str(c.serial)], - [ - "Subject", - urwid.BoxAdapter( - urwid.ListBox(common.format_keyvals(c.subject, key="highlight", val="text")), - len(c.subject) - ) - ], - [ - "Issuer", - urwid.BoxAdapter( - urwid.ListBox(common.format_keyvals(c.issuer, key="highlight", val="text")), - len(c.issuer) - ) - ] - ] +def flowdetails(state, flow): + text = [] - if c.altnames: - parts.append( - [ - "Alt names", - ", ".join(c.altnames) - ] - ) - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + cc = flow.client_conn + sc = flow.server_conn + req = flow.request + resp = flow.response - if cc: - text.append(urwid.Text([("head", "Client Connection:")])) + if sc: + text.append(urwid.Text([("head", "Server Connection:")])) + parts = [ + ["Address", "%s:%s" % sc.address()], + ] + + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) + c = sc.cert + if c: + text.append(urwid.Text([("head", "Server Certificate:")])) parts = [ - ["Address", "%s:%s" % cc.address()], - # ["Requests", "%s"%cc.requestcount], + ["Type", "%s, %s bits" % c.keyinfo], + ["SHA1 digest", c.digest("sha1")], + ["Valid to", str(c.notafter)], + ["Valid from", str(c.notbefore)], + ["Serial", str(c.serial)], + [ + "Subject", + urwid.BoxAdapter( + urwid.ListBox( + common.format_keyvals( + c.subject, + key="highlight", + val="text" + ) + ), + len(c.subject) + ) + ], + [ + "Issuer", + urwid.BoxAdapter( + urwid.ListBox( + common.format_keyvals( + c.issuer, key="highlight", val="text" + ) + ), + len(c.issuer) + ) + ] ] - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + if c.altnames: + parts.append( + [ + "Alt names", + ", ".join(c.altnames) + ] + ) + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) - parts = [] + if cc: + text.append(urwid.Text([("head", "Client Connection:")])) - parts.append(["Client conn. established", utils.format_timestamp_with_milli(cc.timestamp_start) if (cc and cc.timestamp_start) else "active"]) - parts.append(["Server conn. initiated", utils.format_timestamp_with_milli(sc.timestamp_start) if sc else "active" ]) - parts.append(["Server conn. TCP handshake", utils.format_timestamp_with_milli(sc.timestamp_tcp_setup) if (sc and sc.timestamp_tcp_setup) else "active"]) - if sc.ssl_established: - parts.append(["Server conn. SSL handshake", utils.format_timestamp_with_milli(sc.timestamp_ssl_setup) if sc.timestamp_ssl_setup else "active"]) - parts.append(["Client conn. SSL handshake", utils.format_timestamp_with_milli(cc.timestamp_ssl_setup) if (cc and cc.timestamp_ssl_setup) else "active"]) - parts.append(["First request byte", utils.format_timestamp_with_milli(req.timestamp_start)]) - parts.append(["Request complete", utils.format_timestamp_with_milli(req.timestamp_end) if req.timestamp_end else "active"]) - parts.append(["First response byte", utils.format_timestamp_with_milli(resp.timestamp_start) if resp else "active"]) - parts.append(["Response complete", utils.format_timestamp_with_milli(resp.timestamp_end) if (resp and resp.timestamp_end) else "active"]) + parts = [ + ["Address", "%s:%s" % cc.address()], + # ["Requests", "%s"%cc.requestcount], + ] - # sort operations by timestamp - parts = sorted(parts, key=lambda p: p[1]) + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) - text.append(urwid.Text([("head", "Timing:")])) - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) - return text + parts = [] + + parts.append( + [ + "Client conn. established", + maybe_timestamp(cc, "timestamp_start") + ] + ) + parts.append( + [ + "Server conn. initiated", + maybe_timestamp(sc, "timestamp_start") + ] + ) + parts.append( + [ + "Server conn. TCP handshake", + maybe_timestamp(sc, "timestamp_tcp_setup") + ] + ) + if sc.ssl_established: + parts.append( + [ + "Server conn. SSL handshake", + maybe_timestamp(sc, "timestamp_ssl_setup") + ] + ) + parts.append( + [ + "Client conn. SSL handshake", + maybe_timestamp(cc, "timestamp_ssl_setup") + ] + ) + parts.append( + [ + "First request byte", + maybe_timestamp(req, "timestamp_start") + ] + ) + parts.append( + [ + "Request complete", + maybe_timestamp(req, "timestamp_end") + ] + ) + parts.append( + [ + "First response byte", + maybe_timestamp(resp, "timestamp_start") + ] + ) + parts.append( + [ + "Response complete", + maybe_timestamp(resp, "timestamp_end") + ] + ) + + # sort operations by timestamp + parts = sorted(parts, key=lambda p: p[1]) + + text.append(urwid.Text([("head", "Timing:")])) + text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + return searchable.Searchable(state, text) diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py index 5d8ad942..bb23df75 100644 --- a/libmproxy/console/flowlist.py +++ b/libmproxy/console/flowlist.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import urwid from netlib import http -from . import common +from . import common, signals def _mkhelp(): @@ -15,11 +15,13 @@ def _mkhelp(): ("D", "duplicate flow"), ("e", "toggle eventlog"), ("F", "toggle follow flow list"), - ("g", "copy flow to clipboard"), ("l", "set limit filter pattern"), ("L", "load saved flows"), + ("m", "toggle flow mark"), ("n", "create a new request"), + ("P", "copy flow to clipboard"), ("r", "replay request"), + ("U", "unmark all marked flows"), ("V", "revert changes to request"), ("w", "save flows "), ("W", "stream flows to file"), @@ -47,6 +49,10 @@ class EventListBox(urwid.ListBox): if key == "C": self.master.clear_events() key = None + elif key == "G": + self.set_focus(0) + elif key == "g": + self.set_focus(len(self.master.eventlist) - 1) return urwid.ListBox.keypress(self, size, key) @@ -72,7 +78,8 @@ class BodyPile(urwid.Pile): def keypress(self, size, key): if key == "tab": - self.focus_position = (self.focus_position + 1)%len(self.widget_list) + self.focus_position = ( + self.focus_position + 1) % len(self.widget_list) if self.focus_position == 1: self.widget_list[1].header = self.active_header else: @@ -103,7 +110,8 @@ class ConnectionItem(urwid.WidgetWrap): return common.format_flow( self.flow, self.f, - hostheader = self.master.showhost + hostheader = self.master.showhost, + marked=self.state.flow_marked(self.flow) ) def selectable(self): @@ -111,17 +119,20 @@ class ConnectionItem(urwid.WidgetWrap): def save_flows_prompt(self, k): if k == "a": - self.master.path_prompt( - "Save all flows to: ", - self.state.last_saveload, - self.master.save_flows + signals.status_prompt_path.send( + prompt = "Save all flows to", + callback = self.master.save_flows + ) + elif k == "m": + signals.status_prompt_path.send( + prompt = "Save marked flows to", + callback = self.master.save_marked_flows ) else: - self.master.path_prompt( - "Save this flow to: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow + signals.status_prompt_path.send( + prompt = "Save this flow to", + callback = self.master.save_one_flow, + args = (self.flow,) ) def stop_server_playback_prompt(self, a): @@ -150,64 +161,82 @@ class ConnectionItem(urwid.WidgetWrap): self.master.options.replay_ignore_host ) else: - self.master.path_prompt( - "Server replay path: ", - self.state.last_saveload, - self.master.server_playback_path + signals.status_prompt_path.send( + prompt = "Server replay path", + callback = self.master.server_playback_path ) - def keypress(self, (maxcol,), key): + def mouse_event(self, size, event, button, col, row, focus): + if event == "mouse press" and button == 1: + if self.flow.request: + self.master.view_flow(self.flow) + return True + + def keypress(self, xxx_todo_changeme, key): + (maxcol,) = xxx_todo_changeme key = common.shortcuts(key) if key == "a": self.flow.accept_intercept(self.master) - self.master.sync_list_view() + signals.flowlist_change.send(self) elif key == "d": self.flow.kill(self.master) self.state.delete_flow(self.flow) - self.master.sync_list_view() + signals.flowlist_change.send(self) elif key == "D": f = self.master.duplicate_flow(self.flow) self.master.view_flow(f) + elif key == "m": + if self.state.flow_marked(self.flow): + self.state.set_flow_marked(self.flow, False) + else: + self.state.set_flow_marked(self.flow, True) + signals.flowlist_change.send(self) elif key == "r": r = self.master.replay_request(self.flow) if r: - self.master.statusbar.message(r) - self.master.sync_list_view() + signals.status_message.send(message=r) + signals.flowlist_change.send(self) elif key == "S": if not self.master.server_playback: - self.master.prompt_onekey( - "Server Replay", - ( + signals.status_prompt_onekey.send( + prompt = "Server Replay", + keys = ( ("all flows", "a"), ("this flow", "t"), ("file", "f"), ), - self.server_replay_prompt, + callback = self.server_replay_prompt, ) else: - self.master.prompt_onekey( - "Stop current server replay?", - ( + signals.status_prompt_onekey.send( + prompt = "Stop current server replay?", + keys = ( ("yes", "y"), ("no", "n"), ), - self.stop_server_playback_prompt, + callback = self.stop_server_playback_prompt, ) + elif key == "U": + for f in self.state.flows: + self.state.set_flow_marked(f, False) + signals.flowlist_change.send(self) elif key == "V": if not self.flow.modified(): - self.master.statusbar.message("Flow not modified.") + signals.status_message.send(message="Flow not modified.") return self.state.revert(self.flow) - self.master.sync_list_view() - self.master.statusbar.message("Reverted.") + signals.flowlist_change.send(self) + signals.status_message.send(message="Reverted.") elif key == "w": - self.master.prompt_onekey( - "Save", - ( + signals.status_prompt_onekey.send( + self, + prompt = "Save", + keys = ( ("all flows", "a"), ("this flow", "t"), + ("marked flows", "m"), ), - self.save_flows_prompt, + callback = self.save_flows_prompt, ) elif key == "X": self.flow.kill(self.master) @@ -215,13 +244,12 @@ class ConnectionItem(urwid.WidgetWrap): if self.flow.request: self.master.view_flow(self.flow) elif key == "|": - self.master.path_prompt( - "Send flow to script: ", - self.state.last_script, - self.master.run_script_once, - self.flow + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) ) - elif key == "g": + elif key == "P": common.ask_copy_part("a", self.flow, self.master, self.state) elif key == "b": common.ask_save_body(None, self.master, self.state, self.flow) @@ -232,8 +260,10 @@ class ConnectionItem(urwid.WidgetWrap): class FlowListWalker(urwid.ListWalker): def __init__(self, master, state): self.master, self.state = master, state - if self.state.flow_count(): - self.set_focus(0) + signals.flowlist_change.connect(self.sig_flowlist_change) + + def sig_flowlist_change(self, sender): + self._modified() def get_focus(self): f, i = self.state.get_focus() @@ -258,7 +288,10 @@ class FlowListWalker(urwid.ListWalker): class FlowListBox(urwid.ListBox): def __init__(self, master): self.master = master - urwid.ListBox.__init__(self, master.flow_list_walker) + urwid.ListBox.__init__( + self, + FlowListWalker(master, master.state) + ) def get_method_raw(self, k): if k: @@ -266,7 +299,12 @@ class FlowListBox(urwid.ListBox): def get_method(self, k): if k == "e": - self.master.prompt("Method:", "", self.get_method_raw) + signals.status_prompt.send( + self, + prompt = "Method", + text = "", + callback = self.get_method_raw + ) else: method = "" for i in common.METHOD_OPTIONS: @@ -275,17 +313,17 @@ class FlowListBox(urwid.ListBox): self.get_url(method) def get_url(self, method): - self.master.prompt( - "URL:", - "http://www.example.com/", - self.new_request, - method + signals.status_prompt.send( + prompt = "URL", + text = "http://www.example.com/", + callback = self.new_request, + args = (method,) ) def new_request(self, url, method): parts = http.parse_url(str(url)) if not parts: - self.master.statusbar.message("Invalid Url") + signals.status_message.send(message="Invalid Url") return scheme, host, port, path = parts f = self.master.create_request(method, scheme, host, port, path) @@ -295,28 +333,34 @@ class FlowListBox(urwid.ListBox): key = common.shortcuts(key) if key == "A": self.master.accept_all() - self.master.sync_list_view() + signals.flowlist_change.send(self) elif key == "C": self.master.clear_flows() elif key == "e": self.master.toggle_eventlog() + elif key == "G": + self.master.state.set_focus(0) + signals.flowlist_change.send(self) + elif key == "g": + self.master.state.set_focus(self.master.state.flow_count()) + signals.flowlist_change.send(self) elif key == "l": - self.master.prompt( - "Limit: ", - self.master.state.limit_txt, - self.master.set_limit + signals.status_prompt.send( + prompt = "Limit", + text = self.master.state.limit_txt, + callback = self.master.set_limit ) elif key == "L": - self.master.path_prompt( - "Load flows: ", - self.master.state.last_saveload, - self.master.load_flows_callback + signals.status_prompt_path.send( + self, + prompt = "Load flows", + callback = self.master.load_flows_callback ) elif key == "n": - self.master.prompt_onekey( - "Method", - common.METHOD_OPTIONS, - self.get_method + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.get_method ) elif key == "F": self.master.toggle_follow_flows() @@ -324,10 +368,10 @@ class FlowListBox(urwid.ListBox): if self.master.stream: self.master.stop_stream() else: - self.master.path_prompt( - "Stream flows to: ", - self.master.state.last_saveload, - self.master.start_stream_to_path + signals.status_prompt_path.send( + self, + prompt = "Stream flows to", + callback = self.master.start_stream_to_path ) else: return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 89e75aad..c6c4c10d 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -1,12 +1,16 @@ from __future__ import absolute_import -import os, sys, copy +import os +import sys import urwid -from . import common, grideditor, contentview -from .. import utils, flow, controller +from netlib import odict +from . import common, grideditor, contentview, signals, searchable, tabs +from . import flowdetailview +from .. import utils, controller from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded -class SearchError(Exception): pass +class SearchError(Exception): + pass def _mkhelp(): @@ -19,58 +23,58 @@ def _mkhelp(): ("D", "duplicate flow"), ("e", "edit request/response"), ("f", "load full body data"), - ("g", "copy response(content/headers) to clipboard"), ("m", "change body display mode for this entity"), - (None, - common.highlight_key("automatic", "a") + - [("text", ": automatic detection")] - ), - (None, - common.highlight_key("hex", "e") + - [("text", ": Hex")] - ), - (None, - common.highlight_key("html", "h") + - [("text", ": HTML")] - ), - (None, - common.highlight_key("image", "i") + - [("text", ": Image")] - ), - (None, - common.highlight_key("javascript", "j") + - [("text", ": JavaScript")] - ), - (None, - common.highlight_key("json", "s") + - [("text", ": JSON")] - ), - (None, - common.highlight_key("urlencoded", "u") + - [("text", ": URL-encoded data")] - ), - (None, - common.highlight_key("raw", "r") + - [("text", ": raw data")] - ), - (None, - common.highlight_key("xml", "x") + - [("text", ": XML")] - ), + (None, + common.highlight_key("automatic", "a") + + [("text", ": automatic detection")] + ), + (None, + common.highlight_key("hex", "e") + + [("text", ": Hex")] + ), + (None, + common.highlight_key("html", "h") + + [("text", ": HTML")] + ), + (None, + common.highlight_key("image", "i") + + [("text", ": Image")] + ), + (None, + common.highlight_key("javascript", "j") + + [("text", ": JavaScript")] + ), + (None, + common.highlight_key("json", "s") + + [("text", ": JSON")] + ), + (None, + common.highlight_key("urlencoded", "u") + + [("text", ": URL-encoded data")] + ), + (None, + common.highlight_key("raw", "r") + + [("text", ": raw data")] + ), + (None, + common.highlight_key("xml", "x") + + [("text", ": XML")] + ), ("M", "change default body display mode"), ("p", "previous flow"), + ("P", "copy response(content/headers) to clipboard"), ("r", "replay request"), ("V", "revert changes to request"), ("v", "view body in external viewer"), ("w", "save all flows matching current limit"), ("W", "save this flow"), ("x", "delete body"), - ("X", "view flow details"), ("z", "encode/decode a request/response"), - ("tab", "toggle request/response view"), + ("tab", "next tab"), + ("h, l", "previous tab, next tab"), ("space", "next flow"), ("|", "run script on this flow"), - ("/", "search in response body (case sensitive)"), + ("/", "search (case sensitive)"), ("n", "repeat search forward"), ("N", "repeat search backwards"), ] @@ -87,417 +91,170 @@ footer = [ class FlowViewHeader(urwid.WidgetWrap): def __init__(self, master, f): self.master, self.flow = master, f - self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost) - - def refresh_flow(self, f): - if f == self.flow: - self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost) - + self._w = common.format_flow( + f, + False, + extended=True, + padding=0, + hostheader=self.master.showhost + ) + signals.flow_change.connect(self.sig_flow_change) + + def sig_flow_change(self, sender, flow): + if flow == self.flow: + self._w = common.format_flow( + flow, + False, + extended=True, + padding=0, + hostheader=self.master.showhost + ) -class CallbackCache: - @utils.LRUCache(200) - def _callback(self, method, *args, **kwargs): - return getattr(self.obj, method)(*args, **kwargs) - def callback(self, obj, method, *args, **kwargs): - # obj varies! - self.obj = obj - return self._callback(method, *args, **kwargs) -cache = CallbackCache() +cache = utils.LRUCache(200) +TAB_REQ = 0 +TAB_RESP = 1 -class FlowView(urwid.WidgetWrap): - REQ = 0 - RESP = 1 +class FlowView(tabs.Tabs): highlight_color = "focusfield" - def __init__(self, master, state, flow): + def __init__(self, master, state, flow, tab_offset): self.master, self.state, self.flow = master, state, flow + tabs.Tabs.__init__(self, + [ + (self.tab_request, self.view_request), + (self.tab_response, self.view_response), + (self.tab_details, self.view_details), + ], + tab_offset + ) + self.show() self.last_displayed_body = None - if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.view_response() - else: - self.view_request() + signals.flow_change.connect(self.sig_flow_change) - def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request): - return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request) + def tab_request(self): + if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: + return "Request intercepted" + else: + return "Request" - def content_view(self, viewmode, conn): - full = self.state.get_flow_setting( - self.flow, - (self.state.view_flow_mode, "fullcontents"), - False - ) - if full: - limit = sys.maxint + def tab_response(self): + if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: + return "Response intercepted" else: - limit = contentview.VIEW_CUTOFF - description, text_objects = cache.callback( - self, "_cached_content_view", - viewmode, - tuple(tuple(i) for i in conn.headers.lst), - conn.content, - limit, - isinstance(conn, HTTPRequest) - ) - return (description, text_objects) + return "Response" - def cont_view_handle_missing(self, conn, viewmode): - if conn.content == CONTENT_MISSING: - msg, body = "", [urwid.Text([("error", "[content missing]")])] - else: - msg, body = self.content_view(viewmode, conn) + def tab_details(self): + return "Detail" + + def view_request(self): + return self.conn_text(self.flow.request) + + def view_response(self): + return self.conn_text(self.flow.response) + + def view_details(self): + return flowdetailview.flowdetails(self.state, self.flow) + def sig_flow_change(self, sender, flow): + if flow == self.flow: + self.show() + + def content_view(self, viewmode, conn): + if conn.content == CONTENT_MISSING: + msg, body = "", [urwid.Text([("error", "[content missing]")])] return (msg, body) + else: + full = self.state.get_flow_setting( + self.flow, + (self.tab_offset, "fullcontents"), + False + ) + if full: + limit = sys.maxsize + else: + limit = contentview.VIEW_CUTOFF + description, text_objects = cache.get( + contentview.get_content_view, + viewmode, + tuple(tuple(i) for i in conn.headers.lst), + conn.content, + limit, + isinstance(conn, HTTPRequest) + ) + return (description, text_objects) - def viewmode_get(self, override): + def viewmode_get(self): + override = self.state.get_flow_setting( + self.flow, + (self.tab_offset, "prettyview") + ) return self.state.default_body_view if override is None else override - def override_get(self): - return self.state.get_flow_setting(self.flow, - (self.state.view_flow_mode, "prettyview")) - - def conn_text_raw(self, conn): - """ - Based on a request/response, conn, returns the elements for - display. - """ - headers = common.format_keyvals( - [(h+":", v) for (h, v) in conn.headers.lst], + def conn_text(self, conn): + if conn: + txt = common.format_keyvals( + [(h + ":", v) for (h, v) in conn.headers.lst], key = "header", val = "text" ) - override = self.override_get() - viewmode = self.viewmode_get(override) - msg, body = self.cont_view_handle_missing(conn, viewmode) - return headers, msg, body - - def conn_text_merge(self, headers, msg, body): - """ - Grabs what is returned by conn_text_raw and merges them all - toghether, mainly used by conn_text and search - """ - override = self.override_get() - viewmode = self.viewmode_get(override) - - cols = [urwid.Text( - [ - ("heading", msg), - ] - ) - ] + viewmode = self.viewmode_get() + msg, body = self.content_view(viewmode, conn) - if override is not None: - cols.append(urwid.Text([ + cols = [ + urwid.Text( + [ + ("heading", msg), + ] + ) + ] + cols.append( + urwid.Text( + [ " ", ('heading', "["), ('heading_key', "m"), - ('heading', (":%s]"%viewmode.name)), + ('heading', (":%s]" % viewmode.name)), ], align="right" ) ) + title = urwid.AttrWrap(urwid.Columns(cols), "heading") - title = urwid.AttrWrap(urwid.Columns(cols), "heading") - headers.append(title) - headers.extend(body) - - return headers - - def conn_text(self, conn): - """ - Same as conn_text_raw, but returns result wrapped in a listbox ready for usage. - """ - headers, msg, body = self.conn_text_raw(conn) - merged = self.conn_text_merge(headers, msg, body) - return urwid.ListBox(merged) - - def _tab(self, content, attr): - p = urwid.Text(content) - p = urwid.Padding(p, align="left", width=("relative", 100)) - p = urwid.AttrWrap(p, attr) - return p - - def wrap_body(self, active, body): - parts = [] - - if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: - qt = "Request intercepted" - else: - qt = "Request" - if active == common.VIEW_FLOW_REQUEST: - parts.append(self._tab(qt, "heading")) + txt.append(title) + txt.extend(body) else: - parts.append(self._tab(qt, "heading_inactive")) - - if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: - st = "Response intercepted" - else: - st = "Response" - if active == common.VIEW_FLOW_RESPONSE: - parts.append(self._tab(st, "heading")) - else: - parts.append(self._tab(st, "heading_inactive")) - - h = urwid.Columns(parts) - f = urwid.Frame( - body, - header=h + txt = [ + urwid.Text(""), + urwid.Text( + [ + ("highlight", "No response. Press "), + ("key", "e"), + ("highlight", " and edit any aspect to add one."), + ] ) - return f - - def search_wrapped_around(self, last_find_line, last_search_index, backwards): - """ - returns true if search wrapped around the bottom. - """ - - current_find_line = self.state.get_flow_setting(self.flow, - "last_find_line") - current_search_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - if not backwards: - message = "search hit BOTTOM, continuing at TOP" - if current_find_line <= last_find_line: - return True, message - elif current_find_line == last_find_line: - if current_search_index <= last_search_index: - return True, message - else: - message = "search hit TOP, continuing at BOTTOM" - if current_find_line >= last_find_line: - return True, message - elif current_find_line == last_find_line: - if current_search_index >= last_search_index: - return True, message - - return False, "" - - def search_again(self, backwards=False): - """ - runs the previous search again, forwards or backwards. - """ - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") - if last_search_string: - message = self.search(last_search_string, backwards) - if message: - self.master.statusbar.message(message) - else: - message = "no previous searches have been made" - self.master.statusbar.message(message) - - return message - - def search(self, search_string, backwards=False): - """ - similar to view_response or view_request, but instead of just - displaying the conn, it highlights a word that the user is - searching for and handles all the logic surrounding that. - """ - - if not search_string: - search_string = self.state.get_flow_setting(self.flow, - "last_search_string") - if not search_string: - return - - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - text = self.flow.request - const = common.VIEW_FLOW_REQUEST - else: - text = self.flow.response - const = common.VIEW_FLOW_RESPONSE - if not self.flow.response: - return "no response to search in" - - last_find_line = self.state.get_flow_setting(self.flow, - "last_find_line") - last_search_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - # generate the body, highlight the words and get focus - headers, msg, body = self.conn_text_raw(text) - try: - body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards) - except SearchError: - return "Search not supported in this view." - - if focus_position == None: - # no results found. - return "no matches for '%s'" % search_string - - # UI stuff. - merged = self.conn_text_merge(headers, msg, body) - list_box = urwid.ListBox(merged) - list_box.set_focus(focus_position + 2) - self._w = self.wrap_body(const, list_box) - self.master.statusbar.redraw() - - self.last_displayed_body = list_box - - wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards) - - if wrapped: - return wrapped_message - - def search_get_start(self, search_string): - start_line = 0 - start_index = 0 - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") - if search_string == last_search_string: - start_line = self.state.get_flow_setting(self.flow, "last_find_line") - start_index = self.state.get_flow_setting(self.flow, - "last_search_index") - - if start_index == None: - start_index = 0 - else: - start_index += len(search_string) - - if start_line == None: - start_line = 0 - - else: - self.state.add_flow_setting(self.flow, "last_search_string", - search_string) - - return (start_line, start_index) - - def search_get_range(self, len_text_objects, start_line, backwards): - if not backwards: - loop_range = xrange(start_line, len_text_objects) - else: - loop_range = xrange(start_line, -1, -1) - - return loop_range - - def search_find(self, text, search_string, start_index, backwards): - if backwards == False: - find_index = text.find(search_string, start_index) - else: - if start_index != 0: - start_index -= len(search_string) - else: - start_index = None - - find_index = text.rfind(search_string, 0, start_index) - - return find_index - - def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False): - start_line, start_index = self.search_get_start(search_string) - i = start_line - - found = False - text_objects = copy.deepcopy(text_objects) - loop_range = self.search_get_range(len(text_objects), start_line, backwards) - for i in loop_range: - text_object = text_objects[i] - - try: - text, style = text_object.get_text() - except AttributeError: - raise SearchError() - - if i != start_line: - start_index = 0 - - find_index = self.search_find(text, search_string, start_index, backwards) - - if find_index != -1: - new_text = self.search_highlight_object(text, find_index, search_string) - text_objects[i] = new_text - - found = True - self.state.add_flow_setting(self.flow, "last_search_index", - find_index) - self.state.add_flow_setting(self.flow, "last_find_line", i) - - break - - # handle search WRAP - if found: - focus_pos = i - else : - if looping: - focus_pos = None - else: - if not backwards: - self.state.add_flow_setting(self.flow, "last_search_index", 0) - self.state.add_flow_setting(self.flow, "last_find_line", 0) - else: - self.state.add_flow_setting(self.flow, "last_search_index", None) - self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1) - - text_objects, focus_pos = self.search_highlight_text(text_objects, - search_string, looping=True, backwards=backwards) - - return text_objects, focus_pos - - def search_highlight_object(self, text_object, find_index, search_string): - """ - just a little abstraction - """ - before = text_object[:find_index] - after = text_object[find_index+len(search_string):] - - new_text = urwid.Text( - [ - before, - (self.highlight_color, search_string), - after, ] - ) - - return new_text - - def view_request(self): - self.state.view_flow_mode = common.VIEW_FLOW_REQUEST - body = self.conn_text(self.flow.request) - self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) - self.master.statusbar.redraw() - - def view_response(self): - self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE - if self.flow.response: - body = self.conn_text(self.flow.response) - else: - body = urwid.ListBox( - [ - urwid.Text(""), - urwid.Text( - [ - ("highlight", "No response. Press "), - ("key", "e"), - ("highlight", " and edit any aspect to add one."), - ] - ) - ] - ) - self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) - self.master.statusbar.redraw() - - def refresh_flow(self, c=None): - if c == self.flow: - if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: - self.view_response() - else: - self.view_request() + return searchable.Searchable(self.state, txt) def set_method_raw(self, m): if m: self.flow.request.method = m - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def edit_method(self, m): if m == "e": - self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw) + signals.status_prompt.send( + prompt = "Method", + text = self.flow.request.method, + callback = self.set_method_raw + ) else: for i in common.METHOD_OPTIONS: if i[1] == m: self.flow.request.method = i[0].upper() - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_url(self, url): request = self.flow.request @@ -505,7 +262,7 @@ class FlowView(urwid.WidgetWrap): request.url = str(url) except ValueError: return "Invalid URL." - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_resp_code(self, code): response = self.flow.response @@ -514,87 +271,161 @@ class FlowView(urwid.WidgetWrap): except ValueError: return None import BaseHTTPServer - if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): - response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] - self.master.refresh_flow(self.flow) + if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses: + response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[ + int(code)][0] + signals.flow_change.send(self, flow = self.flow) def set_resp_msg(self, msg): response = self.flow.response response.msg = msg - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_headers(self, lst, conn): - conn.headers = flow.ODictCaseless(lst) + conn.headers = odict.ODictCaseless(lst) + signals.flow_change.send(self, flow = self.flow) def set_query(self, lst, conn): - conn.set_query(flow.ODict(lst)) + conn.set_query(odict.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) def set_path_components(self, lst, conn): - conn.set_path_components([i[0] for i in lst]) + conn.set_path_components(lst) + signals.flow_change.send(self, flow = self.flow) def set_form(self, lst, conn): - conn.set_form_urlencoded(flow.ODict(lst)) + conn.set_form_urlencoded(odict.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) def edit_form(self, conn): self.master.view_grideditor( - grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn) + grideditor.URLEncodedFormEditor( + self.master, + conn.get_form_urlencoded().lst, + self.set_form, + conn + ) ) def edit_form_confirm(self, key, conn): if key == "y": self.edit_form(conn) + def set_cookies(self, lst, conn): + od = odict.ODict(lst) + conn.set_cookies(od) + signals.flow_change.send(self, flow = self.flow) + + def set_setcookies(self, data, conn): + conn.set_cookies(data) + signals.flow_change.send(self, flow = self.flow) + def edit(self, part): - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + if self.tab_offset == TAB_REQ: message = self.flow.request else: if not self.flow.response: self.flow.response = HTTPResponse( self.flow.request.httpversion, - 200, "OK", flow.ODictCaseless(), "" + 200, "OK", odict.ODictCaseless(), "" ) self.flow.response.reply = controller.DummyReply() message = self.flow.response self.flow.backup() + if message == self.flow.request and part == "c": + self.master.view_grideditor( + grideditor.CookieEditor( + self.master, + message.get_cookies().lst, + self.set_cookies, + message + ) + ) + if message == self.flow.response and part == "c": + self.master.view_grideditor( + grideditor.SetCookieEditor( + self.master, + message.get_cookies(), + self.set_setcookies, + message + ) + ) if part == "r": with decoded(message): - # Fix an issue caused by some editors when editing a request/response body. - # Many editors make it hard to save a file without a terminating newline on the last - # line. When editing message bodies, this can cause problems. For now, I just - # strip the newlines off the end of the body when we return from an editor. + # Fix an issue caused by some editors when editing a + # request/response body. Many editors make it hard to save a + # file without a terminating newline on the last line. When + # editing message bodies, this can cause problems. For now, I + # just strip the newlines off the end of the body when we return + # from an editor. c = self.master.spawn_editor(message.content or "") message.content = c.rstrip("\n") elif part == "f": if not message.get_form_urlencoded() and message.content: - self.master.prompt_onekey( - "Existing body is not a URL-encoded form. Clear and edit?", - [ + signals.status_prompt_onekey.send( + prompt = "Existing body is not a URL-encoded form. Clear and edit?", + keys = [ ("yes", "y"), ("no", "n"), ], - self.edit_form_confirm, - message + callback = self.edit_form_confirm, + args = (message,) ) else: self.edit_form(message) elif part == "h": - self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message)) + self.master.view_grideditor( + grideditor.HeaderEditor( + self.master, + message.headers.lst, + self.set_headers, + message + ) + ) elif part == "p": p = message.get_path_components() - p = [[i] for i in p] - self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message)) + self.master.view_grideditor( + grideditor.PathEditor( + self.master, + p, + self.set_path_components, + message + ) + ) elif part == "q": - self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message)) - elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_edit("URL", message.url, self.set_url) - elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method) - elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Code", str(message.code), self.set_resp_code) - elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Message", message.msg, self.set_resp_msg) - self.master.refresh_flow(self.flow) + self.master.view_grideditor( + grideditor.QueryEditor( + self.master, + message.get_query().lst, + self.set_query, message + ) + ) + elif part == "u": + signals.status_prompt.send( + prompt = "URL", + text = message.url, + callback = self.set_url + ) + elif part == "m": + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.edit_method + ) + elif part == "o": + signals.status_prompt.send( + prompt = "Code", + text = str(message.code), + callback = self.set_resp_code + ) + elif part == "m": + signals.status_prompt.send( + prompt = "Message", + text = message.msg, + callback = self.set_resp_msg + ) + signals.flow_change.send(self, flow = self.flow) def _view_nextprev_flow(self, np, flow): try: @@ -606,9 +437,10 @@ class FlowView(urwid.WidgetWrap): else: new_flow, new_idx = self.state.get_prev(idx) if new_flow is None: - self.master.statusbar.message("No more flows!") - return - self.master.view_flow(new_flow) + signals.status_message.send(message="No more flows!") + else: + signals.pop_view_state.send(self) + self.master.view_flow(new_flow, self.tab_offset) def view_next_flow(self, flow): return self._view_nextprev_flow("next", flow) @@ -619,42 +451,38 @@ class FlowView(urwid.WidgetWrap): def change_this_display_mode(self, t): self.state.add_flow_setting( self.flow, - (self.state.view_flow_mode, "prettyview"), + (self.tab_offset, "prettyview"), contentview.get_by_shortcut(t) ) - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def delete_body(self, t): if t == "m": val = CONTENT_MISSING else: val = None - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + if self.tab_offset == TAB_REQ: self.flow.request.content = val else: self.flow.response.content = val - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def keypress(self, size, key): + key = super(self.__class__, self).keypress(size, key) + if key == " ": self.view_next_flow(self.flow) return key = common.shortcuts(key) - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + if self.tab_offset == TAB_REQ: conn = self.flow.request - else: + elif self.tab_offset == TAB_RESP: conn = self.flow.response + else: + conn = None - if key == "q": - self.master.view_flowlist() - key = None - elif key == "tab": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.view_response() - else: - self.view_request() - elif key in ("up", "down", "page up", "page down"): + if key in ("up", "down", "page up", "page down"): # Why doesn't this just work?? self._w.keypress(size, key) elif key == "a": @@ -663,15 +491,10 @@ class FlowView(urwid.WidgetWrap): elif key == "A": self.master.accept_all() self.master.view_flow(self.flow) - elif key == "b": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - common.ask_save_body("q", self.master, self.state, self.flow) - else: - common.ask_save_body("s", self.master, self.state, self.flow) elif key == "d": if self.state.flow_count() == 1: self.master.view_flowlist() - elif self.state.view.index(self.flow) == len(self.state.view)-1: + elif self.state.view.index(self.flow) == len(self.state.view) - 1: self.view_prev_flow(self.flow) else: self.view_next_flow(self.flow) @@ -681,134 +504,143 @@ class FlowView(urwid.WidgetWrap): elif key == "D": f = self.master.duplicate_flow(self.flow) self.master.view_flow(f) - self.master.statusbar.message("Duplicated.") - elif key == "e": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey( - "Edit request", - ( - ("query", "q"), - ("path", "p"), - ("url", "u"), - ("header", "h"), - ("form", "f"), - ("raw body", "r"), - ("method", "m"), - ), - self.edit - ) - else: - self.master.prompt_onekey( - "Edit response", - ( - ("code", "c"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - self.edit - ) - key = None - elif key == "f": - self.master.statusbar.message("Loading all body data...") - self.state.add_flow_setting( - self.flow, - (self.state.view_flow_mode, "fullcontents"), - True - ) - self.master.refresh_flow(self.flow) - self.master.statusbar.message("") - elif key == "g": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - scope = "q" - else: - scope = "s" - common.ask_copy_part(scope, self.flow, self.master, self.state) - elif key == "m": - p = list(contentview.view_prompts) - p.insert(0, ("Clear", "C")) - self.master.prompt_onekey( - "Display mode", - p, - self.change_this_display_mode - ) - key = None + signals.status_message.send(message="Duplicated.") elif key == "p": self.view_prev_flow(self.flow) elif key == "r": r = self.master.replay_request(self.flow) if r: - self.master.statusbar.message(r) - self.master.refresh_flow(self.flow) + signals.status_message.send(message=r) + signals.flow_change.send(self, flow = self.flow) elif key == "V": if not self.flow.modified(): - self.master.statusbar.message("Flow not modified.") + signals.status_message.send(message="Flow not modified.") return self.state.revert(self.flow) - self.master.refresh_flow(self.flow) - self.master.statusbar.message("Reverted.") + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="Reverted.") elif key == "W": - self.master.path_prompt( - "Save this flow: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow + signals.status_prompt_path.send( + prompt = "Save this flow", + callback = self.master.save_one_flow, + args = (self.flow,) ) - elif key == "v": - if conn and conn.content: - t = conn.headers["content-type"] or [None] - t = t[0] - if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"): - self.master.spawn_external_viewer(conn.content, t) - else: - self.master.statusbar.message("Error! Set $EDITOR or $PAGER.") elif key == "|": - self.master.path_prompt( - "Send flow to script: ", self.state.last_script, - self.master.run_script_once, self.flow + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) ) - elif key == "x": - self.master.prompt_onekey( - "Delete body", - ( - ("completely", "c"), - ("mark as missing", "m"), - ), - self.delete_body + + if not conn and key in set(list("befgmxvz")): + signals.status_message.send( + message = "Tab to the request or response", + expire = 1 ) - key = None - elif key == "X": - self.master.view_flowdetails(self.flow) - elif key == "z": - if conn: + elif conn: + if key == "b": + if self.tab_offset == TAB_REQ: + common.ask_save_body( + "q", self.master, self.state, self.flow + ) + else: + common.ask_save_body( + "s", self.master, self.state, self.flow + ) + elif key == "e": + if self.tab_offset == TAB_REQ: + signals.status_prompt_onekey.send( + prompt = "Edit request", + keys = ( + ("cookies", "c"), + ("query", "q"), + ("path", "p"), + ("url", "u"), + ("header", "h"), + ("form", "f"), + ("raw body", "r"), + ("method", "m"), + ), + callback = self.edit + ) + else: + signals.status_prompt_onekey.send( + prompt = "Edit response", + keys = ( + ("cookies", "c"), + ("code", "o"), + ("message", "m"), + ("header", "h"), + ("raw body", "r"), + ), + callback = self.edit + ) + key = None + elif key == "f": + signals.status_message.send(message="Loading all body data...") + self.state.add_flow_setting( + self.flow, + (self.tab_offset, "fullcontents"), + True + ) + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="") + elif key == "P": + if self.tab_offset == TAB_REQ: + scope = "q" + else: + scope = "s" + common.ask_copy_part(scope, self.flow, self.master, self.state) + elif key == "m": + p = list(contentview.view_prompts) + p.insert(0, ("Clear", "C")) + signals.status_prompt_onekey.send( + self, + prompt = "Display mode", + keys = p, + callback = self.change_this_display_mode + ) + key = None + elif key == "x": + signals.status_prompt_onekey.send( + prompt = "Delete body", + keys = ( + ("completely", "c"), + ("mark as missing", "m"), + ), + callback = self.delete_body + ) + key = None + elif key == "v": + if conn.content: + t = conn.headers["content-type"] or [None] + t = t[0] + if "EDITOR" in os.environ or "PAGER" in os.environ: + self.master.spawn_external_viewer(conn.content, t) + else: + signals.status_message.send( + message = "Error! Set $EDITOR or $PAGER." + ) + elif key == "z": self.flow.backup() e = conn.headers.get_first("content-encoding", "identity") if e != "identity": if not conn.decode(): - self.master.statusbar.message("Could not decode - invalid data?") + signals.status_message.send( + message = "Could not decode - invalid data?" + ) else: - self.master.prompt_onekey( - "Select encoding: ", - ( + signals.status_prompt_onekey.send( + prompt = "Select encoding: ", + keys = ( ("gzip", "z"), ("deflate", "d"), ), - self.encode_callback, - conn + callback = self.encode_callback, + args = (conn,) ) - self.master.refresh_flow(self.flow) - elif key == "/": - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") - search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: " - self.master.prompt(search_prompt, - None, - self.search) - elif key == "n": - self.search_again(backwards=False) - elif key == "N": - self.search_again(backwards=True) - else: - return key + signals.flow_change.send(self, flow = self.flow) + return key def encode_callback(self, key, conn): encoding_map = { @@ -816,4 +648,4 @@ class FlowView(urwid.WidgetWrap): "d": "deflate", } conn.encode(encoding_map[key]) - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py index fe3df509..b20e54e4 100644 --- a/libmproxy/console/grideditor.py +++ b/libmproxy/console/grideditor.py @@ -5,31 +5,99 @@ import re import os import urwid -from . import common +from . import common, signals from .. import utils, filt, script -from netlib import http_uastrings +from netlib import http_uastrings, http_cookies, odict -footer = [ +FOOTER = [ ('heading_key', "enter"), ":edit ", ('heading_key', "q"), ":back ", ] -footer_editing = [ +FOOTER_EDITING = [ ('heading_key', "esc"), ":stop editing ", ] -class SText(urwid.WidgetWrap): - def __init__(self, txt, focused, error): +class TextColumn: + subeditor = None + + def __init__(self, heading): + self.heading = heading + + def text(self, obj): + return SEscaped(obj or "") + + def blank(self): + return "" + + def keypress(self, key, editor): + if key == "r": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + self, + prompt = "Read file", + callback = editor.read_file + ) + elif key == "R": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + editor, + prompt = "Read unescaped file", + callback = editor.read_file, + args = (True,) + ) + elif key == "e": + o = editor.walker.get_current_value() + if o is not None: + n = editor.master.spawn_editor(o.encode("string-escape")) + n = utils.clean_hanging_newline(n) + editor.walker.set_current_value(n, False) + editor.walker._modified() + elif key in ["enter"]: + editor.walker.start_edit() + else: + return key + + +class SubgridColumn: + def __init__(self, heading, subeditor): + self.heading = heading + self.subeditor = subeditor + + def text(self, obj): + p = http_cookies._format_pairs(obj, sep="\n") + return urwid.Text(p) + + def blank(self): + return [] + + def keypress(self, key, editor): + if key in "rRe": + signals.status_message.send( + self, + message = "Press enter to edit this field.", + expire = 1000 + ) + return + elif key in ["enter"]: + editor.master.view_grideditor( + self.subeditor( + editor.master, + editor.walker.get_current_value(), + editor.set_subeditor_value, + editor.walker.focus, + editor.walker.focus_col + ) + ) + else: + return key + + +class SEscaped(urwid.WidgetWrap): + def __init__(self, txt): txt = txt.encode("string-escape") w = urwid.Text(txt, wrap="any") - if focused: - if error: - w = urwid.AttrWrap(w, "focusfield_error") - else: - w = urwid.AttrWrap(w, "focusfield") - elif error: - w = urwid.AttrWrap(w, "field_error") urwid.WidgetWrap.__init__(self, w) def get_text(self): @@ -50,7 +118,7 @@ class SEdit(urwid.WidgetWrap): urwid.WidgetWrap.__init__(self, w) def get_text(self): - return self._w.get_text()[0] + return self._w.get_text()[0].strip() def selectable(self): return True @@ -67,9 +135,15 @@ class GridRow(urwid.WidgetWrap): self.editing = SEdit(v) self.fields.append(self.editing) else: - self.fields.append( - SText(v, True if focused == i else False, i in errors) - ) + w = self.editor.columns[i].text(v) + if focused == i: + if i in errors: + w = urwid.AttrWrap(w, "focusfield_error") + else: + w = urwid.AttrWrap(w, "focusfield") + elif i in errors: + w = urwid.AttrWrap(w, "field_error") + self.fields.append(w) fspecs = self.fields[:] if len(self.fields) > 1: @@ -101,6 +175,7 @@ class GridWalker(urwid.ListWalker): and errors is a set with an entry of each offset in rows that is an error. """ + def __init__(self, lst, editor): self.lst = [(i, set([])) for i in lst] self.editor = editor @@ -125,31 +200,43 @@ class GridWalker(urwid.ListWalker): try: val = val.decode("string-escape") except ValueError: - self.editor.master.statusbar.message( - "Invalid Python-style string encoding.", 1000 + signals.status_message.send( + self, + message = "Invalid Python-style string encoding.", + expire = 1000 ) return errors = self.lst[self.focus][1] emsg = self.editor.is_error(self.focus_col, val) if emsg: - self.editor.master.statusbar.message(emsg, 1000) + signals.status_message.send(message = emsg, expire = 1) errors.add(self.focus_col) else: errors.discard(self.focus_col) - - row = list(self.lst[self.focus][0]) - row[self.focus_col] = val - self.lst[self.focus] = [tuple(row), errors] + self.set_value(val, self.focus, self.focus_col, errors) + + def set_value(self, val, focus, focus_col, errors=None): + if not errors: + errors = set([]) + row = list(self.lst[focus][0]) + row[focus_col] = val + self.lst[focus] = [tuple(row), errors] + self._modified() def delete_focus(self): if self.lst: del self.lst[self.focus] - self.focus = min(len(self.lst)-1, self.focus) + self.focus = min(len(self.lst) - 1, self.focus) self._modified() def _insert(self, pos): self.focus = pos - self.lst.insert(self.focus, [[""]*self.editor.columns, set([])]) + self.lst.insert( + self.focus, + [ + [c.blank() for c in self.editor.columns], set([]) + ] + ) self.focus_col = 0 self.start_edit() @@ -160,16 +247,17 @@ class GridWalker(urwid.ListWalker): return self._insert(min(self.focus + 1, len(self.lst))) def start_edit(self): - if self.lst: + col = self.editor.columns[self.focus_col] + if self.lst and not col.subeditor: self.editing = GridRow( self.focus_col, True, self.editor, self.lst[self.focus] ) - self.editor.master.statusbar.update(footer_editing) + self.editor.master.loop.widget.footer.update(FOOTER_EDITING) self._modified() def stop_edit(self): if self.editing: - self.editor.master.statusbar.update(footer) + self.editor.master.loop.widget.footer.update(FOOTER) self.set_current_value(self.editing.get_edit_value(), False) self.editing = False self._modified() @@ -179,14 +267,14 @@ class GridWalker(urwid.ListWalker): self._modified() def right(self): - self.focus_col = min(self.focus_col + 1, self.editor.columns-1) + self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) self._modified() def tab_next(self): self.stop_edit() - if self.focus_col < self.editor.columns-1: + if self.focus_col < len(self.editor.columns) - 1: self.focus_col += 1 - elif self.focus != len(self.lst)-1: + elif self.focus != len(self.lst) - 1: self.focus_col = 0 self.focus += 1 self._modified() @@ -207,16 +295,17 @@ class GridWalker(urwid.ListWalker): def set_focus(self, focus): self.stop_edit() self.focus = focus + self._modified() def get_next(self, pos): - if pos+1 >= len(self.lst): + if pos + 1 >= len(self.lst): return None, None - return GridRow(None, False, self.editor, self.lst[pos+1]), pos+1 + return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 def get_prev(self, pos): - if pos-1 < 0: + if pos - 1 < 0: return None, None - return GridRow(None, False, self.editor, self.lst[pos-1]), pos-1 + return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 class GridListBox(urwid.ListBox): @@ -231,17 +320,16 @@ FIRST_WIDTH_MIN = 20 class GridEditor(urwid.WidgetWrap): title = None columns = None - headings = None def __init__(self, master, value, callback, *cb_args, **cb_kwargs): - value = copy.deepcopy(value) + value = self.data_in(copy.deepcopy(value)) self.master, self.value, self.callback = master, value, callback self.cb_args, self.cb_kwargs = cb_args, cb_kwargs first_width = 20 if value: for r in value: - assert len(r) == self.columns + assert len(r) == len(self.columns) first_width = max(len(r), first_width) self.first_width = min(first_width, FIRST_WIDTH_MAX) @@ -250,9 +338,9 @@ class GridEditor(urwid.WidgetWrap): title = urwid.AttrWrap(title, "heading") headings = [] - for i, h in enumerate(self.headings): - c = urwid.Text(h) - if i == 0 and len(self.headings) > 1: + for i, col in enumerate(self.columns): + c = urwid.Text(col.heading) + if i == 0 and len(self.columns) > 1: headings.append(("fixed", first_width + 2, c)) else: headings.append(c) @@ -268,7 +356,7 @@ class GridEditor(urwid.WidgetWrap): self.lb, header = urwid.Pile([title, h]) ) - self.master.statusbar.update("") + self.master.loop.widget.footer.update("") self.show_empty_msg() def show_empty_msg(self): @@ -300,9 +388,12 @@ class GridEditor(urwid.WidgetWrap): d = file(p, "rb").read() self.walker.set_current_value(d, unescaped) self.walker._modified() - except IOError, v: + except IOError as v: return str(v) + def set_subeditor_value(self, val, focus, focus_col): + self.walker.set_value(val, focus, focus_col) + def keypress(self, size, key): if self.walker.editing: if key in ["esc"]: @@ -317,13 +408,18 @@ class GridEditor(urwid.WidgetWrap): return None key = common.shortcuts(key) + column = self.columns[self.walker.focus_col] if key in ["q", "esc"]: res = [] for i in self.walker.lst: - if not i[1] and any([x.strip() for x in i[0]]): + if not i[1] and any([x for x in i[0]]): res.append(i[0]) - self.callback(res, *self.cb_args, **self.cb_kwargs) - self.master.pop_view() + self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) + signals.pop_view_state.send(self) + elif key == "G": + self.walker.set_focus(0) + elif key == "g": + self.walker.set_focus(len(self.walker.lst) - 1) elif key in ["h", "left"]: self.walker.left() elif key in ["l", "right"]: @@ -336,26 +432,22 @@ class GridEditor(urwid.WidgetWrap): self.walker.insert() elif key == "d": self.walker.delete_focus() - elif key == "r": - if self.walker.get_current_value() is not None: - self.master.path_prompt("Read file: ", "", self.read_file) - elif key == "R": - if self.walker.get_current_value() is not None: - self.master.path_prompt( - "Read unescaped file: ", "", self.read_file, True - ) - elif key == "e": - o = self.walker.get_current_value() - if o is not None: - n = self.master.spawn_editor(o.encode("string-escape")) - n = utils.clean_hanging_newline(n) - self.walker.set_current_value(n, False) - self.walker._modified() - elif key in ["enter"]: - self.walker.start_edit() - elif not self.handle_key(key): + elif column.keypress(key, self) and not self.handle_key(key): return self._w.keypress(size, key) + def data_out(self, data): + """ + Called on raw list data, before data is returned through the + callback. + """ + return data + + def data_in(self, data): + """ + Called to prepare provided data. + """ + return data + def is_error(self, col, val): """ Return False, or a string error message. @@ -373,10 +465,10 @@ class GridEditor(urwid.WidgetWrap): ("a", "add row after cursor"), ("d", "delete row"), ("e", "spawn external editor on current field"), - ("q", "return to flow view"), + ("q", "save changes and exit editor"), ("r", "read value from file"), ("R", "read unescaped value from file"), - ("esc", "return to flow view/exit field edit mode"), + ("esc", "save changes and exit editor"), ("tab", "next field"), ("enter", "edit field"), ] @@ -396,14 +488,18 @@ class GridEditor(urwid.WidgetWrap): class QueryEditor(GridEditor): title = "Editing query" - columns = 2 - headings = ("Key", "Value") + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] class HeaderEditor(GridEditor): title = "Editing headers" - columns = 2 - headings = ("Key", "Value") + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] def make_help(self): h = GridEditor.make_help(self) @@ -431,24 +527,29 @@ class HeaderEditor(GridEditor): def handle_key(self, key): if key == "U": - self.master.prompt_onekey( - "Add User-Agent header:", - [(i[0], i[1]) for i in http_uastrings.UASTRINGS], - self.set_user_agent, + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS], + callback = self.set_user_agent, ) return True class URLEncodedFormEditor(GridEditor): title = "Editing URL-encoded form" - columns = 2 - headings = ("Key", "Value") + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] class ReplaceEditor(GridEditor): title = "Editing replacement patterns" - columns = 3 - headings = ("Filter", "Regex", "Replacement") + columns = [ + TextColumn("Filter"), + TextColumn("Regex"), + TextColumn("Replacement"), + ] def is_error(self, col, val): if col == 0: @@ -464,8 +565,11 @@ class ReplaceEditor(GridEditor): class SetHeadersEditor(GridEditor): title = "Editing header set patterns" - columns = 3 - headings = ("Filter", "Header", "Value") + columns = [ + TextColumn("Filter"), + TextColumn("Header"), + TextColumn("Value"), + ] def is_error(self, col, val): if col == 0: @@ -500,39 +604,105 @@ class SetHeadersEditor(GridEditor): def handle_key(self, key): if key == "U": - self.master.prompt_onekey( - "Add User-Agent header:", - [(i[0], i[1]) for i in http_uastrings.UASTRINGS], - self.set_user_agent, + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS], + callback = self.set_user_agent, ) return True class PathEditor(GridEditor): title = "Editing URL path components" - columns = 1 - headings = ("Component",) + columns = [ + TextColumn("Component"), + ] + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] class ScriptEditor(GridEditor): title = "Editing scripts" - columns = 1 - headings = ("Command",) + columns = [ + TextColumn("Command"), + ] def is_error(self, col, val): try: script.Script.parse_command(val) - except script.ScriptError, v: + except script.ScriptError as v: return str(v) class HostPatternEditor(GridEditor): title = "Editing host patterns" - columns = 1 - headings = ("Regex (matched on hostname:port / ip:port)",) + columns = [ + TextColumn("Regex (matched on hostname:port / ip:port)") + ] def is_error(self, col, val): try: re.compile(val, re.IGNORECASE) except re.error as e: return "Invalid regex: %s" % str(e) + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] + + +class CookieEditor(GridEditor): + title = "Editing request Cookie header" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + ] + + +class CookieAttributeEditor(GridEditor): + title = "Editing Set-Cookie attributes" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + ] + + def data_out(self, data): + ret = [] + for i in data: + if not i[1]: + ret.append([i[0], None]) + else: + ret.append(i) + return ret + + +class SetCookieEditor(GridEditor): + title = "Editing response SetCookie header" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + SubgridColumn("Attributes", CookieAttributeEditor), + ] + + def data_in(self, data): + flattened = [] + for k, v in data.items(): + flattened.append([k, v[0], v[1].lst]) + return flattened + + def data_out(self, data): + vals = [] + for i in data: + vals.append( + [ + i[0], + [i[1], odict.ODictCaseless(i[2])] + ] + ) + return odict.ODict(vals) diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py index 6bb49a92..4e81a566 100644 --- a/libmproxy/console/help.py +++ b/libmproxy/console/help.py @@ -2,18 +2,17 @@ from __future__ import absolute_import import urwid -from . import common +from . import common, signals from .. import filt, version footer = [ - ("heading", 'mitmproxy v%s '%version.VERSION), + ("heading", 'mitmproxy v%s ' % version.VERSION), ('heading_key', "q"), ":back ", ] class HelpView(urwid.ListBox): - def __init__(self, master, help_context, state): - self.master, self.state = master, state + def __init__(self, help_context): self.help_context = help_context or [] urwid.ListBox.__init__( self, @@ -29,101 +28,26 @@ class HelpView(urwid.ListBox): keys = [ ("j, k", "down, up"), ("h, l", "left, right (in some contexts)"), + ("g, G", "go to end, beginning"), ("space", "page down"), ("pg up/down", "page up/down"), ("arrows", "up, down, left, right"), ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals( + keys, + key="key", + val="text", + indent=4)) text.append(urwid.Text([("head", "\n\nGlobal keys:\n")])) keys = [ ("c", "client replay"), - ("H", "edit global header set patterns"), - ("I", "set ignore pattern"), ("i", "set interception pattern"), - ("M", "change global default display mode"), - (None, - common.highlight_key("automatic", "a") + - [("text", ": automatic detection")] - ), - (None, - common.highlight_key("hex", "e") + - [("text", ": Hex")] - ), - (None, - common.highlight_key("html", "h") + - [("text", ": HTML")] - ), - (None, - common.highlight_key("image", "i") + - [("text", ": Image")] - ), - (None, - common.highlight_key("javascript", "j") + - [("text", ": JavaScript")] - ), - (None, - common.highlight_key("json", "s") + - [("text", ": JSON")] - ), - (None, - common.highlight_key("css", "c") + - [("text", ": CSS")] - ), - (None, - common.highlight_key("urlencoded", "u") + - [("text", ": URL-encoded data")] - ), - (None, - common.highlight_key("raw", "r") + - [("text", ": raw data")] - ), - (None, - common.highlight_key("xml", "x") + - [("text", ": XML")] - ), - (None, - common.highlight_key("wbxml", "w") + - [("text", ": WBXML")] - ), - (None, - common.highlight_key("amf", "f") + - [("text", ": AMF (requires PyAMF)")] - ), - ("o", "toggle options:"), - (None, - common.highlight_key("anticache", "a") + - [("text", ": prevent cached responses")] - ), - (None, - common.highlight_key("anticomp", "c") + - [("text", ": prevent compressed responses")] - ), - (None, - common.highlight_key("showhost", "h") + - [("text", ": use Host header for URL display")] - ), - (None, - common.highlight_key("killextra", "k") + - [("text", ": kill requests not part of server replay")] - ), - (None, - common.highlight_key("norefresh", "n") + - [("text", ": disable server replay response refresh")] - ), - (None, - common.highlight_key("upstream certs", "u") + - [("text", ": sniff cert info from upstream server")] - ), - - ("q", "quit / return to flow list"), + ("o", "options"), + ("q", "quit / return to previous page"), ("Q", "quit without confirm prompt"), - ("R", "edit replacement patterns"), - ("s", "add/remove scripts"), ("S", "server replay"), - ("t", "set sticky cookie expression"), - ("T", "set tcp proxying pattern"), - ("u", "set sticky auth expression"), ] text.extend( common.format_keyvals(keys, key="key", val="text", indent=4) @@ -133,15 +57,15 @@ class HelpView(urwid.ListBox): f = [] for i in filt.filt_unary: f.append( - ("~%s"%i.code, i.help) + ("~%s" % i.code, i.help) ) for i in filt.filt_rex: f.append( - ("~%s regex"%i.code, i.help) + ("~%s regex" % i.code, i.help) ) for i in filt.filt_int: f.append( - ("~%s int"%i.code, i.help) + ("~%s int" % i.code, i.help) ) f.sort() f.extend( @@ -156,7 +80,7 @@ class HelpView(urwid.ListBox): text.append( urwid.Text( - [ + [ "\n", ("text", " Regexes are Python-style.\n"), ("text", " Regexes can be specified as quoted strings.\n"), @@ -164,13 +88,13 @@ class HelpView(urwid.ListBox): ("text", " Expressions with no operators are regex matches against URL.\n"), ("text", " Default binary operator is &.\n"), ("head", "\n Examples:\n"), - ] + ] ) ) examples = [ - ("google\.com", "Url containing \"google.com"), - ("~q ~b test", "Requests where body contains \"test\""), - ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), + ("google\.com", "Url containing \"google.com"), + ("~q ~b test", "Requests where body contains \"test\""), + ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), ] text.extend( common.format_keyvals(examples, key="key", val="text", indent=4) @@ -180,11 +104,12 @@ class HelpView(urwid.ListBox): def keypress(self, size, key): key = common.shortcuts(key) if key == "q": - self.master.statusbar = self.state[0] - self.master.body = self.state[1] - self.master.header = self.state[2] - self.master.loop.widget = self.master.make_view() + signals.pop_view_state.send(self) return None elif key == "?": key = None + elif key == "G": + self.set_focus(0) + elif key == "g": + self.set_focus(len(self.body.contents)) return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py new file mode 100644 index 00000000..58a4d469 --- /dev/null +++ b/libmproxy/console/options.py @@ -0,0 +1,269 @@ +import urwid + +from . import common, signals, grideditor, contentview +from . import select, palettes + +footer = [ + ('heading_key', "enter/space"), ":toggle ", + ('heading_key', "C"), ":clear all ", +] + + +def _mkhelp(): + text = [] + keys = [ + ("enter/space", "activate option"), + ("C", "clear all options"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +class Options(urwid.WidgetWrap): + def __init__(self, master): + self.master = master + self.lb = select.Select( + [ + select.Heading("Traffic Manipulation"), + select.Option( + "Header Set Patterns", + "H", + lambda: master.setheaders.count(), + self.setheaders + ), + select.Option( + "Ignore Patterns", + "I", + lambda: master.server.config.check_ignore, + self.ignorepatterns + ), + select.Option( + "Replacement Patterns", + "R", + lambda: master.replacehooks.count(), + self.replacepatterns + ), + select.Option( + "Scripts", + "S", + lambda: master.scripts, + self.scripts + ), + + select.Heading("Interface"), + select.Option( + "Default Display Mode", + "M", + self.has_default_displaymode, + self.default_displaymode + ), + select.Option( + "Palette", + "P", + lambda: self.master.palette != palettes.DEFAULT, + self.palette + ), + select.Option( + "Show Host", + "w", + lambda: master.showhost, + self.toggle_showhost + ), + + select.Heading("Network"), + select.Option( + "No Upstream Certs", + "U", + lambda: master.server.config.no_upstream_cert, + self.toggle_upstream_cert + ), + select.Option( + "TCP Proxying", + "T", + lambda: master.server.config.check_tcp, + self.tcp_proxy + ), + + select.Heading("Utility"), + select.Option( + "Anti-Cache", + "a", + lambda: master.anticache, + self.toggle_anticache + ), + select.Option( + "Anti-Compression", + "o", + lambda: master.anticomp, + self.toggle_anticomp + ), + select.Option( + "Kill Extra", + "x", + lambda: master.killextra, + self.toggle_killextra + ), + select.Option( + "No Refresh", + "f", + lambda: not master.refresh_server_playback, + self.toggle_refresh_server_playback + ), + select.Option( + "Sticky Auth", + "A", + lambda: master.stickyauth_txt, + self.sticky_auth + ), + select.Option( + "Sticky Cookies", + "t", + lambda: master.stickycookie_txt, + self.sticky_cookie + ), + ] + ) + title = urwid.Text("Options") + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + self._w = urwid.Frame( + self.lb, + header = title + ) + self.master.loop.widget.footer.update("") + signals.update_settings.connect(self.sig_update_settings) + + def sig_update_settings(self, sender): + self.lb.walker._modified() + + def keypress(self, size, key): + if key == "C": + self.clearall() + return None + return super(self.__class__, self).keypress(size, key) + + def clearall(self): + self.master.anticache = False + self.master.anticomp = False + self.master.killextra = False + self.master.showhost = False + self.master.refresh_server_playback = True + self.master.server.config.no_upstream_cert = False + self.master.setheaders.clear() + self.master.replacehooks.clear() + self.master.set_ignore_filter([]) + self.master.set_tcp_filter([]) + self.master.scripts = [] + self.master.set_stickyauth(None) + self.master.set_stickycookie(None) + self.master.state.default_body_view = contentview.get("Auto") + + signals.update_settings.send(self) + signals.status_message.send( + message = "All select.Options cleared", + expire = 1 + ) + + def toggle_anticache(self): + self.master.anticache = not self.master.anticache + + def toggle_anticomp(self): + self.master.anticomp = not self.master.anticomp + + def toggle_killextra(self): + self.master.killextra = not self.master.killextra + + def toggle_showhost(self): + self.master.showhost = not self.master.showhost + + def toggle_refresh_server_playback(self): + self.master.refresh_server_playback = not self.master.refresh_server_playback + + def toggle_upstream_cert(self): + self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert + signals.update_settings.send(self) + + def setheaders(self): + def _set(*args, **kwargs): + self.master.setheaders.set(*args, **kwargs) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.SetHeadersEditor( + self.master, + self.master.setheaders.get_specs(), + _set + ) + ) + + def ignorepatterns(self): + def _set(ignore): + self.master.set_ignore_filter(ignore) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + self.master.get_ignore_filter(), + _set + ) + ) + + def replacepatterns(self): + def _set(*args, **kwargs): + self.master.replacehooks.set(*args, **kwargs) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.ReplaceEditor( + self.master, + self.master.replacehooks.get_specs(), + _set + ) + ) + + def scripts(self): + self.master.view_grideditor( + grideditor.ScriptEditor( + self.master, + [[i.command] for i in self.master.scripts], + self.master.edit_scripts + ) + ) + + def default_displaymode(self): + signals.status_prompt_onekey.send( + prompt = "Global default display mode", + keys = contentview.view_prompts, + callback = self.master.change_default_display_mode + ) + + def has_default_displaymode(self): + return self.master.state.default_body_view.name != "Auto" + + def tcp_proxy(self): + def _set(tcp): + self.master.set_tcp_filter(tcp) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + self.master.get_tcp_filter(), + _set + ) + ) + + def sticky_auth(self): + signals.status_prompt.send( + prompt = "Sticky auth filter", + text = self.master.stickyauth_txt, + callback = self.master.set_stickyauth + ) + + def sticky_cookie(self): + signals.status_prompt.send( + prompt = "Sticky cookie filter", + text = self.master.stickycookie_txt, + callback = self.master.set_stickycookie + ) + + def palette(self): + self.master.view_palette_picker() diff --git a/libmproxy/console/palettepicker.py b/libmproxy/console/palettepicker.py new file mode 100644 index 00000000..7e2c10cd --- /dev/null +++ b/libmproxy/console/palettepicker.py @@ -0,0 +1,81 @@ +import urwid + +from . import select, common, palettes, signals + +footer = [ + ('heading_key', "enter/space"), ":select", +] + + +def _mkhelp(): + text = [] + keys = [ + ("enter/space", "select"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +class PalettePicker(urwid.WidgetWrap): + def __init__(self, master): + self.master = master + low, high = [], [] + for k, v in palettes.palettes.items(): + if v.high: + high.append(k) + else: + low.append(k) + high.sort() + low.sort() + + options = [ + select.Heading("High Colour") + ] + + def mkopt(name): + return select.Option( + i, + None, + lambda: self.master.palette == name, + lambda: self.select(name) + ) + + for i in high: + options.append(mkopt(i)) + options.append(select.Heading("Low Colour")) + for i in low: + options.append(mkopt(i)) + + options.extend( + [ + select.Heading("Options"), + select.Option( + "Transparent", + "T", + lambda: master.palette_transparent, + self.toggle_palette_transparent + ) + ] + ) + + self.lb = select.Select(options) + title = urwid.Text("Palettes") + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + self._w = urwid.Frame( + self.lb, + header = title + ) + signals.update_settings.connect(self.sig_update_settings) + + def sig_update_settings(self, sender): + self.lb.walker._modified() + + def select(self, name): + self.master.set_palette(name) + + def toggle_palette_transparent(self): + self.master.palette_transparent = not self.master.palette_transparent + self.master.set_palette(self.master.palette) + signals.update_settings.send(self) diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py index cfb2702c..d897a0a2 100644 --- a/libmproxy/console/palettes.py +++ b/libmproxy/console/palettes.py @@ -1,4 +1,3 @@ - # Low-color themes should ONLY use the standard foreground and background # colours listed here: # @@ -6,9 +5,9 @@ # - class Palette: _fields = [ + 'background', 'title', # Status bar & heading @@ -17,11 +16,15 @@ class Palette: # Help 'key', 'head', 'text', + # Options + 'option_selected', 'option_active', 'option_active_selected', + 'option_selected_key', + # List and Connections 'method', 'focus', 'code_200', 'code_300', 'code_400', 'code_500', 'code_other', 'error', - 'header', 'highlight', 'intercept', 'replay', + 'header', 'highlight', 'intercept', 'replay', 'mark', # Hex view 'offset', @@ -31,15 +34,33 @@ class Palette: ] high = None - def palette(self): + def palette(self, transparent): l = [] + highback, lowback = None, None + if not transparent: + if self.high and self.high.get("background"): + highback = self.high["background"][1] + lowback = self.low["background"][1] + for i in self._fields: - v = [i] - v.extend(self.low[i]) - if self.high and i in self.high: - v.append(None) - v.extend(self.high[i]) - l.append(tuple(v)) + if transparent and i == "background": + l.append(["background", "default", "default"]) + else: + v = [i] + low = list(self.low[i]) + if lowback and low[1] == "default": + low[1] = lowback + v.extend(low) + if self.high and i in self.high: + v.append(None) + high = list(self.high[i]) + if highback and high[1] == "default": + high[1] = highback + v.extend(high) + elif highback and self.low[i][1] == "default": + high = [None, low[0], highback] + v.extend(high) + l.append(tuple(v)) return l @@ -48,18 +69,25 @@ class LowDark(Palette): Low-color dark background """ low = dict( + background = ('white', 'black'), title = ('white,bold', 'default'), # Status bar & heading - heading = ('light gray', 'dark blue'), + heading = ('white', 'dark blue'), heading_key = ('light cyan', 'dark blue'), - heading_inactive = ('white', 'dark gray'), + heading_inactive = ('dark gray', 'light gray'), # Help key = ('light cyan', 'default'), head = ('white,bold', 'default'), text = ('light gray', 'default'), + # Options + option_selected = ('black', 'light gray'), + option_selected_key = ('light cyan', 'light gray'), + option_active = ('light red', 'default'), + option_active_selected = ('light red', 'light gray'), + # List and Connections method = ('dark cyan', 'default'), focus = ('yellow', 'default'), @@ -76,6 +104,7 @@ class LowDark(Palette): highlight = ('white,bold', 'default'), intercept = ('brown', 'default'), replay = ('light green', 'default'), + mark = ('light red', 'default'), # Hex view offset = ('dark cyan', 'default'), @@ -92,6 +121,10 @@ class Dark(LowDark): high = dict( heading_inactive = ('g58', 'g11'), intercept = ('#f60', 'default'), + + option_selected = ('g85', 'g45'), + option_selected_key = ('light cyan', 'g50'), + option_active_selected = ('light red', 'g50'), ) @@ -100,18 +133,25 @@ class LowLight(Palette): Low-color light background """ low = dict( - title = ('dark magenta,bold', 'light blue'), + background = ('black', 'white'), + title = ('dark magenta', 'default'), # Status bar & heading - heading = ('light gray', 'dark blue'), - heading_key = ('light cyan', 'dark blue'), + heading = ('white', 'black'), + heading_key = ('dark blue', 'black'), heading_inactive = ('black', 'light gray'), # Help - key = ('dark blue,bold', 'default'), - head = ('black,bold', 'default'), + key = ('dark blue', 'default'), + head = ('black', 'default'), text = ('dark gray', 'default'), + # Options + option_selected = ('black', 'light gray'), + option_selected_key = ('dark blue', 'light gray'), + option_active = ('light red', 'default'), + option_active_selected = ('light red', 'light gray'), + # List and Connections method = ('dark cyan', 'default'), focus = ('black', 'default'), @@ -128,6 +168,7 @@ class LowLight(Palette): highlight = ('black,bold', 'default'), intercept = ('brown', 'default'), replay = ('dark green', 'default'), + mark = ('dark red', 'default'), # Hex view offset = ('dark blue', 'default'), @@ -142,10 +183,15 @@ class LowLight(Palette): class Light(LowLight): high = dict( + background = ('black', 'g100'), heading = ('g99', '#08f'), heading_key = ('#0ff,bold', '#08f'), heading_inactive = ('g35', 'g85'), replay = ('#0a0,bold', 'default'), + + option_selected = ('black', 'g85'), + option_selected_key = ('dark blue', 'g85'), + option_active_selected = ('light red', 'g85'), ) @@ -155,10 +201,10 @@ sol_base03 = "h234" sol_base02 = "h235" sol_base01 = "h240" sol_base00 = "h241" -sol_base0 = "h244" -sol_base1 = "h245" -sol_base2 = "h254" -sol_base3 = "h230" +sol_base0 = "h244" +sol_base1 = "h245" +sol_base2 = "h254" +sol_base3 = "h230" sol_yellow = "h136" sol_orange = "h166" sol_red = "h160" @@ -167,9 +213,12 @@ sol_violet = "h61" sol_blue = "h33" sol_cyan = "h37" sol_green = "h64" + + class SolarizedLight(LowLight): high = dict( - title = (sol_blue, 'default'), + background = (sol_base00, sol_base3), + title = (sol_cyan, 'default'), text = (sol_base00, 'default'), # Status bar & heading @@ -181,6 +230,12 @@ class SolarizedLight(LowLight): key = (sol_blue, 'default',), head = (sol_base00, 'default'), + # Options + option_selected = (sol_base03, sol_base2), + option_selected_key = (sol_blue, sol_base2), + option_active = (sol_orange, 'default'), + option_active_selected = (sol_orange, sol_base2), + # List and Connections method = (sol_cyan, 'default'), focus = (sol_base01, 'default'), @@ -193,7 +248,7 @@ class SolarizedLight(LowLight): error = (sol_red, 'default'), - header = (sol_base01, 'default'), + header = (sol_blue, 'default'), highlight = (sol_base01, 'default'), intercept = (sol_red, 'default',), replay = (sol_green, 'default',), @@ -211,17 +266,24 @@ class SolarizedLight(LowLight): class SolarizedDark(LowDark): high = dict( + background = (sol_base2, sol_base03), title = (sol_blue, 'default'), - text = (sol_base0, 'default'), + text = (sol_base1, 'default'), # Status bar & heading - heading = (sol_base03, sol_base1), - heading_key = (sol_blue+",bold", sol_base1), + heading = (sol_base2, sol_base01), + heading_key = (sol_blue + ",bold", sol_base01), heading_inactive = (sol_base1, sol_base02), # Help key = (sol_blue, 'default',), - head = (sol_base00, 'default'), + head = (sol_base2, 'default'), + + # Options + option_selected = (sol_base03, sol_base00), + option_selected_key = (sol_blue, sol_base00), + option_active = (sol_orange, 'default'), + option_active_selected = (sol_orange, sol_base00), # List and Connections method = (sol_cyan, 'default'), @@ -235,7 +297,7 @@ class SolarizedDark(LowDark): error = (sol_red, 'default'), - header = (sol_base01, 'default'), + header = (sol_blue, 'default'), highlight = (sol_base01, 'default'), intercept = (sol_red, 'default',), replay = (sol_green, 'default',), @@ -251,6 +313,7 @@ class SolarizedDark(LowDark): ) +DEFAULT = "dark" palettes = { "lowlight": LowLight(), "lowdark": LowDark(), diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py new file mode 100644 index 00000000..dccec14a --- /dev/null +++ b/libmproxy/console/pathedit.py @@ -0,0 +1,69 @@ +import glob +import os.path + +import urwid + + +class _PathCompleter: + def __init__(self, _testing=False): + """ + _testing: disables reloading of the lookup table to make testing + possible. + """ + self.lookup, self.offset = None, None + self.final = None + self._testing = _testing + + def reset(self): + self.lookup = None + self.offset = -1 + + def complete(self, txt): + """ + Returns the next completion for txt, or None if there is no + completion. + """ + path = os.path.expanduser(txt) + if not self.lookup: + if not self._testing: + # Lookup is a set of (display value, actual value) tuples. + self.lookup = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = txt + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(txt) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.basename(f)) + if os.path.isdir(f): + display += "/" + self.lookup.append((display, f)) + if not self.lookup: + self.final = path + return path + self.lookup.sort() + self.offset = -1 + self.lookup.append((txt, txt)) + self.offset += 1 + if self.offset >= len(self.lookup): + self.offset = 0 + ret = self.lookup[self.offset] + self.final = ret[1] + return ret[0] + + +class PathEdit(urwid.Edit, _PathCompleter): + def __init__(self, *args, **kwargs): + urwid.Edit.__init__(self, *args, **kwargs) + _PathCompleter.__init__(self) + + def keypress(self, size, key): + if key == "tab": + comp = self.complete(self.get_edit_text()) + self.set_edit_text(comp) + self.set_edit_pos(len(comp)) + else: + self.reset() + return urwid.Edit.keypress(self, size, key) diff --git a/libmproxy/console/searchable.py b/libmproxy/console/searchable.py new file mode 100644 index 00000000..627d595d --- /dev/null +++ b/libmproxy/console/searchable.py @@ -0,0 +1,91 @@ +import urwid + +from . import signals + + +class Highlight(urwid.AttrMap): + def __init__(self, t): + urwid.AttrMap.__init__( + self, + urwid.Text(t.text), + "focusfield", + ) + self.backup = t + + +class Searchable(urwid.ListBox): + def __init__(self, state, contents): + self.walker = urwid.SimpleFocusListWalker(contents) + urwid.ListBox.__init__(self, self.walker) + self.state = state + self.search_offset = 0 + self.current_highlight = None + self.search_term = None + + def keypress(self, size, key): + if key == "/": + signals.status_prompt.send( + prompt = "Search for", + text = "", + callback = self.set_search + ) + elif key == "n": + self.find_next(False) + elif key == "N": + self.find_next(True) + elif key == "G": + self.set_focus(0) + self.walker._modified() + elif key == "g": + self.set_focus(len(self.walker) - 1) + self.walker._modified() + else: + return super(self.__class__, self).keypress(size, key) + + def set_search(self, text): + self.state.last_search = text + self.search_term = text or None + self.find_next(False) + + def set_highlight(self, offset): + if self.current_highlight is not None: + old = self.body[self.current_highlight] + self.body[self.current_highlight] = old.backup + if offset is None: + self.current_highlight = None + else: + self.body[offset] = Highlight(self.body[offset]) + self.current_highlight = offset + + def get_text(self, w): + if isinstance(w, urwid.Text): + return w.text + elif isinstance(w, Highlight): + return w.backup.text + else: + return None + + def find_next(self, backwards): + if not self.search_term: + if self.state.last_search: + self.search_term = self.state.last_search + else: + self.set_highlight(None) + return + # Start search at focus + 1 + if backwards: + rng = xrange(len(self.body) - 1, -1, -1) + else: + rng = xrange(1, len(self.body) + 1) + for i in rng: + off = (self.focus_position + i) % len(self.body) + w = self.body[off] + txt = self.get_text(w) + if txt and self.search_term in txt: + self.set_highlight(off) + self.set_focus(off, coming_from="above") + self.body._modified() + return + else: + self.set_highlight(None) + signals.status_message.send(message="Search not found.", expire=1) diff --git a/libmproxy/console/select.py b/libmproxy/console/select.py new file mode 100644 index 00000000..bf96a785 --- /dev/null +++ b/libmproxy/console/select.py @@ -0,0 +1,115 @@ +import urwid + +from . import common + + +class _OptionWidget(urwid.WidgetWrap): + def __init__(self, option, text, shortcut, active, focus): + self.option = option + textattr = "text" + keyattr = "key" + if focus and active: + textattr = "option_active_selected" + keyattr = "option_selected_key" + elif focus: + textattr = "option_selected" + keyattr = "option_selected_key" + elif active: + textattr = "option_active" + if shortcut: + text = common.highlight_key( + text, + shortcut, + textattr = textattr, + keyattr = keyattr + ) + opt = urwid.Text(text, align="left") + opt = urwid.AttrWrap(opt, textattr) + opt = urwid.Padding(opt, align = "center", width = 40) + urwid.WidgetWrap.__init__(self, opt) + + def keypress(self, size, key): + return key + + def selectable(self): + return True + + +class OptionWalker(urwid.ListWalker): + def __init__(self, options): + urwid.ListWalker.__init__(self) + self.options = options + self.focus = 0 + + def set_focus(self, pos): + self.focus = pos + + def get_focus(self): + return self.options[self.focus].render(True), self.focus + + def get_next(self, pos): + if pos >= len(self.options) - 1: + return None, None + return self.options[pos + 1].render(False), pos + 1 + + def get_prev(self, pos): + if pos <= 0: + return None, None + return self.options[pos - 1].render(False), pos - 1 + + +class Heading: + def __init__(self, text): + self.text = text + + def render(self, focus): + opt = urwid.Text("\n" + self.text, align="left") + opt = urwid.AttrWrap(opt, "title") + opt = urwid.Padding(opt, align = "center", width = 40) + return opt + + +_neg = lambda: False + + +class Option: + def __init__(self, text, shortcut, getstate=None, activate=None): + self.text = text + self.shortcut = shortcut + self.getstate = getstate or _neg + self.activate = activate or _neg + + def render(self, focus): + return _OptionWidget( + self, + self.text, + self.shortcut, + self.getstate(), + focus) + + +class Select(urwid.ListBox): + def __init__(self, options): + self.walker = OptionWalker(options) + urwid.ListBox.__init__( + self, + self.walker + ) + self.options = options + self.keymap = {} + for i in options: + if hasattr(i, "shortcut") and i.shortcut: + if i.shortcut in self.keymap: + raise ValueError("Duplicate shortcut key: %s" % i.shortcut) + self.keymap[i.shortcut] = i + + def keypress(self, size, key): + if key == "enter" or key == " ": + self.get_focus()[0].option.activate() + return None + key = common.shortcuts(key) + if key in self.keymap: + self.keymap[key].activate() + self.set_focus(self.options.index(self.keymap[key])) + return None + return super(self.__class__, self).keypress(size, key) diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py new file mode 100644 index 00000000..52f72d34 --- /dev/null +++ b/libmproxy/console/signals.py @@ -0,0 +1,41 @@ +import blinker + +# Show a status message in the action bar +sig_add_event = blinker.Signal() +def add_event(e, level): + sig_add_event.send( + None, + e=e, + level=level + ) + +# Show a status message in the action bar +status_message = blinker.Signal() + +# Prompt for input +status_prompt = blinker.Signal() + +# Prompt for a path +status_prompt_path = blinker.Signal() + +# Prompt for a single keystroke +status_prompt_onekey = blinker.Signal() + +# Call a callback in N seconds +call_in = blinker.Signal() + +# Focus the body, footer or header of the main window +focus = blinker.Signal() + +# Fired when settings change +update_settings = blinker.Signal() + +# Fired when a flow changes +flow_change = blinker.Signal() + +# Fired when the flow list or focus changes +flowlist_change = blinker.Signal() + +# Pop and push view state onto a stack +pop_view_state = blinker.Signal() +push_view_state = blinker.Signal() diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py new file mode 100644 index 00000000..7eb2131b --- /dev/null +++ b/libmproxy/console/statusbar.py @@ -0,0 +1,254 @@ +import os.path + +import urwid + +import netlib.utils +from . import pathedit, signals, common +from .. import utils + + +class ActionBar(urwid.WidgetWrap): + def __init__(self): + urwid.WidgetWrap.__init__(self, None) + self.clear() + signals.status_message.connect(self.sig_message) + signals.status_prompt.connect(self.sig_prompt) + signals.status_prompt_path.connect(self.sig_path_prompt) + signals.status_prompt_onekey.connect(self.sig_prompt_onekey) + + self.last_path = "" + + self.prompting = False + self.onekey = False + self.pathprompt = False + + def sig_message(self, sender, message, expire=None): + w = urwid.Text(message) + self._w = w + if expire: + def cb(*args): + if w == self._w: + self.clear() + signals.call_in.send(seconds=expire, callback=cb) + + def prep_prompt(self, p): + return p.strip() + ": " + + def sig_prompt(self, sender, prompt, text, callback, args=()): + signals.focus.send(self, section="footer") + self._w = urwid.Edit(self.prep_prompt(prompt), text or "") + self.prompting = (callback, args) + + def sig_path_prompt(self, sender, prompt, callback, args=()): + signals.focus.send(self, section="footer") + self._w = pathedit.PathEdit( + self.prep_prompt(prompt), + os.path.dirname(self.last_path) + ) + self.pathprompt = True + self.prompting = (callback, args) + + def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): + """ + Keys are a set of (word, key) tuples. The appropriate key in the + word is highlighted. + """ + signals.focus.send(self, section="footer") + prompt = [prompt, " ("] + mkup = [] + for i, e in enumerate(keys): + mkup.extend(common.highlight_key(e[0], e[1])) + if i < len(keys) - 1: + mkup.append(",") + prompt.extend(mkup) + prompt.append(")? ") + self.onekey = set(i[1] for i in keys) + self._w = urwid.Edit(prompt, "") + self.prompting = (callback, args) + + def selectable(self): + return True + + def keypress(self, size, k): + if self.prompting: + if k == "esc": + self.prompt_done() + elif self.onekey: + if k == "enter": + self.prompt_done() + elif k in self.onekey: + self.prompt_execute(k) + elif k == "enter": + self.prompt_execute(self._w.get_edit_text()) + else: + if common.is_keypress(k): + self._w.keypress(size, k) + else: + return k + + def clear(self): + self._w = urwid.Text("") + + def prompt_done(self): + self.prompting = False + self.onekey = False + self.pathprompt = False + signals.status_message.send(message="") + signals.focus.send(self, section="body") + + def prompt_execute(self, txt): + if self.pathprompt: + self.last_path = txt + p, args = self.prompting + self.prompt_done() + msg = p(txt, *args) + if msg: + signals.status_message.send(message=msg, expire=1) + + +class StatusBar(urwid.WidgetWrap): + def __init__(self, master, helptext): + self.master, self.helptext = master, helptext + self.ab = ActionBar() + self.ib = urwid.WidgetWrap(urwid.Text("")) + self._w = urwid.Pile([self.ib, self.ab]) + signals.update_settings.connect(self.sig_update_settings) + signals.flowlist_change.connect(self.sig_update_settings) + self.redraw() + + def sig_update_settings(self, sender): + self.redraw() + + def keypress(self, *args, **kwargs): + return self.ab.keypress(*args, **kwargs) + + def get_status(self): + r = [] + + if self.master.setheaders.count(): + r.append("[") + r.append(("heading_key", "H")) + r.append("eaders]") + if self.master.replacehooks.count(): + r.append("[") + r.append(("heading_key", "R")) + r.append("eplacing]") + if self.master.client_playback: + r.append("[") + r.append(("heading_key", "cplayback")) + r.append(":%s to go]" % self.master.client_playback.count()) + if self.master.server_playback: + r.append("[") + r.append(("heading_key", "splayback")) + if self.master.nopop: + r.append(":%s in file]" % self.master.server_playback.count()) + else: + r.append(":%s to go]" % self.master.server_playback.count()) + if self.master.get_ignore_filter(): + r.append("[") + r.append(("heading_key", "I")) + r.append("gnore:%d]" % len(self.master.get_ignore_filter())) + if self.master.get_tcp_filter(): + r.append("[") + r.append(("heading_key", "T")) + r.append("CP:%d]" % len(self.master.get_tcp_filter())) + if self.master.state.intercept_txt: + r.append("[") + r.append(("heading_key", "i")) + r.append(":%s]" % self.master.state.intercept_txt) + if self.master.state.limit_txt: + r.append("[") + r.append(("heading_key", "l")) + r.append(":%s]" % self.master.state.limit_txt) + if self.master.stickycookie_txt: + r.append("[") + r.append(("heading_key", "t")) + r.append(":%s]" % self.master.stickycookie_txt) + if self.master.stickyauth_txt: + r.append("[") + r.append(("heading_key", "u")) + r.append(":%s]" % self.master.stickyauth_txt) + if self.master.state.default_body_view.name != "Auto": + r.append("[") + r.append(("heading_key", "M")) + r.append(":%s]" % self.master.state.default_body_view.name) + + opts = [] + if self.master.anticache: + opts.append("anticache") + if self.master.anticomp: + opts.append("anticomp") + if self.master.showhost: + opts.append("showhost") + if not self.master.refresh_server_playback: + opts.append("norefresh") + if self.master.killextra: + opts.append("killextra") + if self.master.server.config.no_upstream_cert: + opts.append("no-upstream-cert") + if self.master.state.follow_focus: + opts.append("following") + if self.master.stream_large_bodies: + opts.append( + "stream:%s" % netlib.utils.pretty_size( + self.master.stream_large_bodies.max_size + ) + ) + + if opts: + r.append("[%s]" % (":".join(opts))) + + if self.master.server.config.mode in ["reverse", "upstream"]: + dst = self.master.server.config.mode.dst + scheme = "https" if dst[0] else "http" + if dst[1] != dst[0]: + scheme += "2https" if dst[1] else "http" + r.append("[dest:%s]" % utils.unparse_url(scheme, *dst[2:])) + if self.master.scripts: + r.append("[") + r.append(("heading_key", "s")) + r.append("cripts:%s]" % len(self.master.scripts)) + # r.append("[lt:%0.3f]"%self.master.looptime) + + if self.master.stream: + r.append("[W:%s]" % self.master.stream_path) + + return r + + def redraw(self): + fc = self.master.state.flow_count() + if self.master.state.focus is None: + offset = 0 + else: + offset = min(self.master.state.focus + 1, fc) + t = [ + ('heading', ("[%s/%s]" % (offset, fc)).ljust(9)) + ] + + if self.master.server.bound: + host = self.master.server.address.host + if host == "0.0.0.0": + host = "*" + boundaddr = "[%s:%s]" % (host, self.master.server.address.port) + else: + boundaddr = "" + t.extend(self.get_status()) + status = urwid.AttrWrap(urwid.Columns([ + urwid.Text(t), + urwid.Text( + [ + self.helptext, + boundaddr + ], + align="right" + ), + ]), "heading") + self.ib._w = status + + def update(self, text): + self.helptext = text + self.redraw() + self.master.loop.draw_screen() + + def selectable(self): + return True diff --git a/libmproxy/console/tabs.py b/libmproxy/console/tabs.py new file mode 100644 index 00000000..fea9bbde --- /dev/null +++ b/libmproxy/console/tabs.py @@ -0,0 +1,69 @@ +import urwid +import signals + + +class Tab(urwid.WidgetWrap): + def __init__(self, offset, content, attr, onclick): + """ + onclick is called on click with the tab offset as argument + """ + p = urwid.Text(content, align="center") + p = urwid.Padding(p, align="center", width=("relative", 100)) + p = urwid.AttrWrap(p, attr) + urwid.WidgetWrap.__init__(self, p) + self.offset = offset + self.onclick = onclick + + def mouse_event(self, size, event, button, col, row, focus): + if event == "mouse press" and button == 1: + self.onclick(self.offset) + return True + + +class Tabs(urwid.WidgetWrap): + def __init__(self, tabs, tab_offset=0): + urwid.WidgetWrap.__init__(self, "") + self.tab_offset = tab_offset + self.tabs = tabs + self.show() + + def change_tab(self, offset): + self.tab_offset = offset + self.show() + + def keypress(self, size, key): + n = len(self.tabs) + if key in ["tab", "l"]: + self.change_tab((self.tab_offset + 1) % n) + elif key == "h": + self.change_tab((self.tab_offset - 1) % n) + return self._w.keypress(size, key) + + def show(self): + headers = [] + for i in range(len(self.tabs)): + txt = self.tabs[i][0]() + if i == self.tab_offset: + headers.append( + Tab( + i, + txt, + "heading", + self.change_tab + ) + ) + else: + headers.append( + Tab( + i, + txt, + "heading_inactive", + self.change_tab + ) + ) + headers = urwid.Columns(headers, dividechars=1) + self._w = urwid.Frame( + body = self.tabs[self.tab_offset][1](), + header = headers + ) + self._w.set_focus("body") diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py new file mode 100644 index 00000000..8754ed57 --- /dev/null +++ b/libmproxy/console/window.py @@ -0,0 +1,89 @@ +import urwid +from . import signals + + +class Window(urwid.Frame): + def __init__(self, master, body, header, footer, helpctx): + urwid.Frame.__init__( + self, + urwid.AttrWrap(body, "background"), + header = urwid.AttrWrap(header, "background") if header else None, + footer = urwid.AttrWrap(footer, "background") if footer else None + ) + self.master = master + self.helpctx = helpctx + signals.focus.connect(self.sig_focus) + + def sig_focus(self, sender, section): + self.focus_position = section + + def mouse_event(self, *args, **kwargs): + # args: (size, event, button, col, row) + k = super(self.__class__, self).mouse_event(*args, **kwargs) + if not k: + if args[1] == "mouse drag": + signals.status_message.send( + message = "Hold down alt or ctrl to select text.", + expire = 1 + ) + elif args[1] == "mouse press" and args[2] == 4: + self.keypress(args[0], "up") + elif args[1] == "mouse press" and args[2] == 5: + self.keypress(args[0], "down") + else: + return False + return True + + def keypress(self, size, k): + k = super(self.__class__, self).keypress(size, k) + if k == "?": + self.master.view_help(self.helpctx) + elif k == "c": + if not self.master.client_playback: + signals.status_prompt_path.send( + self, + prompt = "Client replay", + callback = self.master.client_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current client replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_client_playback_prompt, + ) + elif k == "i": + signals.status_prompt.send( + self, + prompt = "Intercept filter", + text = self.master.state.intercept_txt, + callback = self.master.set_intercept + ) + elif k == "o": + self.master.view_options() + elif k == "Q": + raise urwid.ExitMainLoop + elif k == "q": + signals.pop_view_state.send(self) + elif k == "S": + if not self.master.server_playback: + signals.status_prompt_path.send( + self, + prompt = "Server replay path", + callback = self.master.server_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current server replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_server_playback_prompt, + ) + else: + return k |