aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/console')
-rw-r--r--libmproxy/console/__init__.py718
-rw-r--r--libmproxy/console/common.py135
-rw-r--r--libmproxy/console/contentview.py90
-rw-r--r--libmproxy/console/flowdetailview.py11
-rw-r--r--libmproxy/console/flowlist.py150
-rw-r--r--libmproxy/console/flowview.py413
-rw-r--r--libmproxy/console/grideditor.py117
-rw-r--r--libmproxy/console/help.py21
-rw-r--r--libmproxy/console/palettes.py447
-rw-r--r--libmproxy/console/pathedit.py69
-rw-r--r--libmproxy/console/signals.py30
-rw-r--r--libmproxy/console/statusbar.py254
-rw-r--r--libmproxy/console/window.py142
13 files changed, 1465 insertions, 1132 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index 0db06832..9375f973 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -1,259 +1,25 @@
from __future__ import absolute_import
-import mailcap, mimetypes, tempfile, os, subprocess, glob, time, shlex, stat
-import os.path, sys, weakref, traceback
-import urwid
-from .. import controller, utils, flow, script, proxy
-from . import flowlist, flowview, help, common, grideditor, palettes, contentview, flowdetailview
-
-EVENTLOG_SIZE = 500
-
-
-class Stop(Exception): pass
-
-
-class _PathCompleter:
- def __init__(self, _testing=False):
- """
- _testing: disables reloading of the lookup table to make testing possible.
- """
- self.lookup, self.offset = None, None
- self.final = None
- self._testing = _testing
- def reset(self):
- self.lookup = None
- self.offset = -1
+import mailcap
+import mimetypes
+import tempfile
+import os
+import os.path
+import shlex
+import signal
+import stat
+import subprocess
+import sys
+import traceback
+import urwid
+import weakref
- def complete(self, txt):
- """
- Returns the next completion for txt, or None if there is no completion.
- """
- path = os.path.expanduser(txt)
- if not self.lookup:
- if not self._testing:
- # Lookup is a set of (display value, actual value) tuples.
- self.lookup = []
- if os.path.isdir(path):
- files = glob.glob(os.path.join(path, "*"))
- prefix = txt
- else:
- files = glob.glob(path+"*")
- prefix = os.path.dirname(txt)
- prefix = prefix or "./"
- for f in files:
- display = os.path.join(prefix, os.path.basename(f))
- if os.path.isdir(f):
- display += "/"
- self.lookup.append((display, f))
- if not self.lookup:
- self.final = path
- return path
- self.lookup.sort()
- self.offset = -1
- self.lookup.append((txt, txt))
- self.offset += 1
- if self.offset >= len(self.lookup):
- self.offset = 0
- ret = self.lookup[self.offset]
- self.final = ret[1]
- return ret[0]
-
-#begin nocover
-
-class PathEdit(urwid.Edit, _PathCompleter):
- def __init__(self, *args, **kwargs):
- urwid.Edit.__init__(self, *args, **kwargs)
- _PathCompleter.__init__(self)
-
- def keypress(self, size, key):
- if key == "tab":
- comp = self.complete(self.get_edit_text())
- self.set_edit_text(comp)
- self.set_edit_pos(len(comp))
- else:
- self.reset()
- return urwid.Edit.keypress(self, size, key)
+from .. import controller, flow, script
+from . import flowlist, flowview, help, common, window, signals
+from . import grideditor, palettes, contentview, flowdetailview, statusbar
+EVENTLOG_SIZE = 500
-class ActionBar(common.WWrap):
- def __init__(self):
- self.message("")
-
- def selectable(self):
- return True
-
- def path_prompt(self, prompt, text):
- self.expire = None
- self.w = PathEdit(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.expire = None
- # A (partial) workaround for this Urwid issue:
- # https://github.com/Nic0/tyrs/issues/115
- # We can remove it once veryone is beyond 1.0.1
- if isinstance(prompt, basestring):
- prompt = unicode(prompt)
- self.w = urwid.Edit(prompt, text or "")
-
- def message(self, message, expire=None):
- self.expire = expire
- self.w = urwid.Text(message)
-
-
-class StatusBar(common.WWrap):
- def __init__(self, master, helptext):
- self.master, self.helptext = master, helptext
- self.ab = ActionBar()
- self.ib = common.WWrap(urwid.Text(""))
- self.w = urwid.Pile([self.ib, self.ab])
-
- def get_status(self):
- r = []
-
- if self.master.setheaders.count():
- r.append("[")
- r.append(("heading_key", "H"))
- r.append("eaders]")
- if self.master.replacehooks.count():
- r.append("[")
- r.append(("heading_key", "R"))
- r.append("eplacing]")
- if self.master.client_playback:
- r.append("[")
- r.append(("heading_key", "cplayback"))
- r.append(":%s to go]"%self.master.client_playback.count())
- if self.master.server_playback:
- r.append("[")
- r.append(("heading_key", "splayback"))
- if self.master.nopop:
- r.append(":%s in file]"%self.master.server_playback.count())
- else:
- r.append(":%s to go]"%self.master.server_playback.count())
- if self.master.get_ignore_filter():
- r.append("[")
- r.append(("heading_key", "I"))
- r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
- if self.master.get_tcp_filter():
- r.append("[")
- r.append(("heading_key", "T"))
- r.append("CP:%d]" % len(self.master.get_tcp_filter()))
- if self.master.state.intercept_txt:
- r.append("[")
- r.append(("heading_key", "i"))
- r.append(":%s]"%self.master.state.intercept_txt)
- if self.master.state.limit_txt:
- r.append("[")
- r.append(("heading_key", "l"))
- r.append(":%s]"%self.master.state.limit_txt)
- if self.master.stickycookie_txt:
- r.append("[")
- r.append(("heading_key", "t"))
- r.append(":%s]"%self.master.stickycookie_txt)
- if self.master.stickyauth_txt:
- r.append("[")
- r.append(("heading_key", "u"))
- r.append(":%s]"%self.master.stickyauth_txt)
- if self.master.state.default_body_view.name != "Auto":
- r.append("[")
- r.append(("heading_key", "M"))
- r.append(":%s]"%self.master.state.default_body_view.name)
-
- opts = []
- if self.master.anticache:
- opts.append("anticache")
- if self.master.anticomp:
- opts.append("anticomp")
- if self.master.showhost:
- opts.append("showhost")
- if not self.master.refresh_server_playback:
- opts.append("norefresh")
- if self.master.killextra:
- opts.append("killextra")
- if self.master.server.config.no_upstream_cert:
- opts.append("no-upstream-cert")
- if self.master.state.follow_focus:
- opts.append("following")
- if self.master.stream_large_bodies:
- opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size))
-
- if opts:
- r.append("[%s]"%(":".join(opts)))
-
- if self.master.server.config.mode in ["reverse", "upstream"]:
- dst = self.master.server.config.mode.dst
- scheme = "https" if dst[0] else "http"
- if dst[1] != dst[0]:
- scheme += "2https" if dst[1] else "http"
- r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
- if self.master.scripts:
- r.append("[")
- r.append(("heading_key", "s"))
- r.append("cripts:%s]"%len(self.master.scripts))
- # r.append("[lt:%0.3f]"%self.master.looptime)
-
- if self.master.stream:
- r.append("[W:%s]"%self.master.stream_path)
-
- return r
-
- def redraw(self):
- if self.ab.expire and time.time() > self.ab.expire:
- self.message("")
-
- fc = self.master.state.flow_count()
- if self.master.state.focus is None:
- offset = 0
- else:
- offset = min(self.master.state.focus + 1, fc)
- t = [
- ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
- ]
-
- if self.master.server.bound:
- host = self.master.server.address.host
- if host == "0.0.0.0":
- host = "*"
- boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
- else:
- boundaddr = ""
- t.extend(self.get_status())
- status = urwid.AttrWrap(urwid.Columns([
- urwid.Text(t),
- urwid.Text(
- [
- self.helptext,
- boundaddr
- ],
- align="right"
- ),
- ]), "heading")
- self.ib.set_w(status)
-
- def update(self, text):
- self.helptext = text
- self.redraw()
- self.master.drawscreen()
-
- def selectable(self):
- return True
-
- def get_edit_text(self):
- return self.ab.w.get_edit_text()
-
- def path_prompt(self, prompt, text):
- return self.ab.path_prompt(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.ab.prompt(prompt, text)
-
- def message(self, msg, expire=None):
- if expire:
- expire = time.time() + float(expire)/1000
- self.ab.message(msg, expire)
- self.master.drawscreen()
-
-
-#end nocover
class ConsoleState(flow.State):
def __init__(self):
@@ -262,13 +28,14 @@ class ConsoleState(flow.State):
self.follow_focus = None
self.default_body_view = contentview.get("Auto")
- self.view_mode = common.VIEW_LIST
self.view_flow_mode = common.VIEW_FLOW_REQUEST
- self.last_script = ""
- self.last_saveload = ""
self.flowsettings = weakref.WeakKeyDictionary()
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
+
def add_flow_setting(self, flow, key, value):
d = self.flowsettings.setdefault(flow, {})
d[key] = value
@@ -337,7 +104,6 @@ class ConsoleState(flow.State):
super(ConsoleState, self).clear()
-
class Options(object):
attributes = [
"app",
@@ -367,6 +133,7 @@ class Options(object):
"nopop",
"palette",
]
+
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
@@ -375,14 +142,11 @@ class Options(object):
setattr(self, i, None)
-#begin nocover
-
-
class ConsoleMaster(flow.FlowMaster):
palette = []
+
def __init__(self, server, options):
flow.FlowMaster.__init__(self, server, ConsoleState())
- self.looptime = 0
self.stream_path = None
self.options = options
@@ -423,8 +187,6 @@ class ConsoleMaster(flow.FlowMaster):
self.eventlog = options.eventlog
self.eventlist = urwid.SimpleListWalker([])
- self.statusbar = None
-
if options.client_replay:
self.client_playback_path(options.client_replay)
@@ -439,13 +201,37 @@ class ConsoleMaster(flow.FlowMaster):
sys.exit(1)
if options.outfile:
- err = self.start_stream_to_path(options.outfile[0], options.outfile[1])
+ err = self.start_stream_to_path(
+ options.outfile[0],
+ options.outfile[1]
+ )
if err:
print >> sys.stderr, "Stream file error:", err
sys.exit(1)
+ self.view_stack = []
+
if options.app:
self.start_app(self.options.app_host, self.options.app_port)
+ signals.call_in.connect(self.sig_call_in)
+ signals.pop_view_state.connect(self.sig_pop_view_state)
+ signals.push_view_state.connect(self.sig_push_view_state)
+
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
+
+ def sig_call_in(self, sender, seconds, callback, args=()):
+ def cb(*_):
+ return callback(*args)
+ self.loop.set_alarm_in(seconds, cb)
+
+ def sig_pop_view_state(self, sender):
+ if self.view_stack:
+ self.loop.widget = self.view_stack.pop()
+
+ def sig_push_view_state(self, sender):
+ self.view_stack.append(self.loop.widget)
def start_stream_to_path(self, path, mode="wb"):
path = os.path.expanduser(path)
@@ -472,7 +258,9 @@ class ConsoleMaster(flow.FlowMaster):
try:
s = script.Script(command, self)
except script.ScriptError, v:
- self.statusbar.message("Error loading script.")
+ signals.status_message.send(
+ message = "Error loading script."
+ )
self.add_event("Error loading script:\n%s"%v.args[0], "error")
return
@@ -483,22 +271,20 @@ class ConsoleMaster(flow.FlowMaster):
if f.error:
self._run_script_method("error", s, f)
s.unload()
- self.refresh_flow(f)
- self.state.last_script = command
+ signals.flow_change.send(self, flow = f)
def set_script(self, command):
if not command:
return
ret = self.load_script(command)
if ret:
- self.statusbar.message(ret)
- self.state.last_script = command
+ signals.status_message.send(message=ret)
def toggle_eventlog(self):
self.eventlog = not self.eventlog
self.view_flowlist()
- def _readflow(self, paths):
+ def _readflows(self, path):
"""
Utitility function that reads a list of flows
or prints an error to the UI if that fails.
@@ -507,28 +293,30 @@ class ConsoleMaster(flow.FlowMaster):
- a list of flows, otherwise.
"""
try:
- return flow.read_flows_from_paths(paths)
+ return flow.read_flows_from_paths(path)
except flow.FlowReadError as e:
- if not self.statusbar:
- print >> sys.stderr, e.strerror
- sys.exit(1)
- else:
- self.statusbar.message(e.strerror)
- return None
+ signals.status_message.send(message=e.strerror)
def client_playback_path(self, path):
- flows = self._readflow(path)
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
if flows:
self.start_client_playback(flows, False)
def server_playback_path(self, path):
- flows = self._readflow(path)
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
if flows:
self.start_server_playback(
flows,
self.killextra, self.rheaders,
False, self.nopop,
- self.options.replay_ignore_params, self.options.replay_ignore_content, self.options.replay_ignore_payload_params
+ self.options.replay_ignore_params,
+ self.options.replay_ignore_content,
+ self.options.replay_ignore_payload_params,
+ self.options.replay_ignore_host
)
def spawn_editor(self, data):
@@ -545,9 +333,11 @@ class ConsoleMaster(flow.FlowMaster):
try:
subprocess.call(cmd)
except:
- self.statusbar.message("Can't start editor: %s" % " ".join(c))
+ signals.status_message.send(
+ message = "Can't start editor: %s" % " ".join(c)
+ )
else:
- data = open(name,"rb").read()
+ data = open(name, "rb").read()
self.ui.start()
os.unlink(name)
return data
@@ -584,27 +374,32 @@ class ConsoleMaster(flow.FlowMaster):
try:
subprocess.call(cmd, shell=shell)
except:
- self.statusbar.message("Can't start external viewer: %s" % " ".join(c))
+ signals.status_message.send(
+ message="Can't start external viewer: %s" % " ".join(c)
+ )
self.ui.start()
os.unlink(name)
def set_palette(self, name):
self.palette = palettes.palettes[name]
+ def ticker(self, *userdata):
+ changed = self.tick(self.masterq, timeout=0)
+ if changed:
+ self.loop.draw_screen()
+ signals.update_settings.send()
+ self.loop.set_alarm_in(0.01, self.ticker)
+
def run(self):
self.ui = urwid.raw_display.Screen()
self.ui.set_terminal_properties(256)
- self.ui.register_palette(self.palette)
+ self.ui.register_palette(self.palette.palette())
self.flow_list_walker = flowlist.FlowListWalker(self, self.state)
- self.view = None
- self.statusbar = None
- self.header = None
- self.body = None
self.help_context = None
- self.prompting = False
- self.onekey = False
-
- self.view_flowlist()
+ self.loop = urwid.MainLoop(
+ urwid.SolidFill("x"),
+ screen = self.ui,
+ )
self.server.start_slave(
controller.Slave,
@@ -624,46 +419,60 @@ class ConsoleMaster(flow.FlowMaster):
print >> sys.stderr, "Could not load file:", ret
sys.exit(1)
+ self.loop.set_alarm_in(0.01, self.ticker)
+
+ # It's not clear why we need to handle this explicitly - without this,
+ # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it
+ # out.
+ def exit(s, f):
+ raise urwid.ExitMainLoop
+ signal.signal(signal.SIGINT, exit)
+
+ self.loop.set_alarm_in(
+ 0.0001,
+ lambda *args: self.view_flowlist()
+ )
+
try:
- self.ui.run_wrapper(self.loop)
+ self.loop.run()
except Exception:
- self.ui.stop()
+ self.loop.stop()
sys.stdout.flush()
print >> sys.stderr, traceback.format_exc()
print >> sys.stderr, "mitmproxy has crashed!"
- print >> sys.stderr, "Please lodge a bug report at: https://github.com/mitmproxy/mitmproxy"
- print >> sys.stderr, "Shutting down..."
+ print >> sys.stderr, "Please lodge a bug report at:"
+ print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy"
+ print >> sys.stderr, "Shutting down..."
sys.stderr.flush()
self.shutdown()
- def make_view(self):
- self.view = urwid.Frame(
- self.body,
- header = self.header,
- footer = self.statusbar
- )
- self.view.set_focus("body")
-
def view_help(self):
- h = help.HelpView(self, self.help_context, (self.statusbar, self.body, self.header))
- self.statusbar = StatusBar(self, help.footer)
- self.body = h
- self.header = None
- self.make_view()
+ signals.push_view_state.send(self)
+ self.loop.widget = window.Window(
+ self,
+ help.HelpView(self.help_context),
+ None,
+ statusbar.StatusBar(self, help.footer)
+ )
def view_flowdetails(self, flow):
- h = flowdetailview.FlowDetailsView(self, flow, (self.statusbar, self.body, self.header))
- self.statusbar = StatusBar(self, flowdetailview.footer)
- self.body = h
- self.header = None
- self.make_view()
+ signals.push_view_state.send(self)
+ self.loop.widget = window.Window(
+ self,
+ flowdetailview.FlowDetailsView(low),
+ None,
+ statusbar.StatusBar(self, flowdetailview.footer)
+ )
def view_grideditor(self, ge):
- self.body = ge
- self.header = None
+ signals.push_view_state.send(self)
self.help_context = ge.make_help()
- self.statusbar = StatusBar(self, grideditor.footer)
- self.make_view()
+ self.loop.widget = window.Window(
+ self,
+ ge,
+ None,
+ statusbar.StatusBar(self, grideditor.FOOTER)
+ )
def view_flowlist(self):
if self.ui.started:
@@ -672,27 +481,31 @@ class ConsoleMaster(flow.FlowMaster):
self.state.set_focus(self.state.flow_count())
if self.eventlog:
- self.body = flowlist.BodyPile(self)
+ body = flowlist.BodyPile(self)
else:
- self.body = flowlist.FlowListBox(self)
- self.statusbar = StatusBar(self, flowlist.footer)
- self.header = None
- self.state.view_mode = common.VIEW_LIST
+ body = flowlist.FlowListBox(self)
- self.make_view()
self.help_context = flowlist.help_context
+ self.loop.widget = window.Window(
+ self,
+ body,
+ None,
+ statusbar.StatusBar(self, flowlist.footer)
+ )
+ self.loop.draw_screen()
def view_flow(self, flow):
- self.body = flowview.FlowView(self, self.state, flow)
- self.header = flowview.FlowViewHeader(self, flow)
- self.statusbar = StatusBar(self, flowview.footer)
+ signals.push_view_state.send(self)
self.state.set_focus_flow(flow)
- self.state.view_mode = common.VIEW_FLOW
- self.make_view()
self.help_context = flowview.help_context
+ self.loop.widget = window.Window(
+ self,
+ flowview.FlowView(self, self.state, flow),
+ flowview.FlowViewHeader(self, flow),
+ statusbar.StatusBar(self, flowview.footer)
+ )
def _write_flows(self, path, flows):
- self.state.last_saveload = path
if not path:
return
path = os.path.expanduser(path)
@@ -703,7 +516,7 @@ class ConsoleMaster(flow.FlowMaster):
fw.add(i)
f.close()
except IOError, v:
- self.statusbar.message(v.strerror)
+ signals.status_message.send(message=v.strerror)
def save_one_flow(self, path, flow):
return self._write_flows(path, [flow])
@@ -718,7 +531,6 @@ class ConsoleMaster(flow.FlowMaster):
return ret or "Flows loaded from %s"%path
def load_flows_path(self, path):
- self.state.last_saveload = path
reterr = None
try:
flow.FlowMaster.load_flows_file(self, path)
@@ -728,55 +540,6 @@ class ConsoleMaster(flow.FlowMaster):
self.sync_list_view()
return reterr
- def path_prompt(self, prompt, text, callback, *args):
- self.statusbar.path_prompt(prompt, text)
- self.view.set_focus("footer")
- self.prompting = (callback, args)
-
- def prompt(self, prompt, text, callback, *args):
- self.statusbar.prompt(prompt, text)
- self.view.set_focus("footer")
- self.prompting = (callback, args)
-
- def prompt_edit(self, prompt, text, callback):
- self.statusbar.prompt(prompt + ": ", text)
- self.view.set_focus("footer")
- self.prompting = (callback, [])
-
- def prompt_onekey(self, prompt, keys, callback, *args):
- """
- Keys are a set of (word, key) tuples. The appropriate key in the
- word is highlighted.
- """
- prompt = [prompt, " ("]
- mkup = []
- for i, e in enumerate(keys):
- mkup.extend(common.highlight_key(e[0], e[1]))
- if i < len(keys)-1:
- mkup.append(",")
- prompt.extend(mkup)
- prompt.append(")? ")
- self.onekey = "".join(i[1] for i in keys)
- self.prompt(prompt, "", callback, *args)
-
- def prompt_done(self):
- self.prompting = False
- self.onekey = False
- self.view.set_focus("body")
- self.statusbar.message("")
-
- def prompt_execute(self, txt=None):
- if not txt:
- txt = self.statusbar.get_edit_text()
- p, args = self.prompting
- self.prompt_done()
- msg = p(txt, *args)
- if msg:
- self.statusbar.message(msg, 1000)
-
- def prompt_cancel(self):
- self.prompt_done()
-
def accept_all(self):
self.state.accept_all(self)
@@ -793,18 +556,6 @@ class ConsoleMaster(flow.FlowMaster):
self.state.default_body_view = v
self.refresh_focus()
- def drawscreen(self):
- size = self.ui.get_cols_rows()
- canvas = self.view.render(size, focus=1)
- self.ui.draw_screen(size, canvas)
- return size
-
- def pop_view(self):
- if self.state.view_mode == common.VIEW_FLOW:
- self.view_flow(self.state.view[self.state.focus])
- else:
- self.view_flowlist()
-
def edit_scripts(self, scripts):
commands = [x[0] for x in scripts] # remove outer array
if commands == [s.command for s in self.scripts]:
@@ -822,170 +573,6 @@ class ConsoleMaster(flow.FlowMaster):
patterns = (x[0] for x in tcp)
self.set_tcp_filter(patterns)
- def loop(self):
- changed = True
- try:
- while not self.should_exit.is_set():
- startloop = time.time()
- if changed:
- self.statusbar.redraw()
- size = self.drawscreen()
- changed = self.tick(self.masterq, timeout=0.1)
- self.ui.set_input_timeouts(max_wait=0)
- keys = self.ui.get_input()
- if keys:
- changed = True
- for k in keys:
- if self.prompting:
- if k == "esc":
- self.prompt_cancel()
- elif self.onekey:
- if k == "enter":
- self.prompt_cancel()
- elif k in self.onekey:
- self.prompt_execute(k)
- elif k == "enter":
- self.prompt_execute()
- else:
- self.view.keypress(size, k)
- else:
- k = self.view.keypress(size, k)
- if k:
- self.statusbar.message("")
- if k == "?":
- self.view_help()
- elif k == "c":
- if not self.client_playback:
- self.path_prompt(
- "Client replay: ",
- self.state.last_saveload,
- self.client_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current client replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_client_playback_prompt,
- )
- elif k == "H":
- self.view_grideditor(
- grideditor.SetHeadersEditor(
- self,
- self.setheaders.get_specs(),
- self.setheaders.set
- )
- )
- elif k == "I":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_ignore_filter()],
- self.edit_ignore_filter
- )
- )
- elif k == "T":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_tcp_filter()],
- self.edit_tcp_filter
- )
- )
- elif k == "i":
- self.prompt(
- "Intercept filter: ",
- self.state.intercept_txt,
- self.set_intercept
- )
- elif k == "Q":
- raise Stop
- elif k == "q":
- self.prompt_onekey(
- "Quit",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.quit,
- )
- elif k == "M":
- self.prompt_onekey(
- "Global default display mode",
- contentview.view_prompts,
- self.change_default_display_mode
- )
- elif k == "R":
- self.view_grideditor(
- grideditor.ReplaceEditor(
- self,
- self.replacehooks.get_specs(),
- self.replacehooks.set
- )
- )
- elif k == "s":
- self.view_grideditor(
- grideditor.ScriptEditor(
- self,
- [[i.command] for i in self.scripts],
- self.edit_scripts
- )
- )
- #if self.scripts:
- # self.load_script(None)
- #else:
- # self.path_prompt(
- # "Set script: ",
- # self.state.last_script,
- # self.set_script
- # )
- elif k == "S":
- if not self.server_playback:
- self.path_prompt(
- "Server replay path: ",
- self.state.last_saveload,
- self.server_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current server replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_server_playback_prompt,
- )
- elif k == "o":
- self.prompt_onekey(
- "Options",
- (
- ("anticache", "a"),
- ("anticomp", "c"),
- ("showhost", "h"),
- ("killextra", "k"),
- ("norefresh", "n"),
- ("no-upstream-certs", "u"),
- ),
- self._change_options
- )
- elif k == "t":
- self.prompt(
- "Sticky cookie filter: ",
- self.stickycookie_txt,
- self.set_stickycookie
- )
- elif k == "u":
- self.prompt(
- "Sticky auth filter: ",
- self.stickyauth_txt,
- self.set_stickyauth
- )
- self.looptime = time.time() - startloop
- except (Stop, KeyboardInterrupt):
- pass
-
def stop_client_playback_prompt(self, a):
if a != "n":
self.stop_client_playback()
@@ -996,7 +583,7 @@ class ConsoleMaster(flow.FlowMaster):
def quit(self, a):
if a != "n":
- raise Stop
+ raise urwid.ExitMainLoop
def _change_options(self, a):
if a == "a":
@@ -1012,7 +599,9 @@ class ConsoleMaster(flow.FlowMaster):
elif a == "n":
self.refresh_server_playback = not self.refresh_server_playback
elif a == "u":
- self.server.config.no_upstream_cert = not self.server.config.no_upstream_cert
+ self.server.config.no_upstream_cert =\
+ not self.server.config.no_upstream_cert
+ signals.update_settings.send(self)
def shutdown(self):
self.state.killall(self)
@@ -1039,15 +628,10 @@ class ConsoleMaster(flow.FlowMaster):
def refresh_focus(self):
if self.state.view:
- self.refresh_flow(self.state.view[self.state.focus])
-
- def refresh_flow(self, c):
- if hasattr(self.header, "refresh_flow"):
- self.header.refresh_flow(c)
- if hasattr(self.body, "refresh_flow"):
- self.body.refresh_flow(c)
- if hasattr(self.statusbar, "refresh_flow"):
- self.statusbar.refresh_flow(c)
+ signals.flow_change.send(
+ self,
+ flow = self.state.view[self.state.focus]
+ )
def process_flow(self, f):
if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay:
@@ -1055,7 +639,7 @@ class ConsoleMaster(flow.FlowMaster):
else:
f.reply()
self.sync_list_view()
- self.refresh_flow(f)
+ signals.flow_change.send(self, flow = f)
def clear_events(self):
self.eventlist[:] = []
diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py
index fa21c93e..bc8a2aad 100644
--- a/libmproxy/console/common.py
+++ b/libmproxy/console/common.py
@@ -1,18 +1,18 @@
from __future__ import absolute_import
+
import urwid
import urwid.util
import os
+
from .. import utils
from ..protocol.http import CONTENT_MISSING, decoded
+from . import signals
try:
import pyperclip
except:
pyperclip = False
-VIEW_LIST = 0
-VIEW_FLOW = 1
-
VIEW_FLOW_REQUEST = 0
VIEW_FLOW_RESPONSE = 1
@@ -29,6 +29,14 @@ METHOD_OPTIONS = [
]
+def is_keypress(k):
+ """
+ Is this input event a keypress?
+ """
+ if isinstance(k, basestring):
+ return True
+
+
def highlight_key(s, k):
l = []
parts = s.split(k, 1)
@@ -41,6 +49,8 @@ def highlight_key(s, k):
KEY_MAX = 30
+
+
def format_keyvals(lst, key="key", val="text", indent=0):
"""
Format a list of (key, value) tuples.
@@ -103,10 +113,8 @@ else:
SYMBOL_RETURN = u"<-"
-
def raw_format_flow(f, focus, extended, padding):
f = dict(f)
-
pile = []
req = []
if extended:
@@ -122,7 +130,7 @@ def raw_format_flow(f, focus, extended, padding):
req.append(fcol(SYMBOL_REPLAY, "replay"))
req.append(fcol(f["req_method"], "method"))
- preamble = sum(i[1] for i in req) + len(req) -1
+ preamble = sum(i[1] for i in req) + len(req) - 1
if f["intercepted"] and not f["acked"]:
uc = "intercept"
@@ -182,23 +190,19 @@ def raw_format_flow(f, focus, extended, padding):
def save_data(path, data, master, state):
if not path:
return
- state.last_saveload = path
path = os.path.expanduser(path)
try:
with file(path, "wb") as f:
f.write(data)
except IOError, v:
- master.statusbar.message(v.strerror)
+ signals.status_message.send(message=v.strerror)
def ask_save_path(prompt, data, master, state):
- master.path_prompt(
- prompt,
- state.last_saveload,
- save_data,
- data,
- master,
- state
+ signals.status_prompt_path.send(
+ prompt = prompt,
+ callback = save_data,
+ args = (data, master, state)
)
@@ -238,28 +242,27 @@ def copy_flow(part, scope, flow, master, state):
if not data:
if scope == "q":
- master.statusbar.message("No request content to copy.")
+ signals.status_message.send(message="No request content to copy.")
elif scope == "s":
- master.statusbar.message("No response content to copy.")
+ signals.status_message.send(message="No response content to copy.")
else:
- master.statusbar.message("No contents to copy.")
+ signals.status_message.send(message="No contents to copy.")
return
try:
master.add_event(str(len(data)))
pyperclip.copy(data)
- except RuntimeError:
+ except (RuntimeError, UnicodeDecodeError):
def save(k):
if k == "y":
- ask_save_path("Save data: ", data, master, state)
-
- master.prompt_onekey(
- "Cannot copy binary data to clipboard. Save as file?",
- (
+ ask_save_path("Save data", data, master, state)
+ signals.status_prompt_onekey.send(
+ prompt = "Cannot copy binary data to clipboard. Save as file?",
+ keys = (
("yes", "y"),
("no", "n"),
),
- save
+ callback = save
)
@@ -271,39 +274,35 @@ def ask_copy_part(scope, flow, master, state):
if scope != "s":
choices.append(("url", "u"))
- master.prompt_onekey(
- "Copy",
- choices,
- copy_flow,
- scope,
- flow,
- master,
- state
+ signals.status_prompt_onekey.send(
+ prompt = "Copy",
+ keys = choices,
+ callback = copy_flow,
+ args = (scope, flow, master, state)
)
def ask_save_body(part, master, state, flow):
"""
- Save either the request or the response body to disk.
- part can either be "q" (request), "s" (response) or None (ask user if necessary).
+ Save either the request or the response body to disk. part can either be
+ "q" (request), "s" (response) or None (ask user if necessary).
"""
request_has_content = flow.request and flow.request.content
response_has_content = flow.response and flow.response.content
if part is None:
- # We first need to determine whether we want to save the request or the response content.
+ # We first need to determine whether we want to save the request or the
+ # response content.
if request_has_content and response_has_content:
- master.prompt_onekey(
- "Save",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Save",
+ keys = (
("request", "q"),
("response", "s"),
),
- ask_save_body,
- master,
- state,
- flow
+ callback = ask_save_body,
+ args = (master, state, flow)
)
elif response_has_content:
ask_save_body("s", master, state, flow)
@@ -311,18 +310,24 @@ def ask_save_body(part, master, state, flow):
ask_save_body("q", master, state, flow)
elif part == "q" and request_has_content:
- ask_save_path("Save request content: ", flow.request.get_decoded_content(), master, state)
+ ask_save_path(
+ "Save request content",
+ flow.request.get_decoded_content(),
+ master,
+ state
+ )
elif part == "s" and response_has_content:
- ask_save_path("Save response content: ", flow.response.get_decoded_content(), master, state)
+ ask_save_path(
+ "Save response content",
+ flow.response.get_decoded_content(),
+ master,
+ state
+ )
else:
- master.statusbar.message("No content to save.")
+ signals.status_message.send(message="No content to save.")
-class FlowCache:
- @utils.LRUCache(200)
- def format_flow(self, *args):
- return raw_format_flow(*args)
-flowcache = FlowCache()
+flowcache = utils.LRUCache(800)
def format_flow(f, focus, extended=False, hostheader=False, padding=2):
@@ -348,7 +353,6 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
duration = 0
if f.response.timestamp_end and f.request.timestamp_start:
duration = f.response.timestamp_end - f.request.timestamp_start
- size = f.response.size()
roundtrip = utils.pretty_duration(duration)
d.update(dict(
@@ -362,26 +366,7 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
d["resp_ctype"] = t[0].split(";")[0]
else:
d["resp_ctype"] = ""
- return flowcache.format_flow(tuple(sorted(d.items())), focus, extended, padding)
-
-
-def int_version(v):
- SIG = 3
- v = urwid.__version__.split("-")[0].split(".")
- x = 0
- for i in range(min(SIG, len(v))):
- x += int(v[i]) * 10**(SIG-i)
- return x
-
-
-# We have to do this to be portable over 0.9.8 and 0.9.9 If compatibility
-# becomes a pain to maintain, we'll just mandate 0.9.9 or newer.
-class WWrap(urwid.WidgetWrap):
- if int_version(urwid.__version__) >= 990:
- def set_w(self, x):
- self._w = x
- def get_w(self):
- return self._w
- w = property(get_w, set_w)
-
-
+ return flowcache.get(
+ raw_format_flow,
+ tuple(sorted(d.items())), focus, extended, padding
+ )
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py
index 84e9946d..12ed5b64 100644
--- a/libmproxy/console/contentview.py
+++ b/libmproxy/console/contentview.py
@@ -1,14 +1,23 @@
from __future__ import absolute_import
-import logging, subprocess, re, cStringIO, traceback, json, urwid
+import cStringIO
+import json
+import logging
+import lxml.html
+import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
+import re
+import subprocess
+import traceback
+import urwid
-import lxml.html, lxml.etree
import netlib.utils
+
from . import common
from .. import utils, encoding, flow
from ..contrib import jsbeautifier, html2text
from ..contrib.wbxml.ASCommandResponse import ASCommandResponse
+
try:
import pyamf
from pyamf import remoting, flex
@@ -62,6 +71,7 @@ class ViewAuto:
name = "Auto"
prompt = ("auto", "a")
content_types = []
+
def __call__(self, hdrs, content, limit):
ctype = hdrs.get_first("content-type")
if ctype:
@@ -78,6 +88,7 @@ class ViewRaw:
name = "Raw"
prompt = ("raw", "r")
content_types = []
+
def __call__(self, hdrs, content, limit):
txt = _view_text(content[:limit], len(content), limit)
return "Raw", txt
@@ -87,6 +98,7 @@ class ViewHex:
name = "Hex"
prompt = ("hex", "e")
content_types = []
+
def __call__(self, hdrs, content, limit):
txt = []
for offset, hexa, s in netlib.utils.hexdump(content[:limit]):
@@ -105,8 +117,14 @@ class ViewXML:
name = "XML"
prompt = ("xml", "x")
content_types = ["text/xml"]
+
def __call__(self, hdrs, content, limit):
- parser = lxml.etree.XMLParser(remove_blank_text=True, resolve_entities=False, strip_cdata=False, recover=False)
+ parser = lxml.etree.XMLParser(
+ remove_blank_text=True,
+ resolve_entities=False,
+ strip_cdata=False,
+ recover=False
+ )
try:
document = lxml.etree.fromstring(content, parser)
except lxml.etree.XMLSyntaxError:
@@ -121,18 +139,18 @@ class ViewXML:
lxml.etree.tostring(p)
)
p = p.getprevious()
- doctype=docinfo.doctype
+ doctype = docinfo.doctype
if prev:
doctype += "\n".join(prev).strip()
doctype = doctype.strip()
s = lxml.etree.tostring(
- document,
- pretty_print=True,
- xml_declaration=True,
- doctype=doctype or None,
- encoding = docinfo.encoding
- )
+ document,
+ pretty_print=True,
+ xml_declaration=True,
+ doctype=doctype or None,
+ encoding = docinfo.encoding
+ )
txt = []
for i in s[:limit].strip().split("\n"):
@@ -147,6 +165,7 @@ class ViewJSON:
name = "JSON"
prompt = ("json", "s")
content_types = ["application/json"]
+
def __call__(self, hdrs, content, limit):
lines = utils.pretty_json(content)
if lines:
@@ -167,12 +186,20 @@ class ViewHTML:
name = "HTML"
prompt = ("html", "h")
content_types = ["text/html"]
+
def __call__(self, hdrs, content, limit):
if utils.isXML(content):
- parser = lxml.etree.HTMLParser(strip_cdata=True, remove_blank_text=True)
+ parser = lxml.etree.HTMLParser(
+ strip_cdata=True,
+ remove_blank_text=True
+ )
d = lxml.html.fromstring(content, parser=parser)
docinfo = d.getroottree().docinfo
- s = lxml.etree.tostring(d, pretty_print=True, doctype=docinfo.doctype)
+ s = lxml.etree.tostring(
+ d,
+ pretty_print=True,
+ doctype=docinfo.doctype
+ )
return "HTML", _view_text(s[:limit], len(s), limit)
@@ -180,6 +207,7 @@ class ViewHTMLOutline:
name = "HTML Outline"
prompt = ("html outline", "o")
content_types = ["text/html"]
+
def __call__(self, hdrs, content, limit):
content = content.decode("utf-8")
h = html2text.HTML2Text(baseurl="")
@@ -194,14 +222,15 @@ class ViewURLEncoded:
name = "URL-encoded"
prompt = ("urlencoded", "u")
content_types = ["application/x-www-form-urlencoded"]
+
def __call__(self, hdrs, content, limit):
lines = utils.urldecode(content)
if lines:
body = common.format_keyvals(
- [(k+":", v) for (k, v) in lines],
- key = "header",
- val = "text"
- )
+ [(k+":", v) for (k, v) in lines],
+ key = "header",
+ val = "text"
+ )
return "URLEncoded form", body
@@ -209,6 +238,7 @@ class ViewMultipart:
name = "Multipart Form"
prompt = ("multipart", "m")
content_types = ["multipart/form-data"]
+
def __call__(self, hdrs, content, limit):
v = utils.multipartdecode(hdrs, content)
if v:
@@ -302,12 +332,14 @@ class ViewJavaScript:
"application/javascript",
"text/javascript"
]
+
def __call__(self, hdrs, content, limit):
opts = jsbeautifier.default_options()
opts.indent_size = 2
res = jsbeautifier.beautify(content[:limit], opts)
return "JavaScript", _view_text(res, len(res), limit)
+
class ViewCSS:
name = "CSS"
prompt = ("css", "c")
@@ -335,6 +367,7 @@ class ViewImage:
"image/vnd.microsoft.icon",
"image/x-icon",
]
+
def __call__(self, hdrs, content, limit):
try:
img = Image.open(cStringIO.StringIO(content))
@@ -360,14 +393,17 @@ class ViewImage:
)
clean = []
for i in parts:
- clean.append([netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])])
- fmt = common.format_keyvals(
- clean,
- key = "header",
- val = "text"
+ clean.append(
+ [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])]
)
+ fmt = common.format_keyvals(
+ clean,
+ key = "header",
+ val = "text"
+ )
return "%s image"%img.format, fmt
+
class ViewProtobuf:
"""Human friendly view of protocol buffers
The view uses the protoc compiler to decode the binary
@@ -383,7 +419,10 @@ class ViewProtobuf:
@staticmethod
def is_available():
try:
- p = subprocess.Popen(["protoc", "--version"], stdout=subprocess.PIPE)
+ p = subprocess.Popen(
+ ["protoc", "--version"],
+ stdout=subprocess.PIPE
+ )
out, _ = p.communicate()
return out.startswith("libprotoc")
except:
@@ -407,6 +446,7 @@ class ViewProtobuf:
txt = _view_text(decoded[:limit], len(decoded), limit)
return "Protobuf", txt
+
class ViewWBXML:
name = "WBXML"
prompt = ("wbxml", "w")
@@ -416,14 +456,14 @@ class ViewWBXML:
]
def __call__(self, hdrs, content, limit):
-
+
try:
parser = ASCommandResponse(content)
parsedContent = parser.xmlString
txt = _view_text(parsedContent, len(parsedContent), limit)
return "WBXML", txt
except:
- return None
+ return None
views = [
ViewAuto(),
@@ -492,7 +532,7 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request):
# Third-party viewers can fail in unexpected ways...
except Exception:
s = traceback.format_exc()
- s = "Content viewer failed: \n" + s
+ s = "Content viewer failed: \n" + s
logfunc(s, "error")
ret = None
if not ret:
diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py
index 4164c416..8bfdae4a 100644
--- a/libmproxy/console/flowdetailview.py
+++ b/libmproxy/console/flowdetailview.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import
import urwid
-from . import common
+from . import common, signals
from .. import utils
footer = [
@@ -8,8 +8,8 @@ footer = [
]
class FlowDetailsView(urwid.ListBox):
- def __init__(self, master, flow, state):
- self.master, self.flow, self.state = master, flow, state
+ def __init__(self, flow):
+ self.flow = flow
urwid.ListBox.__init__(
self,
self.flowtext()
@@ -18,10 +18,7 @@ class FlowDetailsView(urwid.ListBox):
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "q":
- self.master.statusbar = self.state[0]
- self.master.body = self.state[1]
- self.master.header = self.state[2]
- self.master.make_view()
+ signals.pop_view_state.send(self)
return None
elif key == "?":
key = None
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 9e7c6d69..946bd97b 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -1,7 +1,8 @@
from __future__ import absolute_import
import urwid
from netlib import http
-from . import common
+from . import common, signals
+
def _mkhelp():
text = []
@@ -35,6 +36,7 @@ footer = [
('heading_key', "?"), ":help ",
]
+
class EventListBox(urwid.ListBox):
def __init__(self, master):
self.master = master
@@ -60,7 +62,10 @@ class BodyPile(urwid.Pile):
self,
[
FlowListBox(master),
- urwid.Frame(EventListBox(master), header = self.inactive_header)
+ urwid.Frame(
+ EventListBox(master),
+ header = self.inactive_header
+ )
]
)
self.master = master
@@ -80,39 +85,41 @@ class BodyPile(urwid.Pile):
# This is essentially a copypasta from urwid.Pile's keypress handler.
# So much for "closed for modification, but open for extension".
item_rows = None
- if len(size)==2:
- item_rows = self.get_item_rows( size, focus=True )
+ if len(size) == 2:
+ item_rows = self.get_item_rows(size, focus = True)
i = self.widget_list.index(self.focus_item)
- tsize = self.get_item_size(size,i,True,item_rows)
- return self.focus_item.keypress( tsize, key )
+ tsize = self.get_item_size(size, i, True, item_rows)
+ return self.focus_item.keypress(tsize, key)
-class ConnectionItem(common.WWrap):
+class ConnectionItem(urwid.WidgetWrap):
def __init__(self, master, state, flow, focus):
self.master, self.state, self.flow = master, state, flow
self.f = focus
w = self.get_text()
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return common.format_flow(self.flow, self.f, hostheader=self.master.showhost)
+ return common.format_flow(
+ self.flow,
+ self.f,
+ hostheader = self.master.showhost
+ )
def selectable(self):
return True
def save_flows_prompt(self, k):
if k == "a":
- self.master.path_prompt(
- "Save all flows to: ",
- self.state.last_saveload,
- self.master.save_flows
+ signals.status_prompt_path.send(
+ prompt = "Save all flows to",
+ callback = self.master.save_flows
)
else:
- self.master.path_prompt(
- "Save this flow to: ",
- self.state.last_saveload,
- self.master.save_one_flow,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Save this flow to",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
)
def stop_server_playback_prompt(self, a):
@@ -125,22 +132,25 @@ class ConnectionItem(common.WWrap):
[i.copy() for i in self.master.state.view],
self.master.killextra, self.master.rheaders,
False, self.master.nopop,
- self.master.options.replay_ignore_params, self.master.options.replay_ignore_content,
- self.master.options.replay_ignore_payload_params
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_payload_params,
+ self.master.options.replay_ignore_host
)
elif k == "t":
self.master.start_server_playback(
[self.flow.copy()],
self.master.killextra, self.master.rheaders,
False, self.master.nopop,
- self.master.options.replay_ignore_params, self.master.options.replay_ignore_content,
- self.master.options.replay_ignore_payload_params
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_payload_params,
+ self.master.options.replay_ignore_host
)
else:
- self.master.path_prompt(
- "Server replay path: ",
- self.state.last_saveload,
- self.master.server_playback_path
+ signals.status_prompt_path.send(
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
)
def keypress(self, (maxcol,), key):
@@ -158,43 +168,44 @@ class ConnectionItem(common.WWrap):
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
- self.master.statusbar.message(r)
+ signals.status_message.send(message=r)
self.master.sync_list_view()
elif key == "S":
if not self.master.server_playback:
- self.master.prompt_onekey(
- "Server Replay",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Server Replay",
+ keys = (
("all flows", "a"),
("this flow", "t"),
("file", "f"),
),
- self.server_replay_prompt,
+ callback = self.server_replay_prompt,
)
else:
- self.master.prompt_onekey(
- "Stop current server replay?",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Stop current server replay?",
+ keys = (
("yes", "y"),
("no", "n"),
),
- self.stop_server_playback_prompt,
+ callback = self.stop_server_playback_prompt,
)
elif key == "V":
if not self.flow.modified():
- self.master.statusbar.message("Flow not modified.")
+ signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
self.master.sync_list_view()
- self.master.statusbar.message("Reverted.")
+ signals.status_message.send(message="Reverted.")
elif key == "w":
- self.master.prompt_onekey(
- "Save",
- (
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Save",
+ keys = (
("all flows", "a"),
("this flow", "t"),
),
- self.save_flows_prompt,
+ callback = self.save_flows_prompt,
)
elif key == "X":
self.flow.kill(self.master)
@@ -202,11 +213,10 @@ class ConnectionItem(common.WWrap):
if self.flow.request:
self.master.view_flow(self.flow)
elif key == "|":
- self.master.path_prompt(
- "Send flow to script: ",
- self.state.last_script,
- self.master.run_script_once,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
)
elif key == "g":
common.ask_copy_part("a", self.flow, self.master, self.state)
@@ -249,11 +259,16 @@ class FlowListBox(urwid.ListBox):
def get_method_raw(self, k):
if k:
- self.get_url(k)
+ self.get_url(k)
def get_method(self, k):
if k == "e":
- self.master.prompt("Method:", "", self.get_method_raw)
+ signals.status_prompt.send(
+ self,
+ prompt = "Method",
+ text = "",
+ callback = self.get_method_raw
+ )
else:
method = ""
for i in common.METHOD_OPTIONS:
@@ -261,13 +276,18 @@ class FlowListBox(urwid.ListBox):
method = i[0].upper()
self.get_url(method)
- def get_url(self,method):
- self.master.prompt("URL:", "http://www.example.com/", self.new_request, method)
+ def get_url(self, method):
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = "http://www.example.com/",
+ callback = self.new_request,
+ args = (method,)
+ )
def new_request(self, url, method):
parts = http.parse_url(str(url))
if not parts:
- self.master.statusbar.message("Invalid Url")
+ signals.status_message.send(message="Invalid Url")
return
scheme, host, port, path = parts
f = self.master.create_request(method, scheme, host, port, path)
@@ -283,25 +303,33 @@ class FlowListBox(urwid.ListBox):
elif key == "e":
self.master.toggle_eventlog()
elif key == "l":
- self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit)
+ signals.status_prompt.send(
+ prompt = "Limit",
+ text = self.master.state.limit_txt,
+ callback = self.master.set_limit
+ )
elif key == "L":
- self.master.path_prompt(
- "Load flows: ",
- self.master.state.last_saveload,
- self.master.load_flows_callback
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Load flows",
+ callback = self.master.load_flows_callback
)
elif key == "n":
- self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.get_method)
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.get_method
+ )
elif key == "F":
self.master.toggle_follow_flows()
elif key == "W":
if self.master.stream:
self.master.stop_stream()
else:
- self.master.path_prompt(
- "Stream flows to: ",
- self.master.state.last_saveload,
- self.master.start_stream_to_path
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Stream flows to",
+ callback = self.master.start_stream_to_path
)
else:
return urwid.ListBox.keypress(self, size, key)
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 5c91512c..1aebb0f0 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
import os, sys, copy
import urwid
-from . import common, grideditor, contentview
+from . import common, grideditor, contentview, signals
from .. import utils, flow, controller
from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded
@@ -19,7 +19,7 @@ def _mkhelp():
("D", "duplicate flow"),
("e", "edit request/response"),
("f", "load full body data"),
- ("g", "copy response(content/headers) to clipboard"),
+ ("g", "copy response(content/headers) to clipboard"),
("m", "change body display mode for this entity"),
(None,
common.highlight_key("automatic", "a") +
@@ -84,32 +84,33 @@ footer = [
]
-class FlowViewHeader(common.WWrap):
+class FlowViewHeader(urwid.WidgetWrap):
def __init__(self, master, f):
self.master, self.flow = master, f
- self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
- def refresh_flow(self, f):
- if f == self.flow:
- self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
-
-class CallbackCache:
- @utils.LRUCache(200)
- def _callback(self, method, *args, **kwargs):
- return getattr(self.obj, method)(*args, **kwargs)
+ self._w = common.format_flow(
+ f,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self._w = common.format_flow(
+ flow,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
- def callback(self, obj, method, *args, **kwargs):
- # obj varies!
- self.obj = obj
- return self._callback(method, *args, **kwargs)
-cache = CallbackCache()
+cache = utils.LRUCache(200)
-class FlowView(common.WWrap):
- REQ = 0
- RESP = 1
+class FlowView(urwid.WidgetWrap):
highlight_color = "focusfield"
def __init__(self, master, state, flow):
@@ -119,37 +120,39 @@ class FlowView(common.WWrap):
self.view_response()
else:
self.view_request()
+ signals.flow_change.connect(self.sig_flow_change)
- def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request):
- return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request)
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response:
+ self.view_response()
+ else:
+ self.view_request()
def content_view(self, viewmode, conn):
- full = self.state.get_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "fullcontents"),
- False
- )
- if full:
- limit = sys.maxint
+ if conn.content == CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])]
+ return (msg, body)
else:
- limit = contentview.VIEW_CUTOFF
- description, text_objects = cache.callback(
- self, "_cached_content_view",
- viewmode,
- tuple(tuple(i) for i in conn.headers.lst),
- conn.content,
- limit,
- isinstance(conn, HTTPRequest)
- )
- return (description, text_objects)
-
- def cont_view_handle_missing(self, conn, viewmode):
- if conn.content == CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
+ full = self.state.get_flow_setting(
+ self.flow,
+ (self.state.view_flow_mode, "fullcontents"),
+ False
+ )
+ if full:
+ limit = sys.maxint
else:
- msg, body = self.content_view(viewmode, conn)
-
- return (msg, body)
+ limit = contentview.VIEW_CUTOFF
+ description, text_objects = cache.get(
+ contentview.get_content_view,
+ viewmode,
+ tuple(tuple(i) for i in conn.headers.lst),
+ conn.content,
+ limit,
+ self.master.add_event,
+ isinstance(conn, HTTPRequest)
+ )
+ return (description, text_objects)
def viewmode_get(self, override):
return self.state.default_body_view if override is None else override
@@ -170,7 +173,7 @@ class FlowView(common.WWrap):
)
override = self.override_get()
viewmode = self.viewmode_get(override)
- msg, body = self.cont_view_handle_missing(conn, viewmode)
+ msg, body = self.content_view(viewmode, conn)
return headers, msg, body
def conn_text_merge(self, headers, msg, body):
@@ -207,7 +210,8 @@ class FlowView(common.WWrap):
def conn_text(self, conn):
"""
- Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
+ Same as conn_text_raw, but returns result wrapped in a listbox ready for
+ usage.
"""
headers, msg, body = self.conn_text_raw(conn)
merged = self.conn_text_merge(headers, msg, body)
@@ -278,14 +282,16 @@ class FlowView(common.WWrap):
"""
runs the previous search again, forwards or backwards.
"""
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ last_search_string = self.state.get_flow_setting(
+ self.flow, "last_search_string"
+ )
if last_search_string:
message = self.search(last_search_string, backwards)
if message:
- self.master.statusbar.message(message)
+ signals.status_message.send(message=message)
else:
message = "no previous searches have been made"
- self.master.statusbar.message(message)
+ signals.status_message.send(message=message)
return message
@@ -319,7 +325,11 @@ class FlowView(common.WWrap):
# generate the body, highlight the words and get focus
headers, msg, body = self.conn_text_raw(text)
try:
- body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards)
+ body, focus_position = self.search_highlight_text(
+ body,
+ search_string,
+ backwards=backwards
+ )
except SearchError:
return "Search not supported in this view."
@@ -331,12 +341,16 @@ class FlowView(common.WWrap):
merged = self.conn_text_merge(headers, msg, body)
list_box = urwid.ListBox(merged)
list_box.set_focus(focus_position + 2)
- self.w = self.wrap_body(const, list_box)
- self.master.statusbar.redraw()
+ self._w = self.wrap_body(const, list_box)
+ signals.update_settings.send(self)
self.last_displayed_body = list_box
- wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards)
+ wrapped, wrapped_message = self.search_wrapped_around(
+ last_find_line,
+ last_search_index,
+ backwards
+ )
if wrapped:
return wrapped_message
@@ -344,9 +358,15 @@ class FlowView(common.WWrap):
def search_get_start(self, search_string):
start_line = 0
start_index = 0
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ last_search_string = self.state.get_flow_setting(
+ self.flow,
+ "last_search_string"
+ )
if search_string == last_search_string:
- start_line = self.state.get_flow_setting(self.flow, "last_find_line")
+ start_line = self.state.get_flow_setting(
+ self.flow,
+ "last_find_line"
+ )
start_index = self.state.get_flow_setting(self.flow,
"last_search_index")
@@ -391,7 +411,10 @@ class FlowView(common.WWrap):
found = False
text_objects = copy.deepcopy(text_objects)
- loop_range = self.search_get_range(len(text_objects), start_line, backwards)
+ loop_range = self.search_get_range(
+ len(text_objects),
+ start_line, backwards
+ )
for i in loop_range:
text_object = text_objects[i]
@@ -403,10 +426,19 @@ class FlowView(common.WWrap):
if i != start_line:
start_index = 0
- find_index = self.search_find(text, search_string, start_index, backwards)
+ find_index = self.search_find(
+ text,
+ search_string,
+ start_index,
+ backwards
+ )
if find_index != -1:
- new_text = self.search_highlight_object(text, find_index, search_string)
+ new_text = self.search_highlight_object(
+ text,
+ find_index,
+ search_string
+ )
text_objects[i] = new_text
found = True
@@ -424,14 +456,26 @@ class FlowView(common.WWrap):
focus_pos = None
else:
if not backwards:
- self.state.add_flow_setting(self.flow, "last_search_index", 0)
- self.state.add_flow_setting(self.flow, "last_find_line", 0)
+ self.state.add_flow_setting(
+ self.flow, "last_search_index", 0
+ )
+ self.state.add_flow_setting(
+ self.flow, "last_find_line", 0
+ )
else:
- self.state.add_flow_setting(self.flow, "last_search_index", None)
- self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1)
+ self.state.add_flow_setting(
+ self.flow, "last_search_index", None
+ )
+ self.state.add_flow_setting(
+ self.flow, "last_find_line", len(text_objects) - 1
+ )
- text_objects, focus_pos = self.search_highlight_text(text_objects,
- search_string, looping=True, backwards=backwards)
+ text_objects, focus_pos = self.search_highlight_text(
+ text_objects,
+ search_string,
+ looping=True,
+ backwards=backwards
+ )
return text_objects, focus_pos
@@ -455,8 +499,7 @@ class FlowView(common.WWrap):
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
- self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
- self.master.statusbar.redraw()
+ self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
def view_response(self):
self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE
@@ -475,29 +518,25 @@ class FlowView(common.WWrap):
)
]
)
- self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
- self.master.statusbar.redraw()
-
- def refresh_flow(self, c=None):
- if c == self.flow:
- if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response:
- self.view_response()
- else:
- self.view_request()
+ self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
def set_method_raw(self, m):
if m:
self.flow.request.method = m
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def edit_method(self, m):
if m == "e":
- self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw)
+ signals.status_prompt.send(
+ prompt = "Method",
+ text = self.flow.request.method,
+ callback = self.set_method_raw
+ )
else:
for i in common.METHOD_OPTIONS:
if i[1] == m:
self.flow.request.method = i[0].upper()
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_url(self, url):
request = self.flow.request
@@ -505,7 +544,7 @@ class FlowView(common.WWrap):
request.url = str(url)
except ValueError:
return "Invalid URL."
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_code(self, code):
response = self.flow.response
@@ -516,28 +555,37 @@ class FlowView(common.WWrap):
import BaseHTTPServer
if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)):
response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0]
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_msg(self, msg):
response = self.flow.response
response.msg = msg
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_headers(self, lst, conn):
conn.headers = flow.ODictCaseless(lst)
+ signals.flow_change.send(self, flow = self.flow)
def set_query(self, lst, conn):
conn.set_query(flow.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def set_path_components(self, lst, conn):
conn.set_path_components([i[0] for i in lst])
+ signals.flow_change.send(self, flow = self.flow)
def set_form(self, lst, conn):
conn.set_form_urlencoded(flow.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def edit_form(self, conn):
self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn)
+ grideditor.URLEncodedFormEditor(
+ self.master,
+ conn.get_form_urlencoded().lst,
+ self.set_form,
+ conn
+ )
)
def edit_form_confirm(self, key, conn):
@@ -559,42 +607,80 @@ class FlowView(common.WWrap):
self.flow.backup()
if part == "r":
with decoded(message):
- # Fix an issue caused by some editors when editing a request/response body.
- # Many editors make it hard to save a file without a terminating newline on the last
- # line. When editing message bodies, this can cause problems. For now, I just
- # strip the newlines off the end of the body when we return from an editor.
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
c = self.master.spawn_editor(message.content or "")
message.content = c.rstrip("\n")
elif part == "f":
if not message.get_form_urlencoded() and message.content:
- self.master.prompt_onekey(
- "Existing body is not a URL-encoded form. Clear and edit?",
- [
+ signals.status_prompt_onekey.send(
+ prompt = "Existing body is not a URL-encoded form. Clear and edit?",
+ keys = [
("yes", "y"),
("no", "n"),
],
- self.edit_form_confirm,
- message
+ callback = self.edit_form_confirm,
+ args = (message,)
)
else:
self.edit_form(message)
elif part == "h":
- self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message))
+ self.master.view_grideditor(
+ grideditor.HeaderEditor(
+ self.master,
+ message.headers.lst,
+ self.set_headers,
+ message
+ )
+ )
elif part == "p":
p = message.get_path_components()
p = [[i] for i in p]
- self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message))
+ self.master.view_grideditor(
+ grideditor.PathEditor(
+ self.master,
+ p,
+ self.set_path_components,
+ message
+ )
+ )
elif part == "q":
- self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message))
+ self.master.view_grideditor(
+ grideditor.QueryEditor(
+ self.master,
+ message.get_query().lst,
+ self.set_query, message
+ )
+ )
elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_edit("URL", message.url, self.set_url)
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = message.url,
+ callback = self.set_url
+ )
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method)
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.edit_method
+ )
elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Code", str(message.code), self.set_resp_code)
+ signals.status_prompt.send(
+ prompt = "Code",
+ text = str(message.code),
+ callback = self.set_resp_code
+ )
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Message", message.msg, self.set_resp_msg)
- self.master.refresh_flow(self.flow)
+ signals.status_prompt.send(
+ prompt = "Message",
+ text = message.msg,
+ callback = self.set_resp_msg
+ )
+ signals.flow_change.send(self, flow = self.flow)
def _view_nextprev_flow(self, np, flow):
try:
@@ -606,9 +692,10 @@ class FlowView(common.WWrap):
else:
new_flow, new_idx = self.state.get_prev(idx)
if new_flow is None:
- self.master.statusbar.message("No more flows!")
- return
- self.master.view_flow(new_flow)
+ signals.status_message.send(message="No more flows!")
+ else:
+ signals.pop_view_state.send(self)
+ self.master.view_flow(new_flow)
def view_next_flow(self, flow):
return self._view_nextprev_flow("next", flow)
@@ -622,7 +709,7 @@ class FlowView(common.WWrap):
(self.state.view_flow_mode, "prettyview"),
contentview.get_by_shortcut(t)
)
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def delete_body(self, t):
if t == "m":
@@ -633,7 +720,7 @@ class FlowView(common.WWrap):
self.flow.request.content = val
else:
self.flow.response.content = val
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
if key == " ":
@@ -647,8 +734,8 @@ class FlowView(common.WWrap):
conn = self.flow.response
if key == "q":
- self.master.view_flowlist()
- key = None
+ signals.pop_view_state.send(self)
+ return None
elif key == "tab":
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
self.view_response()
@@ -656,7 +743,7 @@ class FlowView(common.WWrap):
self.view_request()
elif key in ("up", "down", "page up", "page down"):
# Why doesn't this just work??
- self.w.keypress(size, key)
+ self._w.keypress(size, key)
elif key == "a":
self.flow.accept_intercept(self.master)
self.master.view_flow(self.flow)
@@ -681,12 +768,12 @@ class FlowView(common.WWrap):
elif key == "D":
f = self.master.duplicate_flow(self.flow)
self.master.view_flow(f)
- self.master.statusbar.message("Duplicated.")
+ signals.status_message.send(message="Duplicated.")
elif key == "e":
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey(
- "Edit request",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Edit request",
+ keys = (
("query", "q"),
("path", "p"),
("url", "u"),
@@ -695,29 +782,29 @@ class FlowView(common.WWrap):
("raw body", "r"),
("method", "m"),
),
- self.edit
+ callback = self.edit
)
else:
- self.master.prompt_onekey(
- "Edit response",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Edit response",
+ keys = (
("code", "c"),
("message", "m"),
("header", "h"),
("raw body", "r"),
),
- self.edit
+ callback = self.edit
)
key = None
elif key == "f":
- self.master.statusbar.message("Loading all body data...")
+ signals.status_message.send(message="Loading all body data...")
self.state.add_flow_setting(
self.flow,
(self.state.view_flow_mode, "fullcontents"),
True
)
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("")
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="")
elif key == "g":
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
scope = "q"
@@ -727,10 +814,11 @@ class FlowView(common.WWrap):
elif key == "m":
p = list(contentview.view_prompts)
p.insert(0, ("Clear", "C"))
- self.master.prompt_onekey(
- "Display mode",
- p,
- self.change_this_display_mode
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Display mode",
+ keys = p,
+ callback = self.change_this_display_mode
)
key = None
elif key == "p":
@@ -738,21 +826,20 @@ class FlowView(common.WWrap):
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
- self.master.statusbar.message(r)
- self.master.refresh_flow(self.flow)
+ signals.status_message.send(message=r)
+ signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if not self.flow.modified():
- self.master.statusbar.message("Flow not modified.")
+ signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("Reverted.")
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="Reverted.")
elif key == "W":
- self.master.path_prompt(
- "Save this flow: ",
- self.state.last_saveload,
- self.master.save_one_flow,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Save this flow",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
)
elif key == "v":
if conn and conn.content:
@@ -761,20 +848,23 @@ class FlowView(common.WWrap):
if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
self.master.spawn_external_viewer(conn.content, t)
else:
- self.master.statusbar.message("Error! Set $EDITOR or $PAGER.")
+ signals.status_message.send(
+ message = "Error! Set $EDITOR or $PAGER."
+ )
elif key == "|":
- self.master.path_prompt(
- "Send flow to script: ", self.state.last_script,
- self.master.run_script_once, self.flow
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
)
elif key == "x":
- self.master.prompt_onekey(
- "Delete body",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Delete body",
+ keys = (
("completely", "c"),
("mark as missing", "m"),
),
- self.delete_body
+ callback = self.delete_body
)
key = None
elif key == "X":
@@ -785,24 +875,31 @@ class FlowView(common.WWrap):
e = conn.headers.get_first("content-encoding", "identity")
if e != "identity":
if not conn.decode():
- self.master.statusbar.message("Could not decode - invalid data?")
+ signals.status_message.send(
+ message = "Could not decode - invalid data?"
+ )
else:
- self.master.prompt_onekey(
- "Select encoding: ",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Select encoding: ",
+ keys = (
("gzip", "z"),
("deflate", "d"),
),
- self.encode_callback,
- conn
+ callback = self.encode_callback,
+ args = (conn,)
)
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
elif key == "/":
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
- self.master.prompt(search_prompt,
- None,
- self.search)
+ last_search_string = self.state.get_flow_setting(
+ self.flow,
+ "last_search_string"
+ )
+ search_prompt = "Search body ["+last_search_string+"]" if last_search_string else "Search body"
+ signals.status_prompt.send(
+ prompt = search_prompt,
+ text = "",
+ callback = self.search
+ )
elif key == "n":
self.search_again(backwards=False)
elif key == "N":
@@ -816,4 +913,4 @@ class FlowView(common.WWrap):
"d": "deflate",
}
conn.encode(encoding_map[key])
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py
index 438d0ad7..4bcc0171 100644
--- a/libmproxy/console/grideditor.py
+++ b/libmproxy/console/grideditor.py
@@ -1,21 +1,25 @@
from __future__ import absolute_import
-import copy, re, os
+
+import copy
+import re
+import os
import urwid
-from . import common
+
+from . import common, signals
from .. import utils, filt, script
from netlib import http_uastrings
-footer = [
+FOOTER = [
('heading_key', "enter"), ":edit ",
('heading_key', "q"), ":back ",
]
-footer_editing = [
+FOOTER_EDITING = [
('heading_key', "esc"), ":stop editing ",
]
-class SText(common.WWrap):
+class SText(urwid.WidgetWrap):
def __init__(self, txt, focused, error):
txt = txt.encode("string-escape")
w = urwid.Text(txt, wrap="any")
@@ -26,10 +30,10 @@ class SText(common.WWrap):
w = urwid.AttrWrap(w, "focusfield")
elif error:
w = urwid.AttrWrap(w, "field_error")
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return self.w.get_text()[0]
+ return self._w.get_text()[0]
def keypress(self, size, key):
return key
@@ -38,21 +42,21 @@ class SText(common.WWrap):
return True
-class SEdit(common.WWrap):
+class SEdit(urwid.WidgetWrap):
def __init__(self, txt):
txt = txt.encode("string-escape")
w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
w = urwid.AttrWrap(w, "editfield")
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return self.w.get_text()[0]
+ return self._w.get_text()[0]
def selectable(self):
return True
-class GridRow(common.WWrap):
+class GridRow(urwid.WidgetWrap):
def __init__(self, focused, editing, editor, values):
self.focused, self.editing, self.editor = focused, editing, editor
@@ -76,14 +80,14 @@ class GridRow(common.WWrap):
)
if focused is not None:
w.set_focus_column(focused)
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_edit_value(self):
return self.editing.get_text()
def keypress(self, s, k):
if self.editing:
- w = self.w.column_widths(s)[self.focused]
+ w = self._w.column_widths(s)[self.focused]
k = self.editing.keypress((w,), k)
return k
@@ -121,12 +125,14 @@ class GridWalker(urwid.ListWalker):
try:
val = val.decode("string-escape")
except ValueError:
- self.editor.master.statusbar.message("Invalid Python-style string encoding.", 1000)
+ signals.status_message.send(
+ self, message = "Invalid Python-style string encoding.", expure = 1000
+ )
return
errors = self.lst[self.focus][1]
emsg = self.editor.is_error(self.focus_col, val)
if emsg:
- self.editor.master.statusbar.message(emsg, 1000)
+ signals.status_message.send(message = emsg, expire = 1)
errors.add(self.focus_col)
else:
errors.discard(self.focus_col)
@@ -155,13 +161,15 @@ class GridWalker(urwid.ListWalker):
def start_edit(self):
if self.lst:
- self.editing = GridRow(self.focus_col, True, self.editor, self.lst[self.focus])
- self.editor.master.statusbar.update(footer_editing)
+ self.editing = GridRow(
+ self.focus_col, True, self.editor, self.lst[self.focus]
+ )
+ self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
self._modified()
def stop_edit(self):
if self.editing:
- self.editor.master.statusbar.update(footer)
+ self.editor.master.loop.widget.footer.update(FOOTER)
self.set_current_value(self.editing.get_edit_value(), False)
self.editing = False
self._modified()
@@ -187,7 +195,12 @@ class GridWalker(urwid.ListWalker):
if self.editing:
return self.editing, self.focus
elif self.lst:
- return GridRow(self.focus_col, False, self.editor, self.lst[self.focus]), self.focus
+ return GridRow(
+ self.focus_col,
+ False,
+ self.editor,
+ self.lst[self.focus]
+ ), self.focus
else:
return None, None
@@ -213,10 +226,13 @@ class GridListBox(urwid.ListBox):
FIRST_WIDTH_MAX = 40
FIRST_WIDTH_MIN = 20
-class GridEditor(common.WWrap):
+
+
+class GridEditor(urwid.WidgetWrap):
title = None
columns = None
headings = None
+
def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
value = copy.deepcopy(value)
self.master, self.value, self.callback = master, value, callback
@@ -248,18 +264,18 @@ class GridEditor(common.WWrap):
self.walker = GridWalker(self.value, self)
self.lb = GridListBox(self.walker)
- self.w = urwid.Frame(
+ self._w = urwid.Frame(
self.lb,
header = urwid.Pile([title, h])
)
- self.master.statusbar.update("")
+ self.master.loop.widget.footer.update("")
self.show_empty_msg()
def show_empty_msg(self):
if self.walker.lst:
- self.w.set_footer(None)
+ self._w.set_footer(None)
else:
- self.w.set_footer(
+ self._w.set_footer(
urwid.Text(
[
("highlight", "No values. Press "),
@@ -297,7 +313,7 @@ class GridEditor(common.WWrap):
if self.walker.focus == pf and self.walker.focus_col != pfc:
self.walker.start_edit()
else:
- self.w.keypress(size, key)
+ self._w.keypress(size, key)
return None
key = common.shortcuts(key)
@@ -307,7 +323,7 @@ class GridEditor(common.WWrap):
if not i[1] and any([x.strip() for x in i[0]]):
res.append(i[0])
self.callback(res, *self.cb_args, **self.cb_kwargs)
- self.master.pop_view()
+ signals.pop_view_state.send(self)
elif key in ["h", "left"]:
self.walker.left()
elif key in ["l", "right"]:
@@ -322,10 +338,19 @@ class GridEditor(common.WWrap):
self.walker.delete_focus()
elif key == "r":
if self.walker.get_current_value() is not None:
- self.master.path_prompt("Read file: ", "", self.read_file)
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Read file",
+ callback = self.read_file
+ )
elif key == "R":
if self.walker.get_current_value() is not None:
- self.master.path_prompt("Read unescaped file: ", "", self.read_file, True)
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Read unescaped file",
+ callback = self.read_file,
+ args = (True,)
+ )
elif key == "e":
o = self.walker.get_current_value()
if o is not None:
@@ -336,7 +361,7 @@ class GridEditor(common.WWrap):
elif key in ["enter"]:
self.walker.start_edit()
elif not self.handle_key(key):
- return self.w.keypress(size, key)
+ return self._w.keypress(size, key)
def is_error(self, col, val):
"""
@@ -362,7 +387,9 @@ class GridEditor(common.WWrap):
("tab", "next field"),
("enter", "edit field"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(
urwid.Text(
[
@@ -384,6 +411,7 @@ class HeaderEditor(GridEditor):
title = "Editing headers"
columns = 2
headings = ("Key", "Value")
+
def make_help(self):
h = GridEditor.make_help(self)
text = []
@@ -391,7 +419,9 @@ class HeaderEditor(GridEditor):
keys = [
("U", "add User-Agent header"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("text", "\n")]))
text.extend(h)
return text
@@ -408,10 +438,10 @@ class HeaderEditor(GridEditor):
def handle_key(self, key):
if key == "U":
- self.master.prompt_onekey(
- "Add User-Agent header:",
- [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
- self.set_user_agent,
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
+ callback = self.set_user_agent,
)
return True
@@ -426,6 +456,7 @@ class ReplaceEditor(GridEditor):
title = "Editing replacement patterns"
columns = 3
headings = ("Filter", "Regex", "Replacement")
+
def is_error(self, col, val):
if col == 0:
if not filt.parse(val):
@@ -442,6 +473,7 @@ class SetHeadersEditor(GridEditor):
title = "Editing header set patterns"
columns = 3
headings = ("Filter", "Header", "Value")
+
def is_error(self, col, val):
if col == 0:
if not filt.parse(val):
@@ -455,7 +487,9 @@ class SetHeadersEditor(GridEditor):
keys = [
("U", "add User-Agent header"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("text", "\n")]))
text.extend(h)
return text
@@ -473,10 +507,10 @@ class SetHeadersEditor(GridEditor):
def handle_key(self, key):
if key == "U":
- self.master.prompt_onekey(
- "Add User-Agent header:",
- [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
- self.set_user_agent,
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
+ callback = self.set_user_agent,
)
return True
@@ -491,6 +525,7 @@ class ScriptEditor(GridEditor):
title = "Editing scripts"
columns = 1
headings = ("Command",)
+
def is_error(self, col, val):
try:
script.Script.parse_command(val)
@@ -507,4 +542,4 @@ class HostPatternEditor(GridEditor):
try:
re.compile(val, re.IGNORECASE)
except re.error as e:
- return "Invalid regex: %s" % str(e) \ No newline at end of file
+ return "Invalid regex: %s" % str(e)
diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py
index 27288a36..73cd8a50 100644
--- a/libmproxy/console/help.py
+++ b/libmproxy/console/help.py
@@ -1,6 +1,8 @@
from __future__ import absolute_import
+
import urwid
-from . import common
+
+from . import common, signals
from .. import filt, version
footer = [
@@ -8,9 +10,9 @@ footer = [
('heading_key', "q"), ":back ",
]
+
class HelpView(urwid.ListBox):
- def __init__(self, master, help_context, state):
- self.master, self.state = master, state
+ def __init__(self, help_context):
self.help_context = help_context or []
urwid.ListBox.__init__(
self,
@@ -122,7 +124,9 @@ class HelpView(urwid.ListBox):
("T", "set tcp proxying pattern"),
("u", "set sticky auth expression"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("head", "\n\nFilter expressions:\n")]))
f = []
@@ -167,16 +171,15 @@ class HelpView(urwid.ListBox):
("~q ~b test", "Requests where body contains \"test\""),
("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
]
- text.extend(common.format_keyvals(examples, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(examples, key="key", val="text", indent=4)
+ )
return text
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "q":
- self.master.statusbar = self.state[0]
- self.master.body = self.state[1]
- self.master.header = self.state[2]
- self.master.make_view()
+ signals.pop_view_state.send(self)
return None
elif key == "?":
key = None
diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py
index 650cf261..cfb2702c 100644
--- a/libmproxy/console/palettes.py
+++ b/libmproxy/console/palettes.py
@@ -1,192 +1,261 @@
-palettes = {
-# Default palette for dark background
- 'dark': [
- # name, foreground, background, mono, foreground_high, background_high
- # For details on the meaning of the elements refer to
- # http://excess.org/urwid/reference.html#Screen-register_palette
-
- ('body', 'black', 'dark cyan'),
- ('foot', 'light gray', 'default'),
- ('title', 'white,bold', 'default',),
- ('editline', 'white', 'default',),
-
- # Status bar & heading
- ('heading', 'light gray', 'dark blue', None, 'g85', 'dark blue'),
- ('heading_key', 'light cyan', 'dark blue', None, 'light cyan', 'dark blue'),
- ('heading_inactive', 'white', 'dark gray', None, 'g58', 'g11'),
-
- # Help
- ('key', 'light cyan', 'default'),
- ('head', 'white,bold', 'default'),
- ('text', 'light gray', 'default'),
-
- # List and Connections
- ('method', 'dark cyan', 'default'),
- ('focus', 'yellow', 'default'),
-
- ('code_200', 'light green', 'default'),
- ('code_300', 'light blue', 'default'),
- ('code_400', 'light red', 'default', None, '#f60', 'default'),
- ('code_500', 'light red', 'default'),
- ('code_other', 'dark red', 'default'),
-
- ('error', 'light red', 'default'),
-
- ('header', 'dark cyan', 'default'),
- ('highlight', 'white,bold', 'default'),
- ('intercept', 'brown', 'default', None, '#f60', 'default'),
- ('replay', 'light green', 'default', None, '#0f0', 'default'),
- ('ack', 'light red', 'default'),
-
- # Hex view
- ('offset', 'dark cyan', 'default'),
-
- # Grid Editor
- ('focusfield', 'black', 'light gray'),
- ('focusfield_error', 'dark red', 'light gray'),
- ('field_error', 'dark red', 'black'),
- ('editfield', 'black', 'light cyan'),
- ],
-
-# Palette for light background
- 'light': [
- ('body', 'black', 'dark cyan'),
- ('foot', 'dark gray', 'default'),
- ('title', 'white,bold', 'light blue',),
- ('editline', 'white', 'default',),
-
- # Status bar & heading
- ('heading', 'white', 'light gray', None, 'g85', 'dark blue'),
- ('heading_key', 'dark blue', 'light gray', None, 'light cyan', 'dark blue'),
- ('heading_inactive', 'light gray', 'dark gray', None, 'dark gray', 'dark blue'),
-
- # Help
- ('key', 'dark blue,bold', 'default'),
- ('head', 'black,bold', 'default'),
- ('text', 'dark gray', 'default'),
-
- # List and Connections
- ('method', 'dark cyan', 'default'),
- ('focus', 'black', 'default'),
-
- ('code_200', 'dark green', 'default'),
- ('code_300', 'light blue', 'default'),
- ('code_400', 'dark red', 'default', None, '#f60', 'default'),
- ('code_500', 'dark red', 'default'),
- ('code_other', 'light red', 'default'),
-
- ('error', 'light red', 'default'),
-
- ('header', 'dark blue', 'default'),
- ('highlight', 'black,bold', 'default'),
- ('intercept', 'brown', 'default', None, '#f60', 'default'),
- ('replay', 'dark green', 'default', None, '#0f0', 'default'),
- ('ack', 'dark red', 'default'),
-
- # Hex view
- ('offset', 'dark blue', 'default'),
-
- # Grid Editor
- ('focusfield', 'black', 'light gray'),
- ('focusfield_error', 'dark red', 'light gray'),
- ('field_error', 'dark red', 'black'),
- ('editfield', 'black', 'light cyan'),
- ],
-
-# Palettes for terminals that use the Solarized precision colors
-# (http://ethanschoonover.com/solarized#the-values)
-
-# For dark backgrounds
- 'solarized_dark': [
- ('body', 'dark cyan', 'default'),
- ('foot', 'dark gray', 'default'),
- ('title', 'white,bold', 'default',),
- ('editline', 'white', 'default',),
-
- # Status bar & heading
- ('heading', 'light gray', 'light cyan',),
- ('heading_key', 'dark blue', 'white',),
- ('heading_inactive', 'light cyan', 'light gray',),
-
- # Help
- ('key', 'dark blue', 'default',),
- ('head', 'white,underline', 'default'),
- ('text', 'light cyan', 'default'),
-
- # List and Connections
- ('method', 'dark cyan', 'default'),
- ('focus', 'white', 'default'),
-
- ('code_200', 'dark green', 'default'),
- ('code_300', 'light blue', 'default'),
- ('code_400', 'dark red', 'default',),
- ('code_500', 'dark red', 'default'),
- ('code_other', 'light red', 'default'),
-
- ('error', 'light red', 'default'),
-
- ('header', 'yellow', 'default'),
- ('highlight', 'white', 'default'),
- ('intercept', 'brown', 'default',),
- ('replay', 'dark green', 'default',),
- ('ack', 'dark red', 'default'),
-
- # Hex view
- ('offset', 'yellow', 'default'),
- ('text', 'light cyan', 'default'),
-
- # Grid Editor
- ('focusfield', 'white', 'light cyan'),
- ('focusfield_error', 'dark red', 'light gray'),
- ('field_error', 'dark red', 'black'),
- ('editfield', 'black', 'light gray'),
- ],
-
-# For light backgrounds
- 'solarized_light': [
- ('body', 'dark cyan', 'default'),
- ('foot', 'dark gray', 'default'),
- ('title', 'white,bold', 'light cyan',),
- ('editline', 'white', 'default',),
-
- # Status bar & heading
- ('heading', 'light cyan', 'light gray',),
- ('heading_key', 'dark blue', 'white',),
- ('heading_inactive', 'white', 'light gray',),
-
- # Help
- ('key', 'dark blue', 'default',),
- ('head', 'black,underline', 'default'),
- ('text', 'light cyan', 'default'),
-
- # List and Connections
- ('method', 'dark cyan', 'default'),
- ('focus', 'black', 'default'),
-
- ('code_200', 'dark green', 'default'),
- ('code_300', 'light blue', 'default'),
- ('code_400', 'dark red', 'default',),
- ('code_500', 'dark red', 'default'),
- ('code_other', 'light red', 'default'),
-
- ('error', 'light red', 'default'),
-
- ('header', 'light cyan', 'default'),
- ('highlight', 'black,bold', 'default'),
- ('intercept', 'brown', 'default',),
- ('replay', 'dark green', 'default',),
- ('ack', 'dark red', 'default'),
-
- # Hex view
- ('offset', 'light cyan', 'default'),
- ('text', 'yellow', 'default'),
-
- # Grid Editor
- ('focusfield', 'black', 'light gray'),
- ('focusfield_error', 'dark red', 'light gray'),
- ('field_error', 'dark red', 'black'),
- ('editfield', 'white', 'light cyan'),
- ],
+# Low-color themes should ONLY use the standard foreground and background
+# colours listed here:
+#
+# http://urwid.org/manual/displayattributes.html
+#
+
+
+
+class Palette:
+ _fields = [
+ 'title',
+
+ # Status bar & heading
+ 'heading', 'heading_key', 'heading_inactive',
+
+ # Help
+ 'key', 'head', 'text',
+
+ # List and Connections
+ 'method', 'focus',
+ 'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
+ 'error',
+ 'header', 'highlight', 'intercept', 'replay',
+
+ # Hex view
+ 'offset',
+
+ # Grid Editor
+ 'focusfield', 'focusfield_error', 'field_error', 'editfield',
+ ]
+ high = None
+
+ def palette(self):
+ l = []
+ for i in self._fields:
+ v = [i]
+ v.extend(self.low[i])
+ if self.high and i in self.high:
+ v.append(None)
+ v.extend(self.high[i])
+ l.append(tuple(v))
+ return l
+
+
+class LowDark(Palette):
+ """
+ Low-color dark background
+ """
+ low = dict(
+ title = ('white,bold', 'default'),
+
+ # Status bar & heading
+ heading = ('light gray', 'dark blue'),
+ heading_key = ('light cyan', 'dark blue'),
+ heading_inactive = ('white', 'dark gray'),
+
+ # Help
+ key = ('light cyan', 'default'),
+ head = ('white,bold', 'default'),
+ text = ('light gray', 'default'),
+
+ # List and Connections
+ method = ('dark cyan', 'default'),
+ focus = ('yellow', 'default'),
+
+ code_200 = ('dark green', 'default'),
+ code_300 = ('light blue', 'default'),
+ code_400 = ('light red', 'default'),
+ code_500 = ('light red', 'default'),
+ code_other = ('dark red', 'default'),
+
+ error = ('light red', 'default'),
+
+ header = ('dark cyan', 'default'),
+ highlight = ('white,bold', 'default'),
+ intercept = ('brown', 'default'),
+ replay = ('light green', 'default'),
+
+ # Hex view
+ offset = ('dark cyan', 'default'),
+
+ # Grid Editor
+ focusfield = ('black', 'light gray'),
+ focusfield_error = ('dark red', 'light gray'),
+ field_error = ('dark red', 'default'),
+ editfield = ('white', 'default'),
+ )
+
+
+class Dark(LowDark):
+ high = dict(
+ heading_inactive = ('g58', 'g11'),
+ intercept = ('#f60', 'default'),
+ )
+
+
+class LowLight(Palette):
+ """
+ Low-color light background
+ """
+ low = dict(
+ title = ('dark magenta,bold', 'light blue'),
+
+ # Status bar & heading
+ heading = ('light gray', 'dark blue'),
+ heading_key = ('light cyan', 'dark blue'),
+ heading_inactive = ('black', 'light gray'),
+
+ # Help
+ key = ('dark blue,bold', 'default'),
+ head = ('black,bold', 'default'),
+ text = ('dark gray', 'default'),
+
+ # List and Connections
+ method = ('dark cyan', 'default'),
+ focus = ('black', 'default'),
+
+ code_200 = ('dark green', 'default'),
+ code_300 = ('light blue', 'default'),
+ code_400 = ('dark red', 'default'),
+ code_500 = ('dark red', 'default'),
+ code_other = ('light red', 'default'),
+
+ error = ('light red', 'default'),
+
+ header = ('dark blue', 'default'),
+ highlight = ('black,bold', 'default'),
+ intercept = ('brown', 'default'),
+ replay = ('dark green', 'default'),
+
+ # Hex view
+ offset = ('dark blue', 'default'),
+
+ # Grid Editor
+ focusfield = ('black', 'light gray'),
+ focusfield_error = ('dark red', 'light gray'),
+ field_error = ('dark red', 'black'),
+ editfield = ('black', 'default'),
+ )
+
+
+class Light(LowLight):
+ high = dict(
+ heading = ('g99', '#08f'),
+ heading_key = ('#0ff,bold', '#08f'),
+ heading_inactive = ('g35', 'g85'),
+ replay = ('#0a0,bold', 'default'),
+ )
+
+
+# Solarized palette in Urwid-style terminal high-colour offsets
+# See: http://ethanschoonover.com/solarized
+sol_base03 = "h234"
+sol_base02 = "h235"
+sol_base01 = "h240"
+sol_base00 = "h241"
+sol_base0 = "h244"
+sol_base1 = "h245"
+sol_base2 = "h254"
+sol_base3 = "h230"
+sol_yellow = "h136"
+sol_orange = "h166"
+sol_red = "h160"
+sol_magenta = "h125"
+sol_violet = "h61"
+sol_blue = "h33"
+sol_cyan = "h37"
+sol_green = "h64"
+class SolarizedLight(LowLight):
+ high = dict(
+ title = (sol_blue, 'default'),
+ text = (sol_base00, 'default'),
+
+ # Status bar & heading
+ heading = (sol_base2, sol_base02),
+ heading_key = (sol_blue, sol_base03),
+ heading_inactive = (sol_base03, sol_base1),
+
+ # Help
+ key = (sol_blue, 'default',),
+ head = (sol_base00, 'default'),
+
+ # List and Connections
+ method = (sol_cyan, 'default'),
+ focus = (sol_base01, 'default'),
+
+ code_200 = (sol_green, 'default'),
+ code_300 = (sol_blue, 'default'),
+ code_400 = (sol_orange, 'default',),
+ code_500 = (sol_red, 'default'),
+ code_other = (sol_magenta, 'default'),
+
+ error = (sol_red, 'default'),
+
+ header = (sol_base01, 'default'),
+ highlight = (sol_base01, 'default'),
+ intercept = (sol_red, 'default',),
+ replay = (sol_green, 'default',),
+
+ # Hex view
+ offset = (sol_cyan, 'default'),
+
+ # Grid Editor
+ focusfield = (sol_base00, sol_base2),
+ focusfield_error = (sol_red, sol_base2),
+ field_error = (sol_red, 'default'),
+ editfield = (sol_base01, 'default'),
+ )
+
+
+class SolarizedDark(LowDark):
+ high = dict(
+ title = (sol_blue, 'default'),
+ text = (sol_base0, 'default'),
+
+ # Status bar & heading
+ heading = (sol_base03, sol_base1),
+ heading_key = (sol_blue+",bold", sol_base1),
+ heading_inactive = (sol_base1, sol_base02),
+
+ # Help
+ key = (sol_blue, 'default',),
+ head = (sol_base00, 'default'),
+
+ # List and Connections
+ method = (sol_cyan, 'default'),
+ focus = (sol_base1, 'default'),
+
+ code_200 = (sol_green, 'default'),
+ code_300 = (sol_blue, 'default'),
+ code_400 = (sol_orange, 'default',),
+ code_500 = (sol_red, 'default'),
+ code_other = (sol_magenta, 'default'),
+
+ error = (sol_red, 'default'),
+
+ header = (sol_base01, 'default'),
+ highlight = (sol_base01, 'default'),
+ intercept = (sol_red, 'default',),
+ replay = (sol_green, 'default',),
+
+ # Hex view
+ offset = (sol_cyan, 'default'),
+
+ # Grid Editor
+ focusfield = (sol_base0, sol_base02),
+ focusfield_error = (sol_red, sol_base02),
+ field_error = (sol_red, 'default'),
+ editfield = (sol_base1, 'default'),
+ )
+
+palettes = {
+ "lowlight": LowLight(),
+ "lowdark": LowDark(),
+ "light": Light(),
+ "dark": Dark(),
+ "solarized_light": SolarizedLight(),
+ "solarized_dark": SolarizedDark(),
}
diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py
new file mode 100644
index 00000000..53cda3be
--- /dev/null
+++ b/libmproxy/console/pathedit.py
@@ -0,0 +1,69 @@
+import glob
+import os.path
+
+import urwid
+
+
+class _PathCompleter:
+ def __init__(self, _testing=False):
+ """
+ _testing: disables reloading of the lookup table to make testing
+ possible.
+ """
+ self.lookup, self.offset = None, None
+ self.final = None
+ self._testing = _testing
+
+ def reset(self):
+ self.lookup = None
+ self.offset = -1
+
+ def complete(self, txt):
+ """
+ Returns the next completion for txt, or None if there is no
+ completion.
+ """
+ path = os.path.expanduser(txt)
+ if not self.lookup:
+ if not self._testing:
+ # Lookup is a set of (display value, actual value) tuples.
+ self.lookup = []
+ if os.path.isdir(path):
+ files = glob.glob(os.path.join(path, "*"))
+ prefix = txt
+ else:
+ files = glob.glob(path+"*")
+ prefix = os.path.dirname(txt)
+ prefix = prefix or "./"
+ for f in files:
+ display = os.path.join(prefix, os.path.basename(f))
+ if os.path.isdir(f):
+ display += "/"
+ self.lookup.append((display, f))
+ if not self.lookup:
+ self.final = path
+ return path
+ self.lookup.sort()
+ self.offset = -1
+ self.lookup.append((txt, txt))
+ self.offset += 1
+ if self.offset >= len(self.lookup):
+ self.offset = 0
+ ret = self.lookup[self.offset]
+ self.final = ret[1]
+ return ret[0]
+
+
+class PathEdit(urwid.Edit, _PathCompleter):
+ def __init__(self, *args, **kwargs):
+ urwid.Edit.__init__(self, *args, **kwargs)
+ _PathCompleter.__init__(self)
+
+ def keypress(self, size, key):
+ if key == "tab":
+ comp = self.complete(self.get_edit_text())
+ self.set_edit_text(comp)
+ self.set_edit_pos(len(comp))
+ else:
+ self.reset()
+ return urwid.Edit.keypress(self, size, key)
diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py
new file mode 100644
index 00000000..e4c11f5a
--- /dev/null
+++ b/libmproxy/console/signals.py
@@ -0,0 +1,30 @@
+import blinker
+
+# Show a status message in the action bar
+status_message = blinker.Signal()
+
+# Prompt for input
+status_prompt = blinker.Signal()
+
+# Prompt for a path
+status_prompt_path = blinker.Signal()
+
+# Prompt for a single keystroke
+status_prompt_onekey = blinker.Signal()
+
+# Call a callback in N seconds
+call_in = blinker.Signal()
+
+# Focus the body, footer or header of the main window
+focus = blinker.Signal()
+
+# Fired when settings change
+update_settings = blinker.Signal()
+
+# Fired when a flow changes
+flow_change = blinker.Signal()
+
+
+# Pop and push view state onto a stack
+pop_view_state = blinker.Signal()
+push_view_state = blinker.Signal()
diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py
new file mode 100644
index 00000000..7fb15aa6
--- /dev/null
+++ b/libmproxy/console/statusbar.py
@@ -0,0 +1,254 @@
+import time
+import os.path
+
+import urwid
+
+from . import pathedit, signals, common
+from .. import utils
+
+
+class ActionBar(urwid.WidgetWrap):
+ def __init__(self):
+ urwid.WidgetWrap.__init__(self, None)
+ self.clear()
+ signals.status_message.connect(self.sig_message)
+ signals.status_prompt.connect(self.sig_prompt)
+ signals.status_prompt_path.connect(self.sig_path_prompt)
+ signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
+
+ self.last_path = ""
+
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+
+
+ def sig_message(self, sender, message, expire=None):
+ w = urwid.Text(message)
+ self._w = w
+ if expire:
+ def cb(*args):
+ if w == self._w:
+ self.clear()
+ signals.call_in.send(seconds=expire, callback=cb)
+
+ def prep_prompt(self, p):
+ return p.strip() + ": "
+
+ def sig_prompt(self, sender, prompt, text, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
+ self.prompting = (callback, args)
+
+ def sig_path_prompt(self, sender, prompt, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = pathedit.PathEdit(
+ self.prep_prompt(prompt),
+ os.path.dirname(self.last_path)
+ )
+ self.pathprompt = True
+ self.prompting = (callback, args)
+
+ def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
+ """
+ Keys are a set of (word, key) tuples. The appropriate key in the
+ word is highlighted.
+ """
+ signals.focus.send(self, section="footer")
+ prompt = [prompt, " ("]
+ mkup = []
+ for i, e in enumerate(keys):
+ mkup.extend(common.highlight_key(e[0], e[1]))
+ if i < len(keys)-1:
+ mkup.append(",")
+ prompt.extend(mkup)
+ prompt.append(")? ")
+ self.onekey = set(i[1] for i in keys)
+ self._w = urwid.Edit(prompt, "")
+ self.prompting = (callback, args)
+
+ def selectable(self):
+ return True
+
+ def keypress(self, size, k):
+ if self.prompting:
+ if k == "esc":
+ self.prompt_done()
+ elif self.onekey:
+ if k == "enter":
+ self.prompt_done()
+ elif k in self.onekey:
+ self.prompt_execute(k)
+ elif k == "enter":
+ self.prompt_execute(self._w.get_edit_text())
+ else:
+ if common.is_keypress(k):
+ self._w.keypress(size, k)
+ else:
+ return k
+
+ def clear(self):
+ self._w = urwid.Text("")
+
+ def prompt_done(self):
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+ signals.status_message.send(message="")
+ signals.focus.send(self, section="body")
+
+ def prompt_execute(self, txt):
+ if self.pathprompt:
+ self.last_path = txt
+ p, args = self.prompting
+ self.prompt_done()
+ msg = p(txt, *args)
+ if msg:
+ signals.status_message.send(message=msg, expire=1)
+
+
+class StatusBar(urwid.WidgetWrap):
+ def __init__(self, master, helptext):
+ self.master, self.helptext = master, helptext
+ self.ab = ActionBar()
+ self.ib = urwid.WidgetWrap(urwid.Text(""))
+ self._w = urwid.Pile([self.ib, self.ab])
+ signals.update_settings.connect(self.sig_update_settings)
+ self.redraw()
+
+ def sig_update_settings(self, sender):
+ self.redraw()
+
+ def keypress(self, *args, **kwargs):
+ return self.ab.keypress(*args, **kwargs)
+
+ def get_status(self):
+ r = []
+
+ if self.master.setheaders.count():
+ r.append("[")
+ r.append(("heading_key", "H"))
+ r.append("eaders]")
+ if self.master.replacehooks.count():
+ r.append("[")
+ r.append(("heading_key", "R"))
+ r.append("eplacing]")
+ if self.master.client_playback:
+ r.append("[")
+ r.append(("heading_key", "cplayback"))
+ r.append(":%s to go]"%self.master.client_playback.count())
+ if self.master.server_playback:
+ r.append("[")
+ r.append(("heading_key", "splayback"))
+ if self.master.nopop:
+ r.append(":%s in file]"%self.master.server_playback.count())
+ else:
+ r.append(":%s to go]"%self.master.server_playback.count())
+ if self.master.get_ignore_filter():
+ r.append("[")
+ r.append(("heading_key", "I"))
+ r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
+ if self.master.get_tcp_filter():
+ r.append("[")
+ r.append(("heading_key", "T"))
+ r.append("CP:%d]" % len(self.master.get_tcp_filter()))
+ if self.master.state.intercept_txt:
+ r.append("[")
+ r.append(("heading_key", "i"))
+ r.append(":%s]"%self.master.state.intercept_txt)
+ if self.master.state.limit_txt:
+ r.append("[")
+ r.append(("heading_key", "l"))
+ r.append(":%s]"%self.master.state.limit_txt)
+ if self.master.stickycookie_txt:
+ r.append("[")
+ r.append(("heading_key", "t"))
+ r.append(":%s]"%self.master.stickycookie_txt)
+ if self.master.stickyauth_txt:
+ r.append("[")
+ r.append(("heading_key", "u"))
+ r.append(":%s]"%self.master.stickyauth_txt)
+ if self.master.state.default_body_view.name != "Auto":
+ r.append("[")
+ r.append(("heading_key", "M"))
+ r.append(":%s]"%self.master.state.default_body_view.name)
+
+ opts = []
+ if self.master.anticache:
+ opts.append("anticache")
+ if self.master.anticomp:
+ opts.append("anticomp")
+ if self.master.showhost:
+ opts.append("showhost")
+ if not self.master.refresh_server_playback:
+ opts.append("norefresh")
+ if self.master.killextra:
+ opts.append("killextra")
+ if self.master.server.config.no_upstream_cert:
+ opts.append("no-upstream-cert")
+ if self.master.state.follow_focus:
+ opts.append("following")
+ if self.master.stream_large_bodies:
+ opts.append(
+ "stream:%s" % utils.pretty_size(
+ self.master.stream_large_bodies.max_size
+ )
+ )
+
+ if opts:
+ r.append("[%s]"%(":".join(opts)))
+
+ if self.master.server.config.mode in ["reverse", "upstream"]:
+ dst = self.master.server.config.mode.dst
+ scheme = "https" if dst[0] else "http"
+ if dst[1] != dst[0]:
+ scheme += "2https" if dst[1] else "http"
+ r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
+ if self.master.scripts:
+ r.append("[")
+ r.append(("heading_key", "s"))
+ r.append("cripts:%s]"%len(self.master.scripts))
+ # r.append("[lt:%0.3f]"%self.master.looptime)
+
+ if self.master.stream:
+ r.append("[W:%s]"%self.master.stream_path)
+
+ return r
+
+ def redraw(self):
+ fc = self.master.state.flow_count()
+ if self.master.state.focus is None:
+ offset = 0
+ else:
+ offset = min(self.master.state.focus + 1, fc)
+ t = [
+ ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
+ ]
+
+ if self.master.server.bound:
+ host = self.master.server.address.host
+ if host == "0.0.0.0":
+ host = "*"
+ boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
+ else:
+ boundaddr = ""
+ t.extend(self.get_status())
+ status = urwid.AttrWrap(urwid.Columns([
+ urwid.Text(t),
+ urwid.Text(
+ [
+ self.helptext,
+ boundaddr
+ ],
+ align="right"
+ ),
+ ]), "heading")
+ self.ib._w = status
+
+ def update(self, text):
+ self.helptext = text
+ self.redraw()
+ self.master.loop.draw_screen()
+
+ def selectable(self):
+ return True
diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py
new file mode 100644
index 00000000..d686f61d
--- /dev/null
+++ b/libmproxy/console/window.py
@@ -0,0 +1,142 @@
+import urwid
+from . import common, grideditor, signals, contentview
+
+class Window(urwid.Frame):
+ def __init__(self, master, body, header, footer):
+ urwid.Frame.__init__(self, body, header=header, footer=footer)
+ self.master = master
+ signals.focus.connect(self.sig_focus)
+
+ def sig_focus(self, sender, section):
+ self.focus_position = section
+
+ def keypress(self, size, k):
+ k = urwid.Frame.keypress(self, self.master.loop.screen_size, k)
+ if k == "?":
+ self.master.view_help()
+ elif k == "c":
+ if not self.master.client_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Client replay",
+ callback = self.master.client_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current client replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_client_playback_prompt,
+ )
+ elif k == "H":
+ self.master.view_grideditor(
+ grideditor.SetHeadersEditor(
+ self.master,
+ self.master.setheaders.get_specs(),
+ self.master.setheaders.set
+ )
+ )
+ elif k == "I":
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ [[x] for x in self.master.get_ignore_filter()],
+ self.master.edit_ignore_filter
+ )
+ )
+ elif k == "T":
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ [[x] for x in self.master.get_tcp_filter()],
+ self.master.edit_tcp_filter
+ )
+ )
+ elif k == "i":
+ signals.status_prompt.send(
+ self,
+ prompt = "Intercept filter",
+ text = self.master.state.intercept_txt,
+ callback = self.master.set_intercept
+ )
+ elif k == "Q":
+ raise urwid.ExitMainLoop
+ elif k == "q":
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Quit",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.quit,
+ )
+ elif k == "M":
+ signals.status_prompt_onekey.send(
+ prompt = "Global default display mode",
+ keys = contentview.view_prompts,
+ callback = self.master.change_default_display_mode
+ )
+ elif k == "R":
+ self.master.view_grideditor(
+ grideditor.ReplaceEditor(
+ self.master,
+ self.master.replacehooks.get_specs(),
+ self.master.replacehooks.set
+ )
+ )
+ elif k == "s":
+ self.master.view_grideditor(
+ grideditor.ScriptEditor(
+ self.master,
+ [[i.command] for i in self.master.scripts],
+ self.master.edit_scripts
+ )
+ )
+ elif k == "S":
+ if not self.master.server_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current server replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_server_playback_prompt,
+ )
+ elif k == "o":
+ signals.status_prompt_onekey.send(
+ prompt = "Options",
+ keys = (
+ ("anticache", "a"),
+ ("anticomp", "c"),
+ ("showhost", "h"),
+ ("killextra", "k"),
+ ("norefresh", "n"),
+ ("no-upstream-certs", "u"),
+ ),
+ callback = self.master._change_options
+ )
+ elif k == "t":
+ signals.status_prompt.send(
+ prompt = "Sticky cookie filter",
+ text = self.master.stickycookie_txt,
+ callback = self.master.set_stickycookie
+ )
+ elif k == "u":
+ signals.status_prompt.send(
+ prompt = "Sticky auth filter",
+ text = self.master.stickyauth_txt,
+ callback = self.master.set_stickyauth
+ )
+ else:
+ return k