diff options
Diffstat (limited to 'libmproxy/console')
-rw-r--r-- | libmproxy/console/__init__.py | 718 | ||||
-rw-r--r-- | libmproxy/console/common.py | 135 | ||||
-rw-r--r-- | libmproxy/console/contentview.py | 90 | ||||
-rw-r--r-- | libmproxy/console/flowdetailview.py | 11 | ||||
-rw-r--r-- | libmproxy/console/flowlist.py | 150 | ||||
-rw-r--r-- | libmproxy/console/flowview.py | 413 | ||||
-rw-r--r-- | libmproxy/console/grideditor.py | 117 | ||||
-rw-r--r-- | libmproxy/console/help.py | 21 | ||||
-rw-r--r-- | libmproxy/console/palettes.py | 447 | ||||
-rw-r--r-- | libmproxy/console/pathedit.py | 69 | ||||
-rw-r--r-- | libmproxy/console/signals.py | 30 | ||||
-rw-r--r-- | libmproxy/console/statusbar.py | 254 | ||||
-rw-r--r-- | libmproxy/console/window.py | 142 |
13 files changed, 1465 insertions, 1132 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index 0db06832..9375f973 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -1,259 +1,25 @@ from __future__ import absolute_import -import mailcap, mimetypes, tempfile, os, subprocess, glob, time, shlex, stat -import os.path, sys, weakref, traceback -import urwid -from .. import controller, utils, flow, script, proxy -from . import flowlist, flowview, help, common, grideditor, palettes, contentview, flowdetailview - -EVENTLOG_SIZE = 500 - - -class Stop(Exception): pass - - -class _PathCompleter: - def __init__(self, _testing=False): - """ - _testing: disables reloading of the lookup table to make testing possible. - """ - self.lookup, self.offset = None, None - self.final = None - self._testing = _testing - def reset(self): - self.lookup = None - self.offset = -1 +import mailcap +import mimetypes +import tempfile +import os +import os.path +import shlex +import signal +import stat +import subprocess +import sys +import traceback +import urwid +import weakref - def complete(self, txt): - """ - Returns the next completion for txt, or None if there is no completion. - """ - path = os.path.expanduser(txt) - if not self.lookup: - if not self._testing: - # Lookup is a set of (display value, actual value) tuples. - self.lookup = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = txt - else: - files = glob.glob(path+"*") - prefix = os.path.dirname(txt) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.basename(f)) - if os.path.isdir(f): - display += "/" - self.lookup.append((display, f)) - if not self.lookup: - self.final = path - return path - self.lookup.sort() - self.offset = -1 - self.lookup.append((txt, txt)) - self.offset += 1 - if self.offset >= len(self.lookup): - self.offset = 0 - ret = self.lookup[self.offset] - self.final = ret[1] - return ret[0] - -#begin nocover - -class PathEdit(urwid.Edit, _PathCompleter): - def __init__(self, *args, **kwargs): - urwid.Edit.__init__(self, *args, **kwargs) - _PathCompleter.__init__(self) - - def keypress(self, size, key): - if key == "tab": - comp = self.complete(self.get_edit_text()) - self.set_edit_text(comp) - self.set_edit_pos(len(comp)) - else: - self.reset() - return urwid.Edit.keypress(self, size, key) +from .. import controller, flow, script +from . import flowlist, flowview, help, common, window, signals +from . import grideditor, palettes, contentview, flowdetailview, statusbar +EVENTLOG_SIZE = 500 -class ActionBar(common.WWrap): - def __init__(self): - self.message("") - - def selectable(self): - return True - - def path_prompt(self, prompt, text): - self.expire = None - self.w = PathEdit(prompt, text) - - def prompt(self, prompt, text = ""): - self.expire = None - # A (partial) workaround for this Urwid issue: - # https://github.com/Nic0/tyrs/issues/115 - # We can remove it once veryone is beyond 1.0.1 - if isinstance(prompt, basestring): - prompt = unicode(prompt) - self.w = urwid.Edit(prompt, text or "") - - def message(self, message, expire=None): - self.expire = expire - self.w = urwid.Text(message) - - -class StatusBar(common.WWrap): - def __init__(self, master, helptext): - self.master, self.helptext = master, helptext - self.ab = ActionBar() - self.ib = common.WWrap(urwid.Text("")) - self.w = urwid.Pile([self.ib, self.ab]) - - def get_status(self): - r = [] - - if self.master.setheaders.count(): - r.append("[") - r.append(("heading_key", "H")) - r.append("eaders]") - if self.master.replacehooks.count(): - r.append("[") - r.append(("heading_key", "R")) - r.append("eplacing]") - if self.master.client_playback: - r.append("[") - r.append(("heading_key", "cplayback")) - r.append(":%s to go]"%self.master.client_playback.count()) - if self.master.server_playback: - r.append("[") - r.append(("heading_key", "splayback")) - if self.master.nopop: - r.append(":%s in file]"%self.master.server_playback.count()) - else: - r.append(":%s to go]"%self.master.server_playback.count()) - if self.master.get_ignore_filter(): - r.append("[") - r.append(("heading_key", "I")) - r.append("gnore:%d]" % len(self.master.get_ignore_filter())) - if self.master.get_tcp_filter(): - r.append("[") - r.append(("heading_key", "T")) - r.append("CP:%d]" % len(self.master.get_tcp_filter())) - if self.master.state.intercept_txt: - r.append("[") - r.append(("heading_key", "i")) - r.append(":%s]"%self.master.state.intercept_txt) - if self.master.state.limit_txt: - r.append("[") - r.append(("heading_key", "l")) - r.append(":%s]"%self.master.state.limit_txt) - if self.master.stickycookie_txt: - r.append("[") - r.append(("heading_key", "t")) - r.append(":%s]"%self.master.stickycookie_txt) - if self.master.stickyauth_txt: - r.append("[") - r.append(("heading_key", "u")) - r.append(":%s]"%self.master.stickyauth_txt) - if self.master.state.default_body_view.name != "Auto": - r.append("[") - r.append(("heading_key", "M")) - r.append(":%s]"%self.master.state.default_body_view.name) - - opts = [] - if self.master.anticache: - opts.append("anticache") - if self.master.anticomp: - opts.append("anticomp") - if self.master.showhost: - opts.append("showhost") - if not self.master.refresh_server_playback: - opts.append("norefresh") - if self.master.killextra: - opts.append("killextra") - if self.master.server.config.no_upstream_cert: - opts.append("no-upstream-cert") - if self.master.state.follow_focus: - opts.append("following") - if self.master.stream_large_bodies: - opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size)) - - if opts: - r.append("[%s]"%(":".join(opts))) - - if self.master.server.config.mode in ["reverse", "upstream"]: - dst = self.master.server.config.mode.dst - scheme = "https" if dst[0] else "http" - if dst[1] != dst[0]: - scheme += "2https" if dst[1] else "http" - r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:])) - if self.master.scripts: - r.append("[") - r.append(("heading_key", "s")) - r.append("cripts:%s]"%len(self.master.scripts)) - # r.append("[lt:%0.3f]"%self.master.looptime) - - if self.master.stream: - r.append("[W:%s]"%self.master.stream_path) - - return r - - def redraw(self): - if self.ab.expire and time.time() > self.ab.expire: - self.message("") - - fc = self.master.state.flow_count() - if self.master.state.focus is None: - offset = 0 - else: - offset = min(self.master.state.focus + 1, fc) - t = [ - ('heading', ("[%s/%s]"%(offset, fc)).ljust(9)) - ] - - if self.master.server.bound: - host = self.master.server.address.host - if host == "0.0.0.0": - host = "*" - boundaddr = "[%s:%s]"%(host, self.master.server.address.port) - else: - boundaddr = "" - t.extend(self.get_status()) - status = urwid.AttrWrap(urwid.Columns([ - urwid.Text(t), - urwid.Text( - [ - self.helptext, - boundaddr - ], - align="right" - ), - ]), "heading") - self.ib.set_w(status) - - def update(self, text): - self.helptext = text - self.redraw() - self.master.drawscreen() - - def selectable(self): - return True - - def get_edit_text(self): - return self.ab.w.get_edit_text() - - def path_prompt(self, prompt, text): - return self.ab.path_prompt(prompt, text) - - def prompt(self, prompt, text = ""): - self.ab.prompt(prompt, text) - - def message(self, msg, expire=None): - if expire: - expire = time.time() + float(expire)/1000 - self.ab.message(msg, expire) - self.master.drawscreen() - - -#end nocover class ConsoleState(flow.State): def __init__(self): @@ -262,13 +28,14 @@ class ConsoleState(flow.State): self.follow_focus = None self.default_body_view = contentview.get("Auto") - self.view_mode = common.VIEW_LIST self.view_flow_mode = common.VIEW_FLOW_REQUEST - self.last_script = "" - self.last_saveload = "" self.flowsettings = weakref.WeakKeyDictionary() + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) + def add_flow_setting(self, flow, key, value): d = self.flowsettings.setdefault(flow, {}) d[key] = value @@ -337,7 +104,6 @@ class ConsoleState(flow.State): super(ConsoleState, self).clear() - class Options(object): attributes = [ "app", @@ -367,6 +133,7 @@ class Options(object): "nopop", "palette", ] + def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) @@ -375,14 +142,11 @@ class Options(object): setattr(self, i, None) -#begin nocover - - class ConsoleMaster(flow.FlowMaster): palette = [] + def __init__(self, server, options): flow.FlowMaster.__init__(self, server, ConsoleState()) - self.looptime = 0 self.stream_path = None self.options = options @@ -423,8 +187,6 @@ class ConsoleMaster(flow.FlowMaster): self.eventlog = options.eventlog self.eventlist = urwid.SimpleListWalker([]) - self.statusbar = None - if options.client_replay: self.client_playback_path(options.client_replay) @@ -439,13 +201,37 @@ class ConsoleMaster(flow.FlowMaster): sys.exit(1) if options.outfile: - err = self.start_stream_to_path(options.outfile[0], options.outfile[1]) + err = self.start_stream_to_path( + options.outfile[0], + options.outfile[1] + ) if err: print >> sys.stderr, "Stream file error:", err sys.exit(1) + self.view_stack = [] + if options.app: self.start_app(self.options.app_host, self.options.app_port) + signals.call_in.connect(self.sig_call_in) + signals.pop_view_state.connect(self.sig_pop_view_state) + signals.push_view_state.connect(self.sig_push_view_state) + + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) + + def sig_call_in(self, sender, seconds, callback, args=()): + def cb(*_): + return callback(*args) + self.loop.set_alarm_in(seconds, cb) + + def sig_pop_view_state(self, sender): + if self.view_stack: + self.loop.widget = self.view_stack.pop() + + def sig_push_view_state(self, sender): + self.view_stack.append(self.loop.widget) def start_stream_to_path(self, path, mode="wb"): path = os.path.expanduser(path) @@ -472,7 +258,9 @@ class ConsoleMaster(flow.FlowMaster): try: s = script.Script(command, self) except script.ScriptError, v: - self.statusbar.message("Error loading script.") + signals.status_message.send( + message = "Error loading script." + ) self.add_event("Error loading script:\n%s"%v.args[0], "error") return @@ -483,22 +271,20 @@ class ConsoleMaster(flow.FlowMaster): if f.error: self._run_script_method("error", s, f) s.unload() - self.refresh_flow(f) - self.state.last_script = command + signals.flow_change.send(self, flow = f) def set_script(self, command): if not command: return ret = self.load_script(command) if ret: - self.statusbar.message(ret) - self.state.last_script = command + signals.status_message.send(message=ret) def toggle_eventlog(self): self.eventlog = not self.eventlog self.view_flowlist() - def _readflow(self, paths): + def _readflows(self, path): """ Utitility function that reads a list of flows or prints an error to the UI if that fails. @@ -507,28 +293,30 @@ class ConsoleMaster(flow.FlowMaster): - a list of flows, otherwise. """ try: - return flow.read_flows_from_paths(paths) + return flow.read_flows_from_paths(path) except flow.FlowReadError as e: - if not self.statusbar: - print >> sys.stderr, e.strerror - sys.exit(1) - else: - self.statusbar.message(e.strerror) - return None + signals.status_message.send(message=e.strerror) def client_playback_path(self, path): - flows = self._readflow(path) + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) if flows: self.start_client_playback(flows, False) def server_playback_path(self, path): - flows = self._readflow(path) + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) if flows: self.start_server_playback( flows, self.killextra, self.rheaders, False, self.nopop, - self.options.replay_ignore_params, self.options.replay_ignore_content, self.options.replay_ignore_payload_params + self.options.replay_ignore_params, + self.options.replay_ignore_content, + self.options.replay_ignore_payload_params, + self.options.replay_ignore_host ) def spawn_editor(self, data): @@ -545,9 +333,11 @@ class ConsoleMaster(flow.FlowMaster): try: subprocess.call(cmd) except: - self.statusbar.message("Can't start editor: %s" % " ".join(c)) + signals.status_message.send( + message = "Can't start editor: %s" % " ".join(c) + ) else: - data = open(name,"rb").read() + data = open(name, "rb").read() self.ui.start() os.unlink(name) return data @@ -584,27 +374,32 @@ class ConsoleMaster(flow.FlowMaster): try: subprocess.call(cmd, shell=shell) except: - self.statusbar.message("Can't start external viewer: %s" % " ".join(c)) + signals.status_message.send( + message="Can't start external viewer: %s" % " ".join(c) + ) self.ui.start() os.unlink(name) def set_palette(self, name): self.palette = palettes.palettes[name] + def ticker(self, *userdata): + changed = self.tick(self.masterq, timeout=0) + if changed: + self.loop.draw_screen() + signals.update_settings.send() + self.loop.set_alarm_in(0.01, self.ticker) + def run(self): self.ui = urwid.raw_display.Screen() self.ui.set_terminal_properties(256) - self.ui.register_palette(self.palette) + self.ui.register_palette(self.palette.palette()) self.flow_list_walker = flowlist.FlowListWalker(self, self.state) - self.view = None - self.statusbar = None - self.header = None - self.body = None self.help_context = None - self.prompting = False - self.onekey = False - - self.view_flowlist() + self.loop = urwid.MainLoop( + urwid.SolidFill("x"), + screen = self.ui, + ) self.server.start_slave( controller.Slave, @@ -624,46 +419,60 @@ class ConsoleMaster(flow.FlowMaster): print >> sys.stderr, "Could not load file:", ret sys.exit(1) + self.loop.set_alarm_in(0.01, self.ticker) + + # It's not clear why we need to handle this explicitly - without this, + # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it + # out. + def exit(s, f): + raise urwid.ExitMainLoop + signal.signal(signal.SIGINT, exit) + + self.loop.set_alarm_in( + 0.0001, + lambda *args: self.view_flowlist() + ) + try: - self.ui.run_wrapper(self.loop) + self.loop.run() except Exception: - self.ui.stop() + self.loop.stop() sys.stdout.flush() print >> sys.stderr, traceback.format_exc() print >> sys.stderr, "mitmproxy has crashed!" - print >> sys.stderr, "Please lodge a bug report at: https://github.com/mitmproxy/mitmproxy" - print >> sys.stderr, "Shutting down..." + print >> sys.stderr, "Please lodge a bug report at:" + print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy" + print >> sys.stderr, "Shutting down..." sys.stderr.flush() self.shutdown() - def make_view(self): - self.view = urwid.Frame( - self.body, - header = self.header, - footer = self.statusbar - ) - self.view.set_focus("body") - def view_help(self): - h = help.HelpView(self, self.help_context, (self.statusbar, self.body, self.header)) - self.statusbar = StatusBar(self, help.footer) - self.body = h - self.header = None - self.make_view() + signals.push_view_state.send(self) + self.loop.widget = window.Window( + self, + help.HelpView(self.help_context), + None, + statusbar.StatusBar(self, help.footer) + ) def view_flowdetails(self, flow): - h = flowdetailview.FlowDetailsView(self, flow, (self.statusbar, self.body, self.header)) - self.statusbar = StatusBar(self, flowdetailview.footer) - self.body = h - self.header = None - self.make_view() + signals.push_view_state.send(self) + self.loop.widget = window.Window( + self, + flowdetailview.FlowDetailsView(low), + None, + statusbar.StatusBar(self, flowdetailview.footer) + ) def view_grideditor(self, ge): - self.body = ge - self.header = None + signals.push_view_state.send(self) self.help_context = ge.make_help() - self.statusbar = StatusBar(self, grideditor.footer) - self.make_view() + self.loop.widget = window.Window( + self, + ge, + None, + statusbar.StatusBar(self, grideditor.FOOTER) + ) def view_flowlist(self): if self.ui.started: @@ -672,27 +481,31 @@ class ConsoleMaster(flow.FlowMaster): self.state.set_focus(self.state.flow_count()) if self.eventlog: - self.body = flowlist.BodyPile(self) + body = flowlist.BodyPile(self) else: - self.body = flowlist.FlowListBox(self) - self.statusbar = StatusBar(self, flowlist.footer) - self.header = None - self.state.view_mode = common.VIEW_LIST + body = flowlist.FlowListBox(self) - self.make_view() self.help_context = flowlist.help_context + self.loop.widget = window.Window( + self, + body, + None, + statusbar.StatusBar(self, flowlist.footer) + ) + self.loop.draw_screen() def view_flow(self, flow): - self.body = flowview.FlowView(self, self.state, flow) - self.header = flowview.FlowViewHeader(self, flow) - self.statusbar = StatusBar(self, flowview.footer) + signals.push_view_state.send(self) self.state.set_focus_flow(flow) - self.state.view_mode = common.VIEW_FLOW - self.make_view() self.help_context = flowview.help_context + self.loop.widget = window.Window( + self, + flowview.FlowView(self, self.state, flow), + flowview.FlowViewHeader(self, flow), + statusbar.StatusBar(self, flowview.footer) + ) def _write_flows(self, path, flows): - self.state.last_saveload = path if not path: return path = os.path.expanduser(path) @@ -703,7 +516,7 @@ class ConsoleMaster(flow.FlowMaster): fw.add(i) f.close() except IOError, v: - self.statusbar.message(v.strerror) + signals.status_message.send(message=v.strerror) def save_one_flow(self, path, flow): return self._write_flows(path, [flow]) @@ -718,7 +531,6 @@ class ConsoleMaster(flow.FlowMaster): return ret or "Flows loaded from %s"%path def load_flows_path(self, path): - self.state.last_saveload = path reterr = None try: flow.FlowMaster.load_flows_file(self, path) @@ -728,55 +540,6 @@ class ConsoleMaster(flow.FlowMaster): self.sync_list_view() return reterr - def path_prompt(self, prompt, text, callback, *args): - self.statusbar.path_prompt(prompt, text) - self.view.set_focus("footer") - self.prompting = (callback, args) - - def prompt(self, prompt, text, callback, *args): - self.statusbar.prompt(prompt, text) - self.view.set_focus("footer") - self.prompting = (callback, args) - - def prompt_edit(self, prompt, text, callback): - self.statusbar.prompt(prompt + ": ", text) - self.view.set_focus("footer") - self.prompting = (callback, []) - - def prompt_onekey(self, prompt, keys, callback, *args): - """ - Keys are a set of (word, key) tuples. The appropriate key in the - word is highlighted. - """ - prompt = [prompt, " ("] - mkup = [] - for i, e in enumerate(keys): - mkup.extend(common.highlight_key(e[0], e[1])) - if i < len(keys)-1: - mkup.append(",") - prompt.extend(mkup) - prompt.append(")? ") - self.onekey = "".join(i[1] for i in keys) - self.prompt(prompt, "", callback, *args) - - def prompt_done(self): - self.prompting = False - self.onekey = False - self.view.set_focus("body") - self.statusbar.message("") - - def prompt_execute(self, txt=None): - if not txt: - txt = self.statusbar.get_edit_text() - p, args = self.prompting - self.prompt_done() - msg = p(txt, *args) - if msg: - self.statusbar.message(msg, 1000) - - def prompt_cancel(self): - self.prompt_done() - def accept_all(self): self.state.accept_all(self) @@ -793,18 +556,6 @@ class ConsoleMaster(flow.FlowMaster): self.state.default_body_view = v self.refresh_focus() - def drawscreen(self): - size = self.ui.get_cols_rows() - canvas = self.view.render(size, focus=1) - self.ui.draw_screen(size, canvas) - return size - - def pop_view(self): - if self.state.view_mode == common.VIEW_FLOW: - self.view_flow(self.state.view[self.state.focus]) - else: - self.view_flowlist() - def edit_scripts(self, scripts): commands = [x[0] for x in scripts] # remove outer array if commands == [s.command for s in self.scripts]: @@ -822,170 +573,6 @@ class ConsoleMaster(flow.FlowMaster): patterns = (x[0] for x in tcp) self.set_tcp_filter(patterns) - def loop(self): - changed = True - try: - while not self.should_exit.is_set(): - startloop = time.time() - if changed: - self.statusbar.redraw() - size = self.drawscreen() - changed = self.tick(self.masterq, timeout=0.1) - self.ui.set_input_timeouts(max_wait=0) - keys = self.ui.get_input() - if keys: - changed = True - for k in keys: - if self.prompting: - if k == "esc": - self.prompt_cancel() - elif self.onekey: - if k == "enter": - self.prompt_cancel() - elif k in self.onekey: - self.prompt_execute(k) - elif k == "enter": - self.prompt_execute() - else: - self.view.keypress(size, k) - else: - k = self.view.keypress(size, k) - if k: - self.statusbar.message("") - if k == "?": - self.view_help() - elif k == "c": - if not self.client_playback: - self.path_prompt( - "Client replay: ", - self.state.last_saveload, - self.client_playback_path - ) - else: - self.prompt_onekey( - "Stop current client replay?", - ( - ("yes", "y"), - ("no", "n"), - ), - self.stop_client_playback_prompt, - ) - elif k == "H": - self.view_grideditor( - grideditor.SetHeadersEditor( - self, - self.setheaders.get_specs(), - self.setheaders.set - ) - ) - elif k == "I": - self.view_grideditor( - grideditor.HostPatternEditor( - self, - [[x] for x in self.get_ignore_filter()], - self.edit_ignore_filter - ) - ) - elif k == "T": - self.view_grideditor( - grideditor.HostPatternEditor( - self, - [[x] for x in self.get_tcp_filter()], - self.edit_tcp_filter - ) - ) - elif k == "i": - self.prompt( - "Intercept filter: ", - self.state.intercept_txt, - self.set_intercept - ) - elif k == "Q": - raise Stop - elif k == "q": - self.prompt_onekey( - "Quit", - ( - ("yes", "y"), - ("no", "n"), - ), - self.quit, - ) - elif k == "M": - self.prompt_onekey( - "Global default display mode", - contentview.view_prompts, - self.change_default_display_mode - ) - elif k == "R": - self.view_grideditor( - grideditor.ReplaceEditor( - self, - self.replacehooks.get_specs(), - self.replacehooks.set - ) - ) - elif k == "s": - self.view_grideditor( - grideditor.ScriptEditor( - self, - [[i.command] for i in self.scripts], - self.edit_scripts - ) - ) - #if self.scripts: - # self.load_script(None) - #else: - # self.path_prompt( - # "Set script: ", - # self.state.last_script, - # self.set_script - # ) - elif k == "S": - if not self.server_playback: - self.path_prompt( - "Server replay path: ", - self.state.last_saveload, - self.server_playback_path - ) - else: - self.prompt_onekey( - "Stop current server replay?", - ( - ("yes", "y"), - ("no", "n"), - ), - self.stop_server_playback_prompt, - ) - elif k == "o": - self.prompt_onekey( - "Options", - ( - ("anticache", "a"), - ("anticomp", "c"), - ("showhost", "h"), - ("killextra", "k"), - ("norefresh", "n"), - ("no-upstream-certs", "u"), - ), - self._change_options - ) - elif k == "t": - self.prompt( - "Sticky cookie filter: ", - self.stickycookie_txt, - self.set_stickycookie - ) - elif k == "u": - self.prompt( - "Sticky auth filter: ", - self.stickyauth_txt, - self.set_stickyauth - ) - self.looptime = time.time() - startloop - except (Stop, KeyboardInterrupt): - pass - def stop_client_playback_prompt(self, a): if a != "n": self.stop_client_playback() @@ -996,7 +583,7 @@ class ConsoleMaster(flow.FlowMaster): def quit(self, a): if a != "n": - raise Stop + raise urwid.ExitMainLoop def _change_options(self, a): if a == "a": @@ -1012,7 +599,9 @@ class ConsoleMaster(flow.FlowMaster): elif a == "n": self.refresh_server_playback = not self.refresh_server_playback elif a == "u": - self.server.config.no_upstream_cert = not self.server.config.no_upstream_cert + self.server.config.no_upstream_cert =\ + not self.server.config.no_upstream_cert + signals.update_settings.send(self) def shutdown(self): self.state.killall(self) @@ -1039,15 +628,10 @@ class ConsoleMaster(flow.FlowMaster): def refresh_focus(self): if self.state.view: - self.refresh_flow(self.state.view[self.state.focus]) - - def refresh_flow(self, c): - if hasattr(self.header, "refresh_flow"): - self.header.refresh_flow(c) - if hasattr(self.body, "refresh_flow"): - self.body.refresh_flow(c) - if hasattr(self.statusbar, "refresh_flow"): - self.statusbar.refresh_flow(c) + signals.flow_change.send( + self, + flow = self.state.view[self.state.focus] + ) def process_flow(self, f): if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay: @@ -1055,7 +639,7 @@ class ConsoleMaster(flow.FlowMaster): else: f.reply() self.sync_list_view() - self.refresh_flow(f) + signals.flow_change.send(self, flow = f) def clear_events(self): self.eventlist[:] = [] diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py index fa21c93e..bc8a2aad 100644 --- a/libmproxy/console/common.py +++ b/libmproxy/console/common.py @@ -1,18 +1,18 @@ from __future__ import absolute_import + import urwid import urwid.util import os + from .. import utils from ..protocol.http import CONTENT_MISSING, decoded +from . import signals try: import pyperclip except: pyperclip = False -VIEW_LIST = 0 -VIEW_FLOW = 1 - VIEW_FLOW_REQUEST = 0 VIEW_FLOW_RESPONSE = 1 @@ -29,6 +29,14 @@ METHOD_OPTIONS = [ ] +def is_keypress(k): + """ + Is this input event a keypress? + """ + if isinstance(k, basestring): + return True + + def highlight_key(s, k): l = [] parts = s.split(k, 1) @@ -41,6 +49,8 @@ def highlight_key(s, k): KEY_MAX = 30 + + def format_keyvals(lst, key="key", val="text", indent=0): """ Format a list of (key, value) tuples. @@ -103,10 +113,8 @@ else: SYMBOL_RETURN = u"<-" - def raw_format_flow(f, focus, extended, padding): f = dict(f) - pile = [] req = [] if extended: @@ -122,7 +130,7 @@ def raw_format_flow(f, focus, extended, padding): req.append(fcol(SYMBOL_REPLAY, "replay")) req.append(fcol(f["req_method"], "method")) - preamble = sum(i[1] for i in req) + len(req) -1 + preamble = sum(i[1] for i in req) + len(req) - 1 if f["intercepted"] and not f["acked"]: uc = "intercept" @@ -182,23 +190,19 @@ def raw_format_flow(f, focus, extended, padding): def save_data(path, data, master, state): if not path: return - state.last_saveload = path path = os.path.expanduser(path) try: with file(path, "wb") as f: f.write(data) except IOError, v: - master.statusbar.message(v.strerror) + signals.status_message.send(message=v.strerror) def ask_save_path(prompt, data, master, state): - master.path_prompt( - prompt, - state.last_saveload, - save_data, - data, - master, - state + signals.status_prompt_path.send( + prompt = prompt, + callback = save_data, + args = (data, master, state) ) @@ -238,28 +242,27 @@ def copy_flow(part, scope, flow, master, state): if not data: if scope == "q": - master.statusbar.message("No request content to copy.") + signals.status_message.send(message="No request content to copy.") elif scope == "s": - master.statusbar.message("No response content to copy.") + signals.status_message.send(message="No response content to copy.") else: - master.statusbar.message("No contents to copy.") + signals.status_message.send(message="No contents to copy.") return try: master.add_event(str(len(data))) pyperclip.copy(data) - except RuntimeError: + except (RuntimeError, UnicodeDecodeError): def save(k): if k == "y": - ask_save_path("Save data: ", data, master, state) - - master.prompt_onekey( - "Cannot copy binary data to clipboard. Save as file?", - ( + ask_save_path("Save data", data, master, state) + signals.status_prompt_onekey.send( + prompt = "Cannot copy binary data to clipboard. Save as file?", + keys = ( ("yes", "y"), ("no", "n"), ), - save + callback = save ) @@ -271,39 +274,35 @@ def ask_copy_part(scope, flow, master, state): if scope != "s": choices.append(("url", "u")) - master.prompt_onekey( - "Copy", - choices, - copy_flow, - scope, - flow, - master, - state + signals.status_prompt_onekey.send( + prompt = "Copy", + keys = choices, + callback = copy_flow, + args = (scope, flow, master, state) ) def ask_save_body(part, master, state, flow): """ - Save either the request or the response body to disk. - part can either be "q" (request), "s" (response) or None (ask user if necessary). + Save either the request or the response body to disk. part can either be + "q" (request), "s" (response) or None (ask user if necessary). """ request_has_content = flow.request and flow.request.content response_has_content = flow.response and flow.response.content if part is None: - # We first need to determine whether we want to save the request or the response content. + # We first need to determine whether we want to save the request or the + # response content. if request_has_content and response_has_content: - master.prompt_onekey( - "Save", - ( + signals.status_prompt_onekey.send( + prompt = "Save", + keys = ( ("request", "q"), ("response", "s"), ), - ask_save_body, - master, - state, - flow + callback = ask_save_body, + args = (master, state, flow) ) elif response_has_content: ask_save_body("s", master, state, flow) @@ -311,18 +310,24 @@ def ask_save_body(part, master, state, flow): ask_save_body("q", master, state, flow) elif part == "q" and request_has_content: - ask_save_path("Save request content: ", flow.request.get_decoded_content(), master, state) + ask_save_path( + "Save request content", + flow.request.get_decoded_content(), + master, + state + ) elif part == "s" and response_has_content: - ask_save_path("Save response content: ", flow.response.get_decoded_content(), master, state) + ask_save_path( + "Save response content", + flow.response.get_decoded_content(), + master, + state + ) else: - master.statusbar.message("No content to save.") + signals.status_message.send(message="No content to save.") -class FlowCache: - @utils.LRUCache(200) - def format_flow(self, *args): - return raw_format_flow(*args) -flowcache = FlowCache() +flowcache = utils.LRUCache(800) def format_flow(f, focus, extended=False, hostheader=False, padding=2): @@ -348,7 +353,6 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2): duration = 0 if f.response.timestamp_end and f.request.timestamp_start: duration = f.response.timestamp_end - f.request.timestamp_start - size = f.response.size() roundtrip = utils.pretty_duration(duration) d.update(dict( @@ -362,26 +366,7 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2): d["resp_ctype"] = t[0].split(";")[0] else: d["resp_ctype"] = "" - return flowcache.format_flow(tuple(sorted(d.items())), focus, extended, padding) - - -def int_version(v): - SIG = 3 - v = urwid.__version__.split("-")[0].split(".") - x = 0 - for i in range(min(SIG, len(v))): - x += int(v[i]) * 10**(SIG-i) - return x - - -# We have to do this to be portable over 0.9.8 and 0.9.9 If compatibility -# becomes a pain to maintain, we'll just mandate 0.9.9 or newer. -class WWrap(urwid.WidgetWrap): - if int_version(urwid.__version__) >= 990: - def set_w(self, x): - self._w = x - def get_w(self): - return self._w - w = property(get_w, set_w) - - + return flowcache.get( + raw_format_flow, + tuple(sorted(d.items())), focus, extended, padding + ) diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py index 84e9946d..12ed5b64 100644 --- a/libmproxy/console/contentview.py +++ b/libmproxy/console/contentview.py @@ -1,14 +1,23 @@ from __future__ import absolute_import -import logging, subprocess, re, cStringIO, traceback, json, urwid +import cStringIO +import json +import logging +import lxml.html +import lxml.etree from PIL import Image from PIL.ExifTags import TAGS +import re +import subprocess +import traceback +import urwid -import lxml.html, lxml.etree import netlib.utils + from . import common from .. import utils, encoding, flow from ..contrib import jsbeautifier, html2text from ..contrib.wbxml.ASCommandResponse import ASCommandResponse + try: import pyamf from pyamf import remoting, flex @@ -62,6 +71,7 @@ class ViewAuto: name = "Auto" prompt = ("auto", "a") content_types = [] + def __call__(self, hdrs, content, limit): ctype = hdrs.get_first("content-type") if ctype: @@ -78,6 +88,7 @@ class ViewRaw: name = "Raw" prompt = ("raw", "r") content_types = [] + def __call__(self, hdrs, content, limit): txt = _view_text(content[:limit], len(content), limit) return "Raw", txt @@ -87,6 +98,7 @@ class ViewHex: name = "Hex" prompt = ("hex", "e") content_types = [] + def __call__(self, hdrs, content, limit): txt = [] for offset, hexa, s in netlib.utils.hexdump(content[:limit]): @@ -105,8 +117,14 @@ class ViewXML: name = "XML" prompt = ("xml", "x") content_types = ["text/xml"] + def __call__(self, hdrs, content, limit): - parser = lxml.etree.XMLParser(remove_blank_text=True, resolve_entities=False, strip_cdata=False, recover=False) + parser = lxml.etree.XMLParser( + remove_blank_text=True, + resolve_entities=False, + strip_cdata=False, + recover=False + ) try: document = lxml.etree.fromstring(content, parser) except lxml.etree.XMLSyntaxError: @@ -121,18 +139,18 @@ class ViewXML: lxml.etree.tostring(p) ) p = p.getprevious() - doctype=docinfo.doctype + doctype = docinfo.doctype if prev: doctype += "\n".join(prev).strip() doctype = doctype.strip() s = lxml.etree.tostring( - document, - pretty_print=True, - xml_declaration=True, - doctype=doctype or None, - encoding = docinfo.encoding - ) + document, + pretty_print=True, + xml_declaration=True, + doctype=doctype or None, + encoding = docinfo.encoding + ) txt = [] for i in s[:limit].strip().split("\n"): @@ -147,6 +165,7 @@ class ViewJSON: name = "JSON" prompt = ("json", "s") content_types = ["application/json"] + def __call__(self, hdrs, content, limit): lines = utils.pretty_json(content) if lines: @@ -167,12 +186,20 @@ class ViewHTML: name = "HTML" prompt = ("html", "h") content_types = ["text/html"] + def __call__(self, hdrs, content, limit): if utils.isXML(content): - parser = lxml.etree.HTMLParser(strip_cdata=True, remove_blank_text=True) + parser = lxml.etree.HTMLParser( + strip_cdata=True, + remove_blank_text=True + ) d = lxml.html.fromstring(content, parser=parser) docinfo = d.getroottree().docinfo - s = lxml.etree.tostring(d, pretty_print=True, doctype=docinfo.doctype) + s = lxml.etree.tostring( + d, + pretty_print=True, + doctype=docinfo.doctype + ) return "HTML", _view_text(s[:limit], len(s), limit) @@ -180,6 +207,7 @@ class ViewHTMLOutline: name = "HTML Outline" prompt = ("html outline", "o") content_types = ["text/html"] + def __call__(self, hdrs, content, limit): content = content.decode("utf-8") h = html2text.HTML2Text(baseurl="") @@ -194,14 +222,15 @@ class ViewURLEncoded: name = "URL-encoded" prompt = ("urlencoded", "u") content_types = ["application/x-www-form-urlencoded"] + def __call__(self, hdrs, content, limit): lines = utils.urldecode(content) if lines: body = common.format_keyvals( - [(k+":", v) for (k, v) in lines], - key = "header", - val = "text" - ) + [(k+":", v) for (k, v) in lines], + key = "header", + val = "text" + ) return "URLEncoded form", body @@ -209,6 +238,7 @@ class ViewMultipart: name = "Multipart Form" prompt = ("multipart", "m") content_types = ["multipart/form-data"] + def __call__(self, hdrs, content, limit): v = utils.multipartdecode(hdrs, content) if v: @@ -302,12 +332,14 @@ class ViewJavaScript: "application/javascript", "text/javascript" ] + def __call__(self, hdrs, content, limit): opts = jsbeautifier.default_options() opts.indent_size = 2 res = jsbeautifier.beautify(content[:limit], opts) return "JavaScript", _view_text(res, len(res), limit) + class ViewCSS: name = "CSS" prompt = ("css", "c") @@ -335,6 +367,7 @@ class ViewImage: "image/vnd.microsoft.icon", "image/x-icon", ] + def __call__(self, hdrs, content, limit): try: img = Image.open(cStringIO.StringIO(content)) @@ -360,14 +393,17 @@ class ViewImage: ) clean = [] for i in parts: - clean.append([netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])]) - fmt = common.format_keyvals( - clean, - key = "header", - val = "text" + clean.append( + [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] ) + fmt = common.format_keyvals( + clean, + key = "header", + val = "text" + ) return "%s image"%img.format, fmt + class ViewProtobuf: """Human friendly view of protocol buffers The view uses the protoc compiler to decode the binary @@ -383,7 +419,10 @@ class ViewProtobuf: @staticmethod def is_available(): try: - p = subprocess.Popen(["protoc", "--version"], stdout=subprocess.PIPE) + p = subprocess.Popen( + ["protoc", "--version"], + stdout=subprocess.PIPE + ) out, _ = p.communicate() return out.startswith("libprotoc") except: @@ -407,6 +446,7 @@ class ViewProtobuf: txt = _view_text(decoded[:limit], len(decoded), limit) return "Protobuf", txt + class ViewWBXML: name = "WBXML" prompt = ("wbxml", "w") @@ -416,14 +456,14 @@ class ViewWBXML: ] def __call__(self, hdrs, content, limit): - + try: parser = ASCommandResponse(content) parsedContent = parser.xmlString txt = _view_text(parsedContent, len(parsedContent), limit) return "WBXML", txt except: - return None + return None views = [ ViewAuto(), @@ -492,7 +532,7 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request): # Third-party viewers can fail in unexpected ways... except Exception: s = traceback.format_exc() - s = "Content viewer failed: \n" + s + s = "Content viewer failed: \n" + s logfunc(s, "error") ret = None if not ret: diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py index 4164c416..8bfdae4a 100644 --- a/libmproxy/console/flowdetailview.py +++ b/libmproxy/console/flowdetailview.py @@ -1,6 +1,6 @@ from __future__ import absolute_import import urwid -from . import common +from . import common, signals from .. import utils footer = [ @@ -8,8 +8,8 @@ footer = [ ] class FlowDetailsView(urwid.ListBox): - def __init__(self, master, flow, state): - self.master, self.flow, self.state = master, flow, state + def __init__(self, flow): + self.flow = flow urwid.ListBox.__init__( self, self.flowtext() @@ -18,10 +18,7 @@ class FlowDetailsView(urwid.ListBox): def keypress(self, size, key): key = common.shortcuts(key) if key == "q": - self.master.statusbar = self.state[0] - self.master.body = self.state[1] - self.master.header = self.state[2] - self.master.make_view() + signals.pop_view_state.send(self) return None elif key == "?": key = None diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py index 9e7c6d69..946bd97b 100644 --- a/libmproxy/console/flowlist.py +++ b/libmproxy/console/flowlist.py @@ -1,7 +1,8 @@ from __future__ import absolute_import import urwid from netlib import http -from . import common +from . import common, signals + def _mkhelp(): text = [] @@ -35,6 +36,7 @@ footer = [ ('heading_key', "?"), ":help ", ] + class EventListBox(urwid.ListBox): def __init__(self, master): self.master = master @@ -60,7 +62,10 @@ class BodyPile(urwid.Pile): self, [ FlowListBox(master), - urwid.Frame(EventListBox(master), header = self.inactive_header) + urwid.Frame( + EventListBox(master), + header = self.inactive_header + ) ] ) self.master = master @@ -80,39 +85,41 @@ class BodyPile(urwid.Pile): # This is essentially a copypasta from urwid.Pile's keypress handler. # So much for "closed for modification, but open for extension". item_rows = None - if len(size)==2: - item_rows = self.get_item_rows( size, focus=True ) + if len(size) == 2: + item_rows = self.get_item_rows(size, focus = True) i = self.widget_list.index(self.focus_item) - tsize = self.get_item_size(size,i,True,item_rows) - return self.focus_item.keypress( tsize, key ) + tsize = self.get_item_size(size, i, True, item_rows) + return self.focus_item.keypress(tsize, key) -class ConnectionItem(common.WWrap): +class ConnectionItem(urwid.WidgetWrap): def __init__(self, master, state, flow, focus): self.master, self.state, self.flow = master, state, flow self.f = focus w = self.get_text() - common.WWrap.__init__(self, w) + urwid.WidgetWrap.__init__(self, w) def get_text(self): - return common.format_flow(self.flow, self.f, hostheader=self.master.showhost) + return common.format_flow( + self.flow, + self.f, + hostheader = self.master.showhost + ) def selectable(self): return True def save_flows_prompt(self, k): if k == "a": - self.master.path_prompt( - "Save all flows to: ", - self.state.last_saveload, - self.master.save_flows + signals.status_prompt_path.send( + prompt = "Save all flows to", + callback = self.master.save_flows ) else: - self.master.path_prompt( - "Save this flow to: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow + signals.status_prompt_path.send( + prompt = "Save this flow to", + callback = self.master.save_one_flow, + args = (self.flow,) ) def stop_server_playback_prompt(self, a): @@ -125,22 +132,25 @@ class ConnectionItem(common.WWrap): [i.copy() for i in self.master.state.view], self.master.killextra, self.master.rheaders, False, self.master.nopop, - self.master.options.replay_ignore_params, self.master.options.replay_ignore_content, - self.master.options.replay_ignore_payload_params + self.master.options.replay_ignore_params, + self.master.options.replay_ignore_content, + self.master.options.replay_ignore_payload_params, + self.master.options.replay_ignore_host ) elif k == "t": self.master.start_server_playback( [self.flow.copy()], self.master.killextra, self.master.rheaders, False, self.master.nopop, - self.master.options.replay_ignore_params, self.master.options.replay_ignore_content, - self.master.options.replay_ignore_payload_params + self.master.options.replay_ignore_params, + self.master.options.replay_ignore_content, + self.master.options.replay_ignore_payload_params, + self.master.options.replay_ignore_host ) else: - self.master.path_prompt( - "Server replay path: ", - self.state.last_saveload, - self.master.server_playback_path + signals.status_prompt_path.send( + prompt = "Server replay path", + callback = self.master.server_playback_path ) def keypress(self, (maxcol,), key): @@ -158,43 +168,44 @@ class ConnectionItem(common.WWrap): elif key == "r": r = self.master.replay_request(self.flow) if r: - self.master.statusbar.message(r) + signals.status_message.send(message=r) self.master.sync_list_view() elif key == "S": if not self.master.server_playback: - self.master.prompt_onekey( - "Server Replay", - ( + signals.status_prompt_onekey.send( + prompt = "Server Replay", + keys = ( ("all flows", "a"), ("this flow", "t"), ("file", "f"), ), - self.server_replay_prompt, + callback = self.server_replay_prompt, ) else: - self.master.prompt_onekey( - "Stop current server replay?", - ( + signals.status_prompt_onekey.send( + prompt = "Stop current server replay?", + keys = ( ("yes", "y"), ("no", "n"), ), - self.stop_server_playback_prompt, + callback = self.stop_server_playback_prompt, ) elif key == "V": if not self.flow.modified(): - self.master.statusbar.message("Flow not modified.") + signals.status_message.send(message="Flow not modified.") return self.state.revert(self.flow) self.master.sync_list_view() - self.master.statusbar.message("Reverted.") + signals.status_message.send(message="Reverted.") elif key == "w": - self.master.prompt_onekey( - "Save", - ( + signals.status_prompt_onekey.send( + self, + prompt = "Save", + keys = ( ("all flows", "a"), ("this flow", "t"), ), - self.save_flows_prompt, + callback = self.save_flows_prompt, ) elif key == "X": self.flow.kill(self.master) @@ -202,11 +213,10 @@ class ConnectionItem(common.WWrap): if self.flow.request: self.master.view_flow(self.flow) elif key == "|": - self.master.path_prompt( - "Send flow to script: ", - self.state.last_script, - self.master.run_script_once, - self.flow + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) ) elif key == "g": common.ask_copy_part("a", self.flow, self.master, self.state) @@ -249,11 +259,16 @@ class FlowListBox(urwid.ListBox): def get_method_raw(self, k): if k: - self.get_url(k) + self.get_url(k) def get_method(self, k): if k == "e": - self.master.prompt("Method:", "", self.get_method_raw) + signals.status_prompt.send( + self, + prompt = "Method", + text = "", + callback = self.get_method_raw + ) else: method = "" for i in common.METHOD_OPTIONS: @@ -261,13 +276,18 @@ class FlowListBox(urwid.ListBox): method = i[0].upper() self.get_url(method) - def get_url(self,method): - self.master.prompt("URL:", "http://www.example.com/", self.new_request, method) + def get_url(self, method): + signals.status_prompt.send( + prompt = "URL", + text = "http://www.example.com/", + callback = self.new_request, + args = (method,) + ) def new_request(self, url, method): parts = http.parse_url(str(url)) if not parts: - self.master.statusbar.message("Invalid Url") + signals.status_message.send(message="Invalid Url") return scheme, host, port, path = parts f = self.master.create_request(method, scheme, host, port, path) @@ -283,25 +303,33 @@ class FlowListBox(urwid.ListBox): elif key == "e": self.master.toggle_eventlog() elif key == "l": - self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit) + signals.status_prompt.send( + prompt = "Limit", + text = self.master.state.limit_txt, + callback = self.master.set_limit + ) elif key == "L": - self.master.path_prompt( - "Load flows: ", - self.master.state.last_saveload, - self.master.load_flows_callback + signals.status_prompt_path.send( + self, + prompt = "Load flows", + callback = self.master.load_flows_callback ) elif key == "n": - self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.get_method) + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.get_method + ) elif key == "F": self.master.toggle_follow_flows() elif key == "W": if self.master.stream: self.master.stop_stream() else: - self.master.path_prompt( - "Stream flows to: ", - self.master.state.last_saveload, - self.master.start_stream_to_path + signals.status_prompt_path.send( + self, + prompt = "Stream flows to", + callback = self.master.start_stream_to_path ) else: return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 5c91512c..1aebb0f0 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import os, sys, copy import urwid -from . import common, grideditor, contentview +from . import common, grideditor, contentview, signals from .. import utils, flow, controller from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded @@ -19,7 +19,7 @@ def _mkhelp(): ("D", "duplicate flow"), ("e", "edit request/response"), ("f", "load full body data"), - ("g", "copy response(content/headers) to clipboard"), + ("g", "copy response(content/headers) to clipboard"), ("m", "change body display mode for this entity"), (None, common.highlight_key("automatic", "a") + @@ -84,32 +84,33 @@ footer = [ ] -class FlowViewHeader(common.WWrap): +class FlowViewHeader(urwid.WidgetWrap): def __init__(self, master, f): self.master, self.flow = master, f - self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost) - - def refresh_flow(self, f): - if f == self.flow: - self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost) - - -class CallbackCache: - @utils.LRUCache(200) - def _callback(self, method, *args, **kwargs): - return getattr(self.obj, method)(*args, **kwargs) + self._w = common.format_flow( + f, + False, + extended=True, + padding=0, + hostheader=self.master.showhost + ) + signals.flow_change.connect(self.sig_flow_change) + + def sig_flow_change(self, sender, flow): + if flow == self.flow: + self._w = common.format_flow( + flow, + False, + extended=True, + padding=0, + hostheader=self.master.showhost + ) - def callback(self, obj, method, *args, **kwargs): - # obj varies! - self.obj = obj - return self._callback(method, *args, **kwargs) -cache = CallbackCache() +cache = utils.LRUCache(200) -class FlowView(common.WWrap): - REQ = 0 - RESP = 1 +class FlowView(urwid.WidgetWrap): highlight_color = "focusfield" def __init__(self, master, state, flow): @@ -119,37 +120,39 @@ class FlowView(common.WWrap): self.view_response() else: self.view_request() + signals.flow_change.connect(self.sig_flow_change) - def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request): - return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request) + def sig_flow_change(self, sender, flow): + if flow == self.flow: + if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: + self.view_response() + else: + self.view_request() def content_view(self, viewmode, conn): - full = self.state.get_flow_setting( - self.flow, - (self.state.view_flow_mode, "fullcontents"), - False - ) - if full: - limit = sys.maxint + if conn.content == CONTENT_MISSING: + msg, body = "", [urwid.Text([("error", "[content missing]")])] + return (msg, body) else: - limit = contentview.VIEW_CUTOFF - description, text_objects = cache.callback( - self, "_cached_content_view", - viewmode, - tuple(tuple(i) for i in conn.headers.lst), - conn.content, - limit, - isinstance(conn, HTTPRequest) - ) - return (description, text_objects) - - def cont_view_handle_missing(self, conn, viewmode): - if conn.content == CONTENT_MISSING: - msg, body = "", [urwid.Text([("error", "[content missing]")])] + full = self.state.get_flow_setting( + self.flow, + (self.state.view_flow_mode, "fullcontents"), + False + ) + if full: + limit = sys.maxint else: - msg, body = self.content_view(viewmode, conn) - - return (msg, body) + limit = contentview.VIEW_CUTOFF + description, text_objects = cache.get( + contentview.get_content_view, + viewmode, + tuple(tuple(i) for i in conn.headers.lst), + conn.content, + limit, + self.master.add_event, + isinstance(conn, HTTPRequest) + ) + return (description, text_objects) def viewmode_get(self, override): return self.state.default_body_view if override is None else override @@ -170,7 +173,7 @@ class FlowView(common.WWrap): ) override = self.override_get() viewmode = self.viewmode_get(override) - msg, body = self.cont_view_handle_missing(conn, viewmode) + msg, body = self.content_view(viewmode, conn) return headers, msg, body def conn_text_merge(self, headers, msg, body): @@ -207,7 +210,8 @@ class FlowView(common.WWrap): def conn_text(self, conn): """ - Same as conn_text_raw, but returns result wrapped in a listbox ready for usage. + Same as conn_text_raw, but returns result wrapped in a listbox ready for + usage. """ headers, msg, body = self.conn_text_raw(conn) merged = self.conn_text_merge(headers, msg, body) @@ -278,14 +282,16 @@ class FlowView(common.WWrap): """ runs the previous search again, forwards or backwards. """ - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") + last_search_string = self.state.get_flow_setting( + self.flow, "last_search_string" + ) if last_search_string: message = self.search(last_search_string, backwards) if message: - self.master.statusbar.message(message) + signals.status_message.send(message=message) else: message = "no previous searches have been made" - self.master.statusbar.message(message) + signals.status_message.send(message=message) return message @@ -319,7 +325,11 @@ class FlowView(common.WWrap): # generate the body, highlight the words and get focus headers, msg, body = self.conn_text_raw(text) try: - body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards) + body, focus_position = self.search_highlight_text( + body, + search_string, + backwards=backwards + ) except SearchError: return "Search not supported in this view." @@ -331,12 +341,16 @@ class FlowView(common.WWrap): merged = self.conn_text_merge(headers, msg, body) list_box = urwid.ListBox(merged) list_box.set_focus(focus_position + 2) - self.w = self.wrap_body(const, list_box) - self.master.statusbar.redraw() + self._w = self.wrap_body(const, list_box) + signals.update_settings.send(self) self.last_displayed_body = list_box - wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards) + wrapped, wrapped_message = self.search_wrapped_around( + last_find_line, + last_search_index, + backwards + ) if wrapped: return wrapped_message @@ -344,9 +358,15 @@ class FlowView(common.WWrap): def search_get_start(self, search_string): start_line = 0 start_index = 0 - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") + last_search_string = self.state.get_flow_setting( + self.flow, + "last_search_string" + ) if search_string == last_search_string: - start_line = self.state.get_flow_setting(self.flow, "last_find_line") + start_line = self.state.get_flow_setting( + self.flow, + "last_find_line" + ) start_index = self.state.get_flow_setting(self.flow, "last_search_index") @@ -391,7 +411,10 @@ class FlowView(common.WWrap): found = False text_objects = copy.deepcopy(text_objects) - loop_range = self.search_get_range(len(text_objects), start_line, backwards) + loop_range = self.search_get_range( + len(text_objects), + start_line, backwards + ) for i in loop_range: text_object = text_objects[i] @@ -403,10 +426,19 @@ class FlowView(common.WWrap): if i != start_line: start_index = 0 - find_index = self.search_find(text, search_string, start_index, backwards) + find_index = self.search_find( + text, + search_string, + start_index, + backwards + ) if find_index != -1: - new_text = self.search_highlight_object(text, find_index, search_string) + new_text = self.search_highlight_object( + text, + find_index, + search_string + ) text_objects[i] = new_text found = True @@ -424,14 +456,26 @@ class FlowView(common.WWrap): focus_pos = None else: if not backwards: - self.state.add_flow_setting(self.flow, "last_search_index", 0) - self.state.add_flow_setting(self.flow, "last_find_line", 0) + self.state.add_flow_setting( + self.flow, "last_search_index", 0 + ) + self.state.add_flow_setting( + self.flow, "last_find_line", 0 + ) else: - self.state.add_flow_setting(self.flow, "last_search_index", None) - self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1) + self.state.add_flow_setting( + self.flow, "last_search_index", None + ) + self.state.add_flow_setting( + self.flow, "last_find_line", len(text_objects) - 1 + ) - text_objects, focus_pos = self.search_highlight_text(text_objects, - search_string, looping=True, backwards=backwards) + text_objects, focus_pos = self.search_highlight_text( + text_objects, + search_string, + looping=True, + backwards=backwards + ) return text_objects, focus_pos @@ -455,8 +499,7 @@ class FlowView(common.WWrap): def view_request(self): self.state.view_flow_mode = common.VIEW_FLOW_REQUEST body = self.conn_text(self.flow.request) - self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) - self.master.statusbar.redraw() + self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) def view_response(self): self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE @@ -475,29 +518,25 @@ class FlowView(common.WWrap): ) ] ) - self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) - self.master.statusbar.redraw() - - def refresh_flow(self, c=None): - if c == self.flow: - if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: - self.view_response() - else: - self.view_request() + self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) def set_method_raw(self, m): if m: self.flow.request.method = m - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def edit_method(self, m): if m == "e": - self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw) + signals.status_prompt.send( + prompt = "Method", + text = self.flow.request.method, + callback = self.set_method_raw + ) else: for i in common.METHOD_OPTIONS: if i[1] == m: self.flow.request.method = i[0].upper() - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_url(self, url): request = self.flow.request @@ -505,7 +544,7 @@ class FlowView(common.WWrap): request.url = str(url) except ValueError: return "Invalid URL." - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_resp_code(self, code): response = self.flow.response @@ -516,28 +555,37 @@ class FlowView(common.WWrap): import BaseHTTPServer if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_resp_msg(self, msg): response = self.flow.response response.msg = msg - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def set_headers(self, lst, conn): conn.headers = flow.ODictCaseless(lst) + signals.flow_change.send(self, flow = self.flow) def set_query(self, lst, conn): conn.set_query(flow.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) def set_path_components(self, lst, conn): conn.set_path_components([i[0] for i in lst]) + signals.flow_change.send(self, flow = self.flow) def set_form(self, lst, conn): conn.set_form_urlencoded(flow.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) def edit_form(self, conn): self.master.view_grideditor( - grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn) + grideditor.URLEncodedFormEditor( + self.master, + conn.get_form_urlencoded().lst, + self.set_form, + conn + ) ) def edit_form_confirm(self, key, conn): @@ -559,42 +607,80 @@ class FlowView(common.WWrap): self.flow.backup() if part == "r": with decoded(message): - # Fix an issue caused by some editors when editing a request/response body. - # Many editors make it hard to save a file without a terminating newline on the last - # line. When editing message bodies, this can cause problems. For now, I just - # strip the newlines off the end of the body when we return from an editor. + # Fix an issue caused by some editors when editing a + # request/response body. Many editors make it hard to save a + # file without a terminating newline on the last line. When + # editing message bodies, this can cause problems. For now, I + # just strip the newlines off the end of the body when we return + # from an editor. c = self.master.spawn_editor(message.content or "") message.content = c.rstrip("\n") elif part == "f": if not message.get_form_urlencoded() and message.content: - self.master.prompt_onekey( - "Existing body is not a URL-encoded form. Clear and edit?", - [ + signals.status_prompt_onekey.send( + prompt = "Existing body is not a URL-encoded form. Clear and edit?", + keys = [ ("yes", "y"), ("no", "n"), ], - self.edit_form_confirm, - message + callback = self.edit_form_confirm, + args = (message,) ) else: self.edit_form(message) elif part == "h": - self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message)) + self.master.view_grideditor( + grideditor.HeaderEditor( + self.master, + message.headers.lst, + self.set_headers, + message + ) + ) elif part == "p": p = message.get_path_components() p = [[i] for i in p] - self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message)) + self.master.view_grideditor( + grideditor.PathEditor( + self.master, + p, + self.set_path_components, + message + ) + ) elif part == "q": - self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message)) + self.master.view_grideditor( + grideditor.QueryEditor( + self.master, + message.get_query().lst, + self.set_query, message + ) + ) elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_edit("URL", message.url, self.set_url) + signals.status_prompt.send( + prompt = "URL", + text = message.url, + callback = self.set_url + ) elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method) + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.edit_method + ) elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Code", str(message.code), self.set_resp_code) + signals.status_prompt.send( + prompt = "Code", + text = str(message.code), + callback = self.set_resp_code + ) elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Message", message.msg, self.set_resp_msg) - self.master.refresh_flow(self.flow) + signals.status_prompt.send( + prompt = "Message", + text = message.msg, + callback = self.set_resp_msg + ) + signals.flow_change.send(self, flow = self.flow) def _view_nextprev_flow(self, np, flow): try: @@ -606,9 +692,10 @@ class FlowView(common.WWrap): else: new_flow, new_idx = self.state.get_prev(idx) if new_flow is None: - self.master.statusbar.message("No more flows!") - return - self.master.view_flow(new_flow) + signals.status_message.send(message="No more flows!") + else: + signals.pop_view_state.send(self) + self.master.view_flow(new_flow) def view_next_flow(self, flow): return self._view_nextprev_flow("next", flow) @@ -622,7 +709,7 @@ class FlowView(common.WWrap): (self.state.view_flow_mode, "prettyview"), contentview.get_by_shortcut(t) ) - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def delete_body(self, t): if t == "m": @@ -633,7 +720,7 @@ class FlowView(common.WWrap): self.flow.request.content = val else: self.flow.response.content = val - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) def keypress(self, size, key): if key == " ": @@ -647,8 +734,8 @@ class FlowView(common.WWrap): conn = self.flow.response if key == "q": - self.master.view_flowlist() - key = None + signals.pop_view_state.send(self) + return None elif key == "tab": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: self.view_response() @@ -656,7 +743,7 @@ class FlowView(common.WWrap): self.view_request() elif key in ("up", "down", "page up", "page down"): # Why doesn't this just work?? - self.w.keypress(size, key) + self._w.keypress(size, key) elif key == "a": self.flow.accept_intercept(self.master) self.master.view_flow(self.flow) @@ -681,12 +768,12 @@ class FlowView(common.WWrap): elif key == "D": f = self.master.duplicate_flow(self.flow) self.master.view_flow(f) - self.master.statusbar.message("Duplicated.") + signals.status_message.send(message="Duplicated.") elif key == "e": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey( - "Edit request", - ( + signals.status_prompt_onekey.send( + prompt = "Edit request", + keys = ( ("query", "q"), ("path", "p"), ("url", "u"), @@ -695,29 +782,29 @@ class FlowView(common.WWrap): ("raw body", "r"), ("method", "m"), ), - self.edit + callback = self.edit ) else: - self.master.prompt_onekey( - "Edit response", - ( + signals.status_prompt_onekey.send( + prompt = "Edit response", + keys = ( ("code", "c"), ("message", "m"), ("header", "h"), ("raw body", "r"), ), - self.edit + callback = self.edit ) key = None elif key == "f": - self.master.statusbar.message("Loading all body data...") + signals.status_message.send(message="Loading all body data...") self.state.add_flow_setting( self.flow, (self.state.view_flow_mode, "fullcontents"), True ) - self.master.refresh_flow(self.flow) - self.master.statusbar.message("") + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="") elif key == "g": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: scope = "q" @@ -727,10 +814,11 @@ class FlowView(common.WWrap): elif key == "m": p = list(contentview.view_prompts) p.insert(0, ("Clear", "C")) - self.master.prompt_onekey( - "Display mode", - p, - self.change_this_display_mode + signals.status_prompt_onekey.send( + self, + prompt = "Display mode", + keys = p, + callback = self.change_this_display_mode ) key = None elif key == "p": @@ -738,21 +826,20 @@ class FlowView(common.WWrap): elif key == "r": r = self.master.replay_request(self.flow) if r: - self.master.statusbar.message(r) - self.master.refresh_flow(self.flow) + signals.status_message.send(message=r) + signals.flow_change.send(self, flow = self.flow) elif key == "V": if not self.flow.modified(): - self.master.statusbar.message("Flow not modified.") + signals.status_message.send(message="Flow not modified.") return self.state.revert(self.flow) - self.master.refresh_flow(self.flow) - self.master.statusbar.message("Reverted.") + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="Reverted.") elif key == "W": - self.master.path_prompt( - "Save this flow: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow + signals.status_prompt_path.send( + prompt = "Save this flow", + callback = self.master.save_one_flow, + args = (self.flow,) ) elif key == "v": if conn and conn.content: @@ -761,20 +848,23 @@ class FlowView(common.WWrap): if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"): self.master.spawn_external_viewer(conn.content, t) else: - self.master.statusbar.message("Error! Set $EDITOR or $PAGER.") + signals.status_message.send( + message = "Error! Set $EDITOR or $PAGER." + ) elif key == "|": - self.master.path_prompt( - "Send flow to script: ", self.state.last_script, - self.master.run_script_once, self.flow + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) ) elif key == "x": - self.master.prompt_onekey( - "Delete body", - ( + signals.status_prompt_onekey.send( + prompt = "Delete body", + keys = ( ("completely", "c"), ("mark as missing", "m"), ), - self.delete_body + callback = self.delete_body ) key = None elif key == "X": @@ -785,24 +875,31 @@ class FlowView(common.WWrap): e = conn.headers.get_first("content-encoding", "identity") if e != "identity": if not conn.decode(): - self.master.statusbar.message("Could not decode - invalid data?") + signals.status_message.send( + message = "Could not decode - invalid data?" + ) else: - self.master.prompt_onekey( - "Select encoding: ", - ( + signals.status_prompt_onekey.send( + prompt = "Select encoding: ", + keys = ( ("gzip", "z"), ("deflate", "d"), ), - self.encode_callback, - conn + callback = self.encode_callback, + args = (conn,) ) - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) elif key == "/": - last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") - search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: " - self.master.prompt(search_prompt, - None, - self.search) + last_search_string = self.state.get_flow_setting( + self.flow, + "last_search_string" + ) + search_prompt = "Search body ["+last_search_string+"]" if last_search_string else "Search body" + signals.status_prompt.send( + prompt = search_prompt, + text = "", + callback = self.search + ) elif key == "n": self.search_again(backwards=False) elif key == "N": @@ -816,4 +913,4 @@ class FlowView(common.WWrap): "d": "deflate", } conn.encode(encoding_map[key]) - self.master.refresh_flow(self.flow) + signals.flow_change.send(self, flow = self.flow) diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py index 438d0ad7..4bcc0171 100644 --- a/libmproxy/console/grideditor.py +++ b/libmproxy/console/grideditor.py @@ -1,21 +1,25 @@ from __future__ import absolute_import -import copy, re, os + +import copy +import re +import os import urwid -from . import common + +from . import common, signals from .. import utils, filt, script from netlib import http_uastrings -footer = [ +FOOTER = [ ('heading_key', "enter"), ":edit ", ('heading_key', "q"), ":back ", ] -footer_editing = [ +FOOTER_EDITING = [ ('heading_key', "esc"), ":stop editing ", ] -class SText(common.WWrap): +class SText(urwid.WidgetWrap): def __init__(self, txt, focused, error): txt = txt.encode("string-escape") w = urwid.Text(txt, wrap="any") @@ -26,10 +30,10 @@ class SText(common.WWrap): w = urwid.AttrWrap(w, "focusfield") elif error: w = urwid.AttrWrap(w, "field_error") - common.WWrap.__init__(self, w) + urwid.WidgetWrap.__init__(self, w) def get_text(self): - return self.w.get_text()[0] + return self._w.get_text()[0] def keypress(self, size, key): return key @@ -38,21 +42,21 @@ class SText(common.WWrap): return True -class SEdit(common.WWrap): +class SEdit(urwid.WidgetWrap): def __init__(self, txt): txt = txt.encode("string-escape") w = urwid.Edit(edit_text=txt, wrap="any", multiline=True) w = urwid.AttrWrap(w, "editfield") - common.WWrap.__init__(self, w) + urwid.WidgetWrap.__init__(self, w) def get_text(self): - return self.w.get_text()[0] + return self._w.get_text()[0] def selectable(self): return True -class GridRow(common.WWrap): +class GridRow(urwid.WidgetWrap): def __init__(self, focused, editing, editor, values): self.focused, self.editing, self.editor = focused, editing, editor @@ -76,14 +80,14 @@ class GridRow(common.WWrap): ) if focused is not None: w.set_focus_column(focused) - common.WWrap.__init__(self, w) + urwid.WidgetWrap.__init__(self, w) def get_edit_value(self): return self.editing.get_text() def keypress(self, s, k): if self.editing: - w = self.w.column_widths(s)[self.focused] + w = self._w.column_widths(s)[self.focused] k = self.editing.keypress((w,), k) return k @@ -121,12 +125,14 @@ class GridWalker(urwid.ListWalker): try: val = val.decode("string-escape") except ValueError: - self.editor.master.statusbar.message("Invalid Python-style string encoding.", 1000) + signals.status_message.send( + self, message = "Invalid Python-style string encoding.", expure = 1000 + ) return errors = self.lst[self.focus][1] emsg = self.editor.is_error(self.focus_col, val) if emsg: - self.editor.master.statusbar.message(emsg, 1000) + signals.status_message.send(message = emsg, expire = 1) errors.add(self.focus_col) else: errors.discard(self.focus_col) @@ -155,13 +161,15 @@ class GridWalker(urwid.ListWalker): def start_edit(self): if self.lst: - self.editing = GridRow(self.focus_col, True, self.editor, self.lst[self.focus]) - self.editor.master.statusbar.update(footer_editing) + self.editing = GridRow( + self.focus_col, True, self.editor, self.lst[self.focus] + ) + self.editor.master.loop.widget.footer.update(FOOTER_EDITING) self._modified() def stop_edit(self): if self.editing: - self.editor.master.statusbar.update(footer) + self.editor.master.loop.widget.footer.update(FOOTER) self.set_current_value(self.editing.get_edit_value(), False) self.editing = False self._modified() @@ -187,7 +195,12 @@ class GridWalker(urwid.ListWalker): if self.editing: return self.editing, self.focus elif self.lst: - return GridRow(self.focus_col, False, self.editor, self.lst[self.focus]), self.focus + return GridRow( + self.focus_col, + False, + self.editor, + self.lst[self.focus] + ), self.focus else: return None, None @@ -213,10 +226,13 @@ class GridListBox(urwid.ListBox): FIRST_WIDTH_MAX = 40 FIRST_WIDTH_MIN = 20 -class GridEditor(common.WWrap): + + +class GridEditor(urwid.WidgetWrap): title = None columns = None headings = None + def __init__(self, master, value, callback, *cb_args, **cb_kwargs): value = copy.deepcopy(value) self.master, self.value, self.callback = master, value, callback @@ -248,18 +264,18 @@ class GridEditor(common.WWrap): self.walker = GridWalker(self.value, self) self.lb = GridListBox(self.walker) - self.w = urwid.Frame( + self._w = urwid.Frame( self.lb, header = urwid.Pile([title, h]) ) - self.master.statusbar.update("") + self.master.loop.widget.footer.update("") self.show_empty_msg() def show_empty_msg(self): if self.walker.lst: - self.w.set_footer(None) + self._w.set_footer(None) else: - self.w.set_footer( + self._w.set_footer( urwid.Text( [ ("highlight", "No values. Press "), @@ -297,7 +313,7 @@ class GridEditor(common.WWrap): if self.walker.focus == pf and self.walker.focus_col != pfc: self.walker.start_edit() else: - self.w.keypress(size, key) + self._w.keypress(size, key) return None key = common.shortcuts(key) @@ -307,7 +323,7 @@ class GridEditor(common.WWrap): if not i[1] and any([x.strip() for x in i[0]]): res.append(i[0]) self.callback(res, *self.cb_args, **self.cb_kwargs) - self.master.pop_view() + signals.pop_view_state.send(self) elif key in ["h", "left"]: self.walker.left() elif key in ["l", "right"]: @@ -322,10 +338,19 @@ class GridEditor(common.WWrap): self.walker.delete_focus() elif key == "r": if self.walker.get_current_value() is not None: - self.master.path_prompt("Read file: ", "", self.read_file) + signals.status_prompt_path.send( + self, + prompt = "Read file", + callback = self.read_file + ) elif key == "R": if self.walker.get_current_value() is not None: - self.master.path_prompt("Read unescaped file: ", "", self.read_file, True) + signals.status_prompt_path.send( + self, + prompt = "Read unescaped file", + callback = self.read_file, + args = (True,) + ) elif key == "e": o = self.walker.get_current_value() if o is not None: @@ -336,7 +361,7 @@ class GridEditor(common.WWrap): elif key in ["enter"]: self.walker.start_edit() elif not self.handle_key(key): - return self.w.keypress(size, key) + return self._w.keypress(size, key) def is_error(self, col, val): """ @@ -362,7 +387,9 @@ class GridEditor(common.WWrap): ("tab", "next field"), ("enter", "edit field"), ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) text.append( urwid.Text( [ @@ -384,6 +411,7 @@ class HeaderEditor(GridEditor): title = "Editing headers" columns = 2 headings = ("Key", "Value") + def make_help(self): h = GridEditor.make_help(self) text = [] @@ -391,7 +419,9 @@ class HeaderEditor(GridEditor): keys = [ ("U", "add User-Agent header"), ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) text.append(urwid.Text([("text", "\n")])) text.extend(h) return text @@ -408,10 +438,10 @@ class HeaderEditor(GridEditor): def handle_key(self, key): if key == "U": - self.master.prompt_onekey( - "Add User-Agent header:", - [(i[0], i[1]) for i in http_uastrings.UASTRINGS], - self.set_user_agent, + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS], + callback = self.set_user_agent, ) return True @@ -426,6 +456,7 @@ class ReplaceEditor(GridEditor): title = "Editing replacement patterns" columns = 3 headings = ("Filter", "Regex", "Replacement") + def is_error(self, col, val): if col == 0: if not filt.parse(val): @@ -442,6 +473,7 @@ class SetHeadersEditor(GridEditor): title = "Editing header set patterns" columns = 3 headings = ("Filter", "Header", "Value") + def is_error(self, col, val): if col == 0: if not filt.parse(val): @@ -455,7 +487,9 @@ class SetHeadersEditor(GridEditor): keys = [ ("U", "add User-Agent header"), ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) text.append(urwid.Text([("text", "\n")])) text.extend(h) return text @@ -473,10 +507,10 @@ class SetHeadersEditor(GridEditor): def handle_key(self, key): if key == "U": - self.master.prompt_onekey( - "Add User-Agent header:", - [(i[0], i[1]) for i in http_uastrings.UASTRINGS], - self.set_user_agent, + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS], + callback = self.set_user_agent, ) return True @@ -491,6 +525,7 @@ class ScriptEditor(GridEditor): title = "Editing scripts" columns = 1 headings = ("Command",) + def is_error(self, col, val): try: script.Script.parse_command(val) @@ -507,4 +542,4 @@ class HostPatternEditor(GridEditor): try: re.compile(val, re.IGNORECASE) except re.error as e: - return "Invalid regex: %s" % str(e)
\ No newline at end of file + return "Invalid regex: %s" % str(e) diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py index 27288a36..73cd8a50 100644 --- a/libmproxy/console/help.py +++ b/libmproxy/console/help.py @@ -1,6 +1,8 @@ from __future__ import absolute_import + import urwid -from . import common + +from . import common, signals from .. import filt, version footer = [ @@ -8,9 +10,9 @@ footer = [ ('heading_key', "q"), ":back ", ] + class HelpView(urwid.ListBox): - def __init__(self, master, help_context, state): - self.master, self.state = master, state + def __init__(self, help_context): self.help_context = help_context or [] urwid.ListBox.__init__( self, @@ -122,7 +124,9 @@ class HelpView(urwid.ListBox): ("T", "set tcp proxying pattern"), ("u", "set sticky auth expression"), ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) text.append(urwid.Text([("head", "\n\nFilter expressions:\n")])) f = [] @@ -167,16 +171,15 @@ class HelpView(urwid.ListBox): ("~q ~b test", "Requests where body contains \"test\""), ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), ] - text.extend(common.format_keyvals(examples, key="key", val="text", indent=4)) + text.extend( + common.format_keyvals(examples, key="key", val="text", indent=4) + ) return text def keypress(self, size, key): key = common.shortcuts(key) if key == "q": - self.master.statusbar = self.state[0] - self.master.body = self.state[1] - self.master.header = self.state[2] - self.master.make_view() + signals.pop_view_state.send(self) return None elif key == "?": key = None diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py index 650cf261..cfb2702c 100644 --- a/libmproxy/console/palettes.py +++ b/libmproxy/console/palettes.py @@ -1,192 +1,261 @@ -palettes = { -# Default palette for dark background - 'dark': [ - # name, foreground, background, mono, foreground_high, background_high - # For details on the meaning of the elements refer to - # http://excess.org/urwid/reference.html#Screen-register_palette - - ('body', 'black', 'dark cyan'), - ('foot', 'light gray', 'default'), - ('title', 'white,bold', 'default',), - ('editline', 'white', 'default',), - - # Status bar & heading - ('heading', 'light gray', 'dark blue', None, 'g85', 'dark blue'), - ('heading_key', 'light cyan', 'dark blue', None, 'light cyan', 'dark blue'), - ('heading_inactive', 'white', 'dark gray', None, 'g58', 'g11'), - - # Help - ('key', 'light cyan', 'default'), - ('head', 'white,bold', 'default'), - ('text', 'light gray', 'default'), - - # List and Connections - ('method', 'dark cyan', 'default'), - ('focus', 'yellow', 'default'), - - ('code_200', 'light green', 'default'), - ('code_300', 'light blue', 'default'), - ('code_400', 'light red', 'default', None, '#f60', 'default'), - ('code_500', 'light red', 'default'), - ('code_other', 'dark red', 'default'), - - ('error', 'light red', 'default'), - - ('header', 'dark cyan', 'default'), - ('highlight', 'white,bold', 'default'), - ('intercept', 'brown', 'default', None, '#f60', 'default'), - ('replay', 'light green', 'default', None, '#0f0', 'default'), - ('ack', 'light red', 'default'), - - # Hex view - ('offset', 'dark cyan', 'default'), - - # Grid Editor - ('focusfield', 'black', 'light gray'), - ('focusfield_error', 'dark red', 'light gray'), - ('field_error', 'dark red', 'black'), - ('editfield', 'black', 'light cyan'), - ], - -# Palette for light background - 'light': [ - ('body', 'black', 'dark cyan'), - ('foot', 'dark gray', 'default'), - ('title', 'white,bold', 'light blue',), - ('editline', 'white', 'default',), - - # Status bar & heading - ('heading', 'white', 'light gray', None, 'g85', 'dark blue'), - ('heading_key', 'dark blue', 'light gray', None, 'light cyan', 'dark blue'), - ('heading_inactive', 'light gray', 'dark gray', None, 'dark gray', 'dark blue'), - - # Help - ('key', 'dark blue,bold', 'default'), - ('head', 'black,bold', 'default'), - ('text', 'dark gray', 'default'), - - # List and Connections - ('method', 'dark cyan', 'default'), - ('focus', 'black', 'default'), - - ('code_200', 'dark green', 'default'), - ('code_300', 'light blue', 'default'), - ('code_400', 'dark red', 'default', None, '#f60', 'default'), - ('code_500', 'dark red', 'default'), - ('code_other', 'light red', 'default'), - - ('error', 'light red', 'default'), - - ('header', 'dark blue', 'default'), - ('highlight', 'black,bold', 'default'), - ('intercept', 'brown', 'default', None, '#f60', 'default'), - ('replay', 'dark green', 'default', None, '#0f0', 'default'), - ('ack', 'dark red', 'default'), - - # Hex view - ('offset', 'dark blue', 'default'), - - # Grid Editor - ('focusfield', 'black', 'light gray'), - ('focusfield_error', 'dark red', 'light gray'), - ('field_error', 'dark red', 'black'), - ('editfield', 'black', 'light cyan'), - ], - -# Palettes for terminals that use the Solarized precision colors -# (http://ethanschoonover.com/solarized#the-values) - -# For dark backgrounds - 'solarized_dark': [ - ('body', 'dark cyan', 'default'), - ('foot', 'dark gray', 'default'), - ('title', 'white,bold', 'default',), - ('editline', 'white', 'default',), - - # Status bar & heading - ('heading', 'light gray', 'light cyan',), - ('heading_key', 'dark blue', 'white',), - ('heading_inactive', 'light cyan', 'light gray',), - - # Help - ('key', 'dark blue', 'default',), - ('head', 'white,underline', 'default'), - ('text', 'light cyan', 'default'), - - # List and Connections - ('method', 'dark cyan', 'default'), - ('focus', 'white', 'default'), - - ('code_200', 'dark green', 'default'), - ('code_300', 'light blue', 'default'), - ('code_400', 'dark red', 'default',), - ('code_500', 'dark red', 'default'), - ('code_other', 'light red', 'default'), - - ('error', 'light red', 'default'), - - ('header', 'yellow', 'default'), - ('highlight', 'white', 'default'), - ('intercept', 'brown', 'default',), - ('replay', 'dark green', 'default',), - ('ack', 'dark red', 'default'), - - # Hex view - ('offset', 'yellow', 'default'), - ('text', 'light cyan', 'default'), - - # Grid Editor - ('focusfield', 'white', 'light cyan'), - ('focusfield_error', 'dark red', 'light gray'), - ('field_error', 'dark red', 'black'), - ('editfield', 'black', 'light gray'), - ], - -# For light backgrounds - 'solarized_light': [ - ('body', 'dark cyan', 'default'), - ('foot', 'dark gray', 'default'), - ('title', 'white,bold', 'light cyan',), - ('editline', 'white', 'default',), - - # Status bar & heading - ('heading', 'light cyan', 'light gray',), - ('heading_key', 'dark blue', 'white',), - ('heading_inactive', 'white', 'light gray',), - - # Help - ('key', 'dark blue', 'default',), - ('head', 'black,underline', 'default'), - ('text', 'light cyan', 'default'), - - # List and Connections - ('method', 'dark cyan', 'default'), - ('focus', 'black', 'default'), - - ('code_200', 'dark green', 'default'), - ('code_300', 'light blue', 'default'), - ('code_400', 'dark red', 'default',), - ('code_500', 'dark red', 'default'), - ('code_other', 'light red', 'default'), - - ('error', 'light red', 'default'), - - ('header', 'light cyan', 'default'), - ('highlight', 'black,bold', 'default'), - ('intercept', 'brown', 'default',), - ('replay', 'dark green', 'default',), - ('ack', 'dark red', 'default'), - - # Hex view - ('offset', 'light cyan', 'default'), - ('text', 'yellow', 'default'), - - # Grid Editor - ('focusfield', 'black', 'light gray'), - ('focusfield_error', 'dark red', 'light gray'), - ('field_error', 'dark red', 'black'), - ('editfield', 'white', 'light cyan'), - ], +# Low-color themes should ONLY use the standard foreground and background +# colours listed here: +# +# http://urwid.org/manual/displayattributes.html +# + + + +class Palette: + _fields = [ + 'title', + + # Status bar & heading + 'heading', 'heading_key', 'heading_inactive', + + # Help + 'key', 'head', 'text', + + # List and Connections + 'method', 'focus', + 'code_200', 'code_300', 'code_400', 'code_500', 'code_other', + 'error', + 'header', 'highlight', 'intercept', 'replay', + + # Hex view + 'offset', + + # Grid Editor + 'focusfield', 'focusfield_error', 'field_error', 'editfield', + ] + high = None + + def palette(self): + l = [] + for i in self._fields: + v = [i] + v.extend(self.low[i]) + if self.high and i in self.high: + v.append(None) + v.extend(self.high[i]) + l.append(tuple(v)) + return l + + +class LowDark(Palette): + """ + Low-color dark background + """ + low = dict( + title = ('white,bold', 'default'), + + # Status bar & heading + heading = ('light gray', 'dark blue'), + heading_key = ('light cyan', 'dark blue'), + heading_inactive = ('white', 'dark gray'), + + # Help + key = ('light cyan', 'default'), + head = ('white,bold', 'default'), + text = ('light gray', 'default'), + + # List and Connections + method = ('dark cyan', 'default'), + focus = ('yellow', 'default'), + + code_200 = ('dark green', 'default'), + code_300 = ('light blue', 'default'), + code_400 = ('light red', 'default'), + code_500 = ('light red', 'default'), + code_other = ('dark red', 'default'), + + error = ('light red', 'default'), + + header = ('dark cyan', 'default'), + highlight = ('white,bold', 'default'), + intercept = ('brown', 'default'), + replay = ('light green', 'default'), + + # Hex view + offset = ('dark cyan', 'default'), + + # Grid Editor + focusfield = ('black', 'light gray'), + focusfield_error = ('dark red', 'light gray'), + field_error = ('dark red', 'default'), + editfield = ('white', 'default'), + ) + + +class Dark(LowDark): + high = dict( + heading_inactive = ('g58', 'g11'), + intercept = ('#f60', 'default'), + ) + + +class LowLight(Palette): + """ + Low-color light background + """ + low = dict( + title = ('dark magenta,bold', 'light blue'), + + # Status bar & heading + heading = ('light gray', 'dark blue'), + heading_key = ('light cyan', 'dark blue'), + heading_inactive = ('black', 'light gray'), + + # Help + key = ('dark blue,bold', 'default'), + head = ('black,bold', 'default'), + text = ('dark gray', 'default'), + + # List and Connections + method = ('dark cyan', 'default'), + focus = ('black', 'default'), + + code_200 = ('dark green', 'default'), + code_300 = ('light blue', 'default'), + code_400 = ('dark red', 'default'), + code_500 = ('dark red', 'default'), + code_other = ('light red', 'default'), + + error = ('light red', 'default'), + + header = ('dark blue', 'default'), + highlight = ('black,bold', 'default'), + intercept = ('brown', 'default'), + replay = ('dark green', 'default'), + + # Hex view + offset = ('dark blue', 'default'), + + # Grid Editor + focusfield = ('black', 'light gray'), + focusfield_error = ('dark red', 'light gray'), + field_error = ('dark red', 'black'), + editfield = ('black', 'default'), + ) + + +class Light(LowLight): + high = dict( + heading = ('g99', '#08f'), + heading_key = ('#0ff,bold', '#08f'), + heading_inactive = ('g35', 'g85'), + replay = ('#0a0,bold', 'default'), + ) + + +# Solarized palette in Urwid-style terminal high-colour offsets +# See: http://ethanschoonover.com/solarized +sol_base03 = "h234" +sol_base02 = "h235" +sol_base01 = "h240" +sol_base00 = "h241" +sol_base0 = "h244" +sol_base1 = "h245" +sol_base2 = "h254" +sol_base3 = "h230" +sol_yellow = "h136" +sol_orange = "h166" +sol_red = "h160" +sol_magenta = "h125" +sol_violet = "h61" +sol_blue = "h33" +sol_cyan = "h37" +sol_green = "h64" +class SolarizedLight(LowLight): + high = dict( + title = (sol_blue, 'default'), + text = (sol_base00, 'default'), + + # Status bar & heading + heading = (sol_base2, sol_base02), + heading_key = (sol_blue, sol_base03), + heading_inactive = (sol_base03, sol_base1), + + # Help + key = (sol_blue, 'default',), + head = (sol_base00, 'default'), + + # List and Connections + method = (sol_cyan, 'default'), + focus = (sol_base01, 'default'), + + code_200 = (sol_green, 'default'), + code_300 = (sol_blue, 'default'), + code_400 = (sol_orange, 'default',), + code_500 = (sol_red, 'default'), + code_other = (sol_magenta, 'default'), + + error = (sol_red, 'default'), + + header = (sol_base01, 'default'), + highlight = (sol_base01, 'default'), + intercept = (sol_red, 'default',), + replay = (sol_green, 'default',), + + # Hex view + offset = (sol_cyan, 'default'), + + # Grid Editor + focusfield = (sol_base00, sol_base2), + focusfield_error = (sol_red, sol_base2), + field_error = (sol_red, 'default'), + editfield = (sol_base01, 'default'), + ) + + +class SolarizedDark(LowDark): + high = dict( + title = (sol_blue, 'default'), + text = (sol_base0, 'default'), + + # Status bar & heading + heading = (sol_base03, sol_base1), + heading_key = (sol_blue+",bold", sol_base1), + heading_inactive = (sol_base1, sol_base02), + + # Help + key = (sol_blue, 'default',), + head = (sol_base00, 'default'), + + # List and Connections + method = (sol_cyan, 'default'), + focus = (sol_base1, 'default'), + + code_200 = (sol_green, 'default'), + code_300 = (sol_blue, 'default'), + code_400 = (sol_orange, 'default',), + code_500 = (sol_red, 'default'), + code_other = (sol_magenta, 'default'), + + error = (sol_red, 'default'), + + header = (sol_base01, 'default'), + highlight = (sol_base01, 'default'), + intercept = (sol_red, 'default',), + replay = (sol_green, 'default',), + + # Hex view + offset = (sol_cyan, 'default'), + + # Grid Editor + focusfield = (sol_base0, sol_base02), + focusfield_error = (sol_red, sol_base02), + field_error = (sol_red, 'default'), + editfield = (sol_base1, 'default'), + ) + +palettes = { + "lowlight": LowLight(), + "lowdark": LowDark(), + "light": Light(), + "dark": Dark(), + "solarized_light": SolarizedLight(), + "solarized_dark": SolarizedDark(), } diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py new file mode 100644 index 00000000..53cda3be --- /dev/null +++ b/libmproxy/console/pathedit.py @@ -0,0 +1,69 @@ +import glob +import os.path + +import urwid + + +class _PathCompleter: + def __init__(self, _testing=False): + """ + _testing: disables reloading of the lookup table to make testing + possible. + """ + self.lookup, self.offset = None, None + self.final = None + self._testing = _testing + + def reset(self): + self.lookup = None + self.offset = -1 + + def complete(self, txt): + """ + Returns the next completion for txt, or None if there is no + completion. + """ + path = os.path.expanduser(txt) + if not self.lookup: + if not self._testing: + # Lookup is a set of (display value, actual value) tuples. + self.lookup = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = txt + else: + files = glob.glob(path+"*") + prefix = os.path.dirname(txt) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.basename(f)) + if os.path.isdir(f): + display += "/" + self.lookup.append((display, f)) + if not self.lookup: + self.final = path + return path + self.lookup.sort() + self.offset = -1 + self.lookup.append((txt, txt)) + self.offset += 1 + if self.offset >= len(self.lookup): + self.offset = 0 + ret = self.lookup[self.offset] + self.final = ret[1] + return ret[0] + + +class PathEdit(urwid.Edit, _PathCompleter): + def __init__(self, *args, **kwargs): + urwid.Edit.__init__(self, *args, **kwargs) + _PathCompleter.__init__(self) + + def keypress(self, size, key): + if key == "tab": + comp = self.complete(self.get_edit_text()) + self.set_edit_text(comp) + self.set_edit_pos(len(comp)) + else: + self.reset() + return urwid.Edit.keypress(self, size, key) diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py new file mode 100644 index 00000000..e4c11f5a --- /dev/null +++ b/libmproxy/console/signals.py @@ -0,0 +1,30 @@ +import blinker + +# Show a status message in the action bar +status_message = blinker.Signal() + +# Prompt for input +status_prompt = blinker.Signal() + +# Prompt for a path +status_prompt_path = blinker.Signal() + +# Prompt for a single keystroke +status_prompt_onekey = blinker.Signal() + +# Call a callback in N seconds +call_in = blinker.Signal() + +# Focus the body, footer or header of the main window +focus = blinker.Signal() + +# Fired when settings change +update_settings = blinker.Signal() + +# Fired when a flow changes +flow_change = blinker.Signal() + + +# Pop and push view state onto a stack +pop_view_state = blinker.Signal() +push_view_state = blinker.Signal() diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py new file mode 100644 index 00000000..7fb15aa6 --- /dev/null +++ b/libmproxy/console/statusbar.py @@ -0,0 +1,254 @@ +import time +import os.path + +import urwid + +from . import pathedit, signals, common +from .. import utils + + +class ActionBar(urwid.WidgetWrap): + def __init__(self): + urwid.WidgetWrap.__init__(self, None) + self.clear() + signals.status_message.connect(self.sig_message) + signals.status_prompt.connect(self.sig_prompt) + signals.status_prompt_path.connect(self.sig_path_prompt) + signals.status_prompt_onekey.connect(self.sig_prompt_onekey) + + self.last_path = "" + + self.prompting = False + self.onekey = False + self.pathprompt = False + + + def sig_message(self, sender, message, expire=None): + w = urwid.Text(message) + self._w = w + if expire: + def cb(*args): + if w == self._w: + self.clear() + signals.call_in.send(seconds=expire, callback=cb) + + def prep_prompt(self, p): + return p.strip() + ": " + + def sig_prompt(self, sender, prompt, text, callback, args=()): + signals.focus.send(self, section="footer") + self._w = urwid.Edit(self.prep_prompt(prompt), text or "") + self.prompting = (callback, args) + + def sig_path_prompt(self, sender, prompt, callback, args=()): + signals.focus.send(self, section="footer") + self._w = pathedit.PathEdit( + self.prep_prompt(prompt), + os.path.dirname(self.last_path) + ) + self.pathprompt = True + self.prompting = (callback, args) + + def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): + """ + Keys are a set of (word, key) tuples. The appropriate key in the + word is highlighted. + """ + signals.focus.send(self, section="footer") + prompt = [prompt, " ("] + mkup = [] + for i, e in enumerate(keys): + mkup.extend(common.highlight_key(e[0], e[1])) + if i < len(keys)-1: + mkup.append(",") + prompt.extend(mkup) + prompt.append(")? ") + self.onekey = set(i[1] for i in keys) + self._w = urwid.Edit(prompt, "") + self.prompting = (callback, args) + + def selectable(self): + return True + + def keypress(self, size, k): + if self.prompting: + if k == "esc": + self.prompt_done() + elif self.onekey: + if k == "enter": + self.prompt_done() + elif k in self.onekey: + self.prompt_execute(k) + elif k == "enter": + self.prompt_execute(self._w.get_edit_text()) + else: + if common.is_keypress(k): + self._w.keypress(size, k) + else: + return k + + def clear(self): + self._w = urwid.Text("") + + def prompt_done(self): + self.prompting = False + self.onekey = False + self.pathprompt = False + signals.status_message.send(message="") + signals.focus.send(self, section="body") + + def prompt_execute(self, txt): + if self.pathprompt: + self.last_path = txt + p, args = self.prompting + self.prompt_done() + msg = p(txt, *args) + if msg: + signals.status_message.send(message=msg, expire=1) + + +class StatusBar(urwid.WidgetWrap): + def __init__(self, master, helptext): + self.master, self.helptext = master, helptext + self.ab = ActionBar() + self.ib = urwid.WidgetWrap(urwid.Text("")) + self._w = urwid.Pile([self.ib, self.ab]) + signals.update_settings.connect(self.sig_update_settings) + self.redraw() + + def sig_update_settings(self, sender): + self.redraw() + + def keypress(self, *args, **kwargs): + return self.ab.keypress(*args, **kwargs) + + def get_status(self): + r = [] + + if self.master.setheaders.count(): + r.append("[") + r.append(("heading_key", "H")) + r.append("eaders]") + if self.master.replacehooks.count(): + r.append("[") + r.append(("heading_key", "R")) + r.append("eplacing]") + if self.master.client_playback: + r.append("[") + r.append(("heading_key", "cplayback")) + r.append(":%s to go]"%self.master.client_playback.count()) + if self.master.server_playback: + r.append("[") + r.append(("heading_key", "splayback")) + if self.master.nopop: + r.append(":%s in file]"%self.master.server_playback.count()) + else: + r.append(":%s to go]"%self.master.server_playback.count()) + if self.master.get_ignore_filter(): + r.append("[") + r.append(("heading_key", "I")) + r.append("gnore:%d]" % len(self.master.get_ignore_filter())) + if self.master.get_tcp_filter(): + r.append("[") + r.append(("heading_key", "T")) + r.append("CP:%d]" % len(self.master.get_tcp_filter())) + if self.master.state.intercept_txt: + r.append("[") + r.append(("heading_key", "i")) + r.append(":%s]"%self.master.state.intercept_txt) + if self.master.state.limit_txt: + r.append("[") + r.append(("heading_key", "l")) + r.append(":%s]"%self.master.state.limit_txt) + if self.master.stickycookie_txt: + r.append("[") + r.append(("heading_key", "t")) + r.append(":%s]"%self.master.stickycookie_txt) + if self.master.stickyauth_txt: + r.append("[") + r.append(("heading_key", "u")) + r.append(":%s]"%self.master.stickyauth_txt) + if self.master.state.default_body_view.name != "Auto": + r.append("[") + r.append(("heading_key", "M")) + r.append(":%s]"%self.master.state.default_body_view.name) + + opts = [] + if self.master.anticache: + opts.append("anticache") + if self.master.anticomp: + opts.append("anticomp") + if self.master.showhost: + opts.append("showhost") + if not self.master.refresh_server_playback: + opts.append("norefresh") + if self.master.killextra: + opts.append("killextra") + if self.master.server.config.no_upstream_cert: + opts.append("no-upstream-cert") + if self.master.state.follow_focus: + opts.append("following") + if self.master.stream_large_bodies: + opts.append( + "stream:%s" % utils.pretty_size( + self.master.stream_large_bodies.max_size + ) + ) + + if opts: + r.append("[%s]"%(":".join(opts))) + + if self.master.server.config.mode in ["reverse", "upstream"]: + dst = self.master.server.config.mode.dst + scheme = "https" if dst[0] else "http" + if dst[1] != dst[0]: + scheme += "2https" if dst[1] else "http" + r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:])) + if self.master.scripts: + r.append("[") + r.append(("heading_key", "s")) + r.append("cripts:%s]"%len(self.master.scripts)) + # r.append("[lt:%0.3f]"%self.master.looptime) + + if self.master.stream: + r.append("[W:%s]"%self.master.stream_path) + + return r + + def redraw(self): + fc = self.master.state.flow_count() + if self.master.state.focus is None: + offset = 0 + else: + offset = min(self.master.state.focus + 1, fc) + t = [ + ('heading', ("[%s/%s]"%(offset, fc)).ljust(9)) + ] + + if self.master.server.bound: + host = self.master.server.address.host + if host == "0.0.0.0": + host = "*" + boundaddr = "[%s:%s]"%(host, self.master.server.address.port) + else: + boundaddr = "" + t.extend(self.get_status()) + status = urwid.AttrWrap(urwid.Columns([ + urwid.Text(t), + urwid.Text( + [ + self.helptext, + boundaddr + ], + align="right" + ), + ]), "heading") + self.ib._w = status + + def update(self, text): + self.helptext = text + self.redraw() + self.master.loop.draw_screen() + + def selectable(self): + return True diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py new file mode 100644 index 00000000..d686f61d --- /dev/null +++ b/libmproxy/console/window.py @@ -0,0 +1,142 @@ +import urwid +from . import common, grideditor, signals, contentview + +class Window(urwid.Frame): + def __init__(self, master, body, header, footer): + urwid.Frame.__init__(self, body, header=header, footer=footer) + self.master = master + signals.focus.connect(self.sig_focus) + + def sig_focus(self, sender, section): + self.focus_position = section + + def keypress(self, size, k): + k = urwid.Frame.keypress(self, self.master.loop.screen_size, k) + if k == "?": + self.master.view_help() + elif k == "c": + if not self.master.client_playback: + signals.status_prompt_path.send( + self, + prompt = "Client replay", + callback = self.master.client_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current client replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_client_playback_prompt, + ) + elif k == "H": + self.master.view_grideditor( + grideditor.SetHeadersEditor( + self.master, + self.master.setheaders.get_specs(), + self.master.setheaders.set + ) + ) + elif k == "I": + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + [[x] for x in self.master.get_ignore_filter()], + self.master.edit_ignore_filter + ) + ) + elif k == "T": + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + [[x] for x in self.master.get_tcp_filter()], + self.master.edit_tcp_filter + ) + ) + elif k == "i": + signals.status_prompt.send( + self, + prompt = "Intercept filter", + text = self.master.state.intercept_txt, + callback = self.master.set_intercept + ) + elif k == "Q": + raise urwid.ExitMainLoop + elif k == "q": + signals.status_prompt_onekey.send( + self, + prompt = "Quit", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.quit, + ) + elif k == "M": + signals.status_prompt_onekey.send( + prompt = "Global default display mode", + keys = contentview.view_prompts, + callback = self.master.change_default_display_mode + ) + elif k == "R": + self.master.view_grideditor( + grideditor.ReplaceEditor( + self.master, + self.master.replacehooks.get_specs(), + self.master.replacehooks.set + ) + ) + elif k == "s": + self.master.view_grideditor( + grideditor.ScriptEditor( + self.master, + [[i.command] for i in self.master.scripts], + self.master.edit_scripts + ) + ) + elif k == "S": + if not self.master.server_playback: + signals.status_prompt_path.send( + self, + prompt = "Server replay path", + callback = self.master.server_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current server replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_server_playback_prompt, + ) + elif k == "o": + signals.status_prompt_onekey.send( + prompt = "Options", + keys = ( + ("anticache", "a"), + ("anticomp", "c"), + ("showhost", "h"), + ("killextra", "k"), + ("norefresh", "n"), + ("no-upstream-certs", "u"), + ), + callback = self.master._change_options + ) + elif k == "t": + signals.status_prompt.send( + prompt = "Sticky cookie filter", + text = self.master.stickycookie_txt, + callback = self.master.set_stickycookie + ) + elif k == "u": + signals.status_prompt.send( + prompt = "Sticky auth filter", + text = self.master.stickyauth_txt, + callback = self.master.set_stickyauth + ) + else: + return k |