aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
commit33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04 (patch)
tree31914a601302579ff817504019296fd7e9e46765 /libmproxy/console
parent36f34f701991b5d474c005ec45e3b66e20f326a8 (diff)
downloadmitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.gz
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.bz2
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.zip
move mitmproxy
Diffstat (limited to 'libmproxy/console')
-rw-r--r--libmproxy/console/__init__.py744
-rw-r--r--libmproxy/console/common.py444
-rw-r--r--libmproxy/console/flowdetailview.py153
-rw-r--r--libmproxy/console/flowlist.py397
-rw-r--r--libmproxy/console/flowview.py711
-rw-r--r--libmproxy/console/grideditor.py716
-rw-r--r--libmproxy/console/help.py117
-rw-r--r--libmproxy/console/options.py271
-rw-r--r--libmproxy/console/palettepicker.py82
-rw-r--r--libmproxy/console/palettes.py326
-rw-r--r--libmproxy/console/pathedit.py71
-rw-r--r--libmproxy/console/searchable.py93
-rw-r--r--libmproxy/console/select.py120
-rw-r--r--libmproxy/console/signals.py43
-rw-r--r--libmproxy/console/statusbar.py258
-rw-r--r--libmproxy/console/tabs.py70
-rw-r--r--libmproxy/console/window.py90
17 files changed, 0 insertions, 4706 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
deleted file mode 100644
index e739ec61..00000000
--- a/libmproxy/console/__init__.py
+++ /dev/null
@@ -1,744 +0,0 @@
-from __future__ import absolute_import
-
-import mailcap
-import mimetypes
-import tempfile
-import os
-import os.path
-import shlex
-import signal
-import stat
-import subprocess
-import sys
-import traceback
-import urwid
-import weakref
-
-from .. import controller, flow, script, contentviews
-from . import flowlist, flowview, help, window, signals, options
-from . import grideditor, palettes, statusbar, palettepicker
-
-EVENTLOG_SIZE = 500
-
-
-class ConsoleState(flow.State):
-
- def __init__(self):
- flow.State.__init__(self)
- self.focus = None
- self.follow_focus = None
- self.default_body_view = contentviews.get("Auto")
- self.flowsettings = weakref.WeakKeyDictionary()
- self.last_search = None
-
- def __setattr__(self, name, value):
- self.__dict__[name] = value
- signals.update_settings.send(self)
-
- def add_flow_setting(self, flow, key, value):
- d = self.flowsettings.setdefault(flow, {})
- d[key] = value
-
- def get_flow_setting(self, flow, key, default=None):
- d = self.flowsettings.get(flow, {})
- return d.get(key, default)
-
- def add_flow(self, f):
- super(ConsoleState, self).add_flow(f)
- if self.focus is None:
- self.set_focus(0)
- elif self.follow_focus:
- self.set_focus(len(self.view) - 1)
- self.set_flow_marked(f, False)
- return f
-
- def update_flow(self, f):
- super(ConsoleState, self).update_flow(f)
- if self.focus is None:
- self.set_focus(0)
- return f
-
- def set_limit(self, limit):
- ret = flow.State.set_limit(self, limit)
- self.set_focus(self.focus)
- return ret
-
- def get_focus(self):
- if not self.view or self.focus is None:
- return None, None
- return self.view[self.focus], self.focus
-
- def set_focus(self, idx):
- if self.view:
- if idx >= len(self.view):
- idx = len(self.view) - 1
- elif idx < 0:
- idx = 0
- self.focus = idx
- else:
- self.focus = None
-
- def set_focus_flow(self, f):
- self.set_focus(self.view.index(f))
-
- def get_from_pos(self, pos):
- if len(self.view) <= pos or pos < 0:
- return None, None
- return self.view[pos], pos
-
- def get_next(self, pos):
- return self.get_from_pos(pos + 1)
-
- def get_prev(self, pos):
- return self.get_from_pos(pos - 1)
-
- def delete_flow(self, f):
- if f in self.view and self.view.index(f) <= self.focus:
- self.focus -= 1
- if self.focus < 0:
- self.focus = None
- ret = flow.State.delete_flow(self, f)
- self.set_focus(self.focus)
- return ret
-
- def clear(self):
- marked_flows = []
- for f in self.flows:
- if self.flow_marked(f):
- marked_flows.append(f)
-
- super(ConsoleState, self).clear()
-
- for f in marked_flows:
- self.add_flow(f)
- self.set_flow_marked(f, True)
-
- if len(self.flows.views) == 0:
- self.focus = None
- else:
- self.focus = 0
- self.set_focus(self.focus)
-
- def flow_marked(self, flow):
- return self.get_flow_setting(flow, "marked", False)
-
- def set_flow_marked(self, flow, marked):
- self.add_flow_setting(flow, "marked", marked)
-
-
-class Options(object):
- attributes = [
- "app",
- "app_domain",
- "app_ip",
- "anticache",
- "anticomp",
- "client_replay",
- "eventlog",
- "follow",
- "keepserving",
- "kill",
- "intercept",
- "limit",
- "no_server",
- "refresh_server_playback",
- "rfile",
- "scripts",
- "showhost",
- "replacements",
- "rheaders",
- "setheaders",
- "server_replay",
- "stickycookie",
- "stickyauth",
- "stream_large_bodies",
- "verbosity",
- "wfile",
- "nopop",
- "palette",
- "palette_transparent",
- "no_mouse"
- ]
-
- def __init__(self, **kwargs):
- for k, v in kwargs.items():
- setattr(self, k, v)
- for i in self.attributes:
- if not hasattr(self, i):
- setattr(self, i, None)
-
-
-class ConsoleMaster(flow.FlowMaster):
- palette = []
-
- def __init__(self, server, options):
- flow.FlowMaster.__init__(self, server, ConsoleState())
- self.stream_path = None
- self.options = options
-
- for i in options.replacements:
- self.replacehooks.add(*i)
-
- for i in options.setheaders:
- self.setheaders.add(*i)
-
- r = self.set_intercept(options.intercept)
- if r:
- print >> sys.stderr, "Intercept error:", r
- sys.exit(1)
-
- if options.limit:
- self.set_limit(options.limit)
-
- r = self.set_stickycookie(options.stickycookie)
- if r:
- print >> sys.stderr, "Sticky cookies error:", r
- sys.exit(1)
-
- r = self.set_stickyauth(options.stickyauth)
- if r:
- print >> sys.stderr, "Sticky auth error:", r
- sys.exit(1)
-
- self.set_stream_large_bodies(options.stream_large_bodies)
-
- self.refresh_server_playback = options.refresh_server_playback
- self.anticache = options.anticache
- self.anticomp = options.anticomp
- self.killextra = options.kill
- self.rheaders = options.rheaders
- self.nopop = options.nopop
- self.showhost = options.showhost
- self.palette = options.palette
- self.palette_transparent = options.palette_transparent
-
- self.eventlog = options.eventlog
- self.eventlist = urwid.SimpleListWalker([])
- self.follow = options.follow
-
- if options.client_replay:
- self.client_playback_path(options.client_replay)
-
- if options.server_replay:
- self.server_playback_path(options.server_replay)
-
- if options.scripts:
- for i in options.scripts:
- err = self.load_script(i)
- if err:
- print >> sys.stderr, "Script load error:", err
- sys.exit(1)
-
- if options.outfile:
- err = self.start_stream_to_path(
- options.outfile[0],
- options.outfile[1]
- )
- if err:
- print >> sys.stderr, "Stream file error:", err
- sys.exit(1)
-
- self.view_stack = []
-
- if options.app:
- self.start_app(self.options.app_host, self.options.app_port)
- signals.call_in.connect(self.sig_call_in)
- signals.pop_view_state.connect(self.sig_pop_view_state)
- signals.push_view_state.connect(self.sig_push_view_state)
- signals.sig_add_event.connect(self.sig_add_event)
-
- def __setattr__(self, name, value):
- self.__dict__[name] = value
- signals.update_settings.send(self)
-
- def load_script(self, command, use_reloader=True):
- # We default to using the reloader in the console ui.
- super(ConsoleMaster, self).load_script(command, use_reloader)
-
- def sig_add_event(self, sender, e, level):
- needed = dict(error=0, info=1, debug=2).get(level, 1)
- if self.options.verbosity < needed:
- return
-
- if level == "error":
- e = urwid.Text(("error", str(e)))
- else:
- e = urwid.Text(str(e))
- self.eventlist.append(e)
- if len(self.eventlist) > EVENTLOG_SIZE:
- self.eventlist.pop(0)
- self.eventlist.set_focus(len(self.eventlist) - 1)
-
- def add_event(self, e, level):
- signals.add_event(e, level)
-
- def sig_call_in(self, sender, seconds, callback, args=()):
- def cb(*_):
- return callback(*args)
- self.loop.set_alarm_in(seconds, cb)
-
- def sig_pop_view_state(self, sender):
- if len(self.view_stack) > 1:
- self.view_stack.pop()
- self.loop.widget = self.view_stack[-1]
- else:
- signals.status_prompt_onekey.send(
- self,
- prompt = "Quit",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = self.quit,
- )
-
- def sig_push_view_state(self, sender, window):
- self.view_stack.append(window)
- self.loop.widget = window
- self.loop.draw_screen()
-
- def _run_script_method(self, method, s, f):
- status, val = s.run(method, f)
- if val:
- if status:
- signals.add_event("Method %s return: %s" % (method, val), "debug")
- else:
- signals.add_event(
- "Method %s error: %s" %
- (method, val[1]), "error")
-
- def run_script_once(self, command, f):
- if not command:
- return
- signals.add_event("Running script on flow: %s" % command, "debug")
-
- try:
- s = script.Script(command, script.ScriptContext(self))
- except script.ScriptException as v:
- signals.status_message.send(
- message = "Error loading script."
- )
- signals.add_event("Error loading script:\n%s" % v.args[0], "error")
- return
-
- if f.request:
- self._run_script_method("request", s, f)
- if f.response:
- self._run_script_method("response", s, f)
- if f.error:
- self._run_script_method("error", s, f)
- s.unload()
- signals.flow_change.send(self, flow = f)
-
- def set_script(self, command):
- if not command:
- return
- ret = self.load_script(command)
- if ret:
- signals.status_message.send(message=ret)
-
- def toggle_eventlog(self):
- self.eventlog = not self.eventlog
- signals.pop_view_state.send(self)
- self.view_flowlist()
-
- def _readflows(self, path):
- """
- Utitility function that reads a list of flows
- or prints an error to the UI if that fails.
- Returns
- - None, if there was an error.
- - a list of flows, otherwise.
- """
- try:
- return flow.read_flows_from_paths(path)
- except flow.FlowReadError as e:
- signals.status_message.send(message=e.strerror)
-
- def client_playback_path(self, path):
- if not isinstance(path, list):
- path = [path]
- flows = self._readflows(path)
- if flows:
- self.start_client_playback(flows, False)
-
- def server_playback_path(self, path):
- if not isinstance(path, list):
- path = [path]
- flows = self._readflows(path)
- if flows:
- self.start_server_playback(
- flows,
- self.killextra, self.rheaders,
- False, self.nopop,
- self.options.replay_ignore_params,
- self.options.replay_ignore_content,
- self.options.replay_ignore_payload_params,
- self.options.replay_ignore_host
- )
-
- def spawn_editor(self, data):
- fd, name = tempfile.mkstemp('', "mproxy")
- os.write(fd, data)
- os.close(fd)
- c = os.environ.get("EDITOR")
- # if no EDITOR is set, assume 'vi'
- if not c:
- c = "vi"
- cmd = shlex.split(c)
- cmd.append(name)
- self.ui.stop()
- try:
- subprocess.call(cmd)
- except:
- signals.status_message.send(
- message = "Can't start editor: %s" % " ".join(c)
- )
- else:
- data = open(name, "rb").read()
- self.ui.start()
- os.unlink(name)
- return data
-
- def spawn_external_viewer(self, data, contenttype):
- if contenttype:
- contenttype = contenttype.split(";")[0]
- ext = mimetypes.guess_extension(contenttype) or ""
- else:
- ext = ""
- fd, name = tempfile.mkstemp(ext, "mproxy")
- os.write(fd, data)
- os.close(fd)
-
- # read-only to remind the user that this is a view function
- os.chmod(name, stat.S_IREAD)
-
- cmd = None
- shell = False
-
- if contenttype:
- c = mailcap.getcaps()
- cmd, _ = mailcap.findmatch(c, contenttype, filename=name)
- if cmd:
- shell = True
- if not cmd:
- # hm which one should get priority?
- c = os.environ.get("PAGER") or os.environ.get("EDITOR")
- if not c:
- c = "less"
- cmd = shlex.split(c)
- cmd.append(name)
- self.ui.stop()
- try:
- subprocess.call(cmd, shell=shell)
- except:
- signals.status_message.send(
- message="Can't start external viewer: %s" % " ".join(c)
- )
- self.ui.start()
- os.unlink(name)
-
- def set_palette(self, name):
- self.palette = name
- self.ui.register_palette(
- palettes.palettes[name].palette(self.palette_transparent)
- )
- self.ui.clear()
-
- def ticker(self, *userdata):
- changed = self.tick(self.masterq, timeout=0)
- if changed:
- self.loop.draw_screen()
- signals.update_settings.send()
- self.loop.set_alarm_in(0.01, self.ticker)
-
- def run(self):
- self.ui = urwid.raw_display.Screen()
- self.ui.set_terminal_properties(256)
- self.set_palette(self.palette)
- self.loop = urwid.MainLoop(
- urwid.SolidFill("x"),
- screen = self.ui,
- handle_mouse = not self.options.no_mouse,
- )
-
- self.server.start_slave(
- controller.Slave,
- controller.Channel(self.masterq, self.should_exit)
- )
-
- if self.options.rfile:
- ret = self.load_flows_path(self.options.rfile)
- if ret and self.state.flow_count():
- signals.add_event(
- "File truncated or corrupted. "
- "Loaded as many flows as possible.",
- "error"
- )
- elif ret and not self.state.flow_count():
- self.shutdown()
- print >> sys.stderr, "Could not load file:", ret
- sys.exit(1)
-
- self.loop.set_alarm_in(0.01, self.ticker)
-
- # It's not clear why we need to handle this explicitly - without this,
- # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it
- # out.
- def exit(s, f):
- raise urwid.ExitMainLoop
- signal.signal(signal.SIGINT, exit)
-
- self.loop.set_alarm_in(
- 0.0001,
- lambda *args: self.view_flowlist()
- )
-
- try:
- self.loop.run()
- except Exception:
- self.loop.stop()
- sys.stdout.flush()
- print >> sys.stderr, traceback.format_exc()
- print >> sys.stderr, "mitmproxy has crashed!"
- print >> sys.stderr, "Please lodge a bug report at:"
- print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy"
- print >> sys.stderr, "Shutting down..."
- sys.stderr.flush()
- self.shutdown()
-
- def view_help(self, helpctx):
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- help.HelpView(helpctx),
- None,
- statusbar.StatusBar(self, help.footer),
- None
- )
- )
-
- def view_options(self):
- for i in self.view_stack:
- if isinstance(i["body"], options.Options):
- return
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- options.Options(self),
- None,
- statusbar.StatusBar(self, options.footer),
- options.help_context,
- )
- )
-
- def view_palette_picker(self):
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- palettepicker.PalettePicker(self),
- None,
- statusbar.StatusBar(self, palettepicker.footer),
- palettepicker.help_context,
- )
- )
-
- def view_grideditor(self, ge):
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- ge,
- None,
- statusbar.StatusBar(self, grideditor.FOOTER),
- ge.make_help()
- )
- )
-
- def view_flowlist(self):
- if self.ui.started:
- self.ui.clear()
- if self.state.follow_focus:
- self.state.set_focus(self.state.flow_count())
-
- if self.eventlog:
- body = flowlist.BodyPile(self)
- else:
- body = flowlist.FlowListBox(self)
-
- if self.follow:
- self.toggle_follow_flows()
-
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- body,
- None,
- statusbar.StatusBar(self, flowlist.footer),
- flowlist.help_context
- )
- )
-
- def view_flow(self, flow, tab_offset=0):
- self.state.set_focus_flow(flow)
- signals.push_view_state.send(
- self,
- window = window.Window(
- self,
- flowview.FlowView(self, self.state, flow, tab_offset),
- flowview.FlowViewHeader(self, flow),
- statusbar.StatusBar(self, flowview.footer),
- flowview.help_context
- )
- )
-
- def _write_flows(self, path, flows):
- if not path:
- return
- path = os.path.expanduser(path)
- try:
- f = file(path, "wb")
- fw = flow.FlowWriter(f)
- for i in flows:
- fw.add(i)
- f.close()
- except IOError as v:
- signals.status_message.send(message=v.strerror)
-
- def save_one_flow(self, path, flow):
- return self._write_flows(path, [flow])
-
- def save_flows(self, path):
- return self._write_flows(path, self.state.view)
-
- def save_marked_flows(self, path):
- marked_flows = []
- for f in self.state.view:
- if self.state.flow_marked(f):
- marked_flows.append(f)
- return self._write_flows(path, marked_flows)
-
- def load_flows_callback(self, path):
- if not path:
- return
- ret = self.load_flows_path(path)
- return ret or "Flows loaded from %s" % path
-
- def load_flows_path(self, path):
- reterr = None
- try:
- flow.FlowMaster.load_flows_file(self, path)
- except flow.FlowReadError as v:
- reterr = str(v)
- signals.flowlist_change.send(self)
- return reterr
-
- def accept_all(self):
- self.state.accept_all(self)
-
- def set_limit(self, txt):
- v = self.state.set_limit(txt)
- signals.flowlist_change.send(self)
- return v
-
- def set_intercept(self, txt):
- return self.state.set_intercept(txt)
-
- def change_default_display_mode(self, t):
- v = contentviews.get_by_shortcut(t)
- self.state.default_body_view = v
- self.refresh_focus()
-
- def edit_scripts(self, scripts):
- commands = [x[0] for x in scripts] # remove outer array
- if commands == [s.command for s in self.scripts]:
- return
-
- self.unload_scripts()
- for command in commands:
- self.load_script(command)
- signals.update_settings.send(self)
-
- def stop_client_playback_prompt(self, a):
- if a != "n":
- self.stop_client_playback()
-
- def stop_server_playback_prompt(self, a):
- if a != "n":
- self.stop_server_playback()
-
- def quit(self, a):
- if a != "n":
- raise urwid.ExitMainLoop
-
- def shutdown(self):
- self.state.killall(self)
- flow.FlowMaster.shutdown(self)
-
- def clear_flows(self):
- self.state.clear()
- signals.flowlist_change.send(self)
-
- def toggle_follow_flows(self):
- # toggle flow follow
- self.state.follow_focus = not self.state.follow_focus
- # jump to most recent flow if follow is now on
- if self.state.follow_focus:
- self.state.set_focus(self.state.flow_count())
- signals.flowlist_change.send(self)
-
- def delete_flow(self, f):
- self.state.delete_flow(f)
- signals.flowlist_change.send(self)
-
- def refresh_focus(self):
- if self.state.view:
- signals.flow_change.send(
- self,
- flow = self.state.view[self.state.focus]
- )
-
- def process_flow(self, f):
- if self.state.intercept and f.match(
- self.state.intercept) and not f.request.is_replay:
- f.intercept(self)
- else:
- # check if flow was intercepted within an inline script by flow.intercept()
- if f.intercepted:
- f.intercept(self)
- else:
- f.reply()
- signals.flowlist_change.send(self)
- signals.flow_change.send(self, flow = f)
-
- def clear_events(self):
- self.eventlist[:] = []
-
- # Handlers
- def handle_error(self, f):
- f = flow.FlowMaster.handle_error(self, f)
- if f:
- self.process_flow(f)
- return f
-
- def handle_request(self, f):
- f = flow.FlowMaster.handle_request(self, f)
- if f:
- self.process_flow(f)
- return f
-
- def handle_response(self, f):
- f = flow.FlowMaster.handle_response(self, f)
- if f:
- self.process_flow(f)
- return f
-
- def handle_script_change(self, script):
- if super(ConsoleMaster, self).handle_script_change(script):
- signals.status_message.send(message='"{}" reloaded.'.format(script.filename))
- else:
- signals.status_message.send(message='Error reloading "{}".'.format(script.filename))
diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py
deleted file mode 100644
index c29ffddc..00000000
--- a/libmproxy/console/common.py
+++ /dev/null
@@ -1,444 +0,0 @@
-from __future__ import absolute_import
-
-import urwid
-import urwid.util
-import os
-
-from netlib.http import CONTENT_MISSING
-import netlib.utils
-
-from .. import utils
-from .. import flow_export
-from ..models import decoded
-from . import signals
-
-
-try:
- import pyperclip
-except:
- pyperclip = False
-
-
-VIEW_FLOW_REQUEST = 0
-VIEW_FLOW_RESPONSE = 1
-
-METHOD_OPTIONS = [
- ("get", "g"),
- ("post", "p"),
- ("put", "u"),
- ("head", "h"),
- ("trace", "t"),
- ("delete", "d"),
- ("options", "o"),
- ("edit raw", "e"),
-]
-
-
-def is_keypress(k):
- """
- Is this input event a keypress?
- """
- if isinstance(k, basestring):
- return True
-
-
-def highlight_key(str, key, textattr="text", keyattr="key"):
- l = []
- parts = str.split(key, 1)
- if parts[0]:
- l.append((textattr, parts[0]))
- l.append((keyattr, key))
- if parts[1]:
- l.append((textattr, parts[1]))
- return l
-
-
-KEY_MAX = 30
-
-
-def format_keyvals(lst, key="key", val="text", indent=0):
- """
- Format a list of (key, value) tuples.
-
- If key is None, it's treated specially:
- - We assume a sub-value, and add an extra indent.
- - The value is treated as a pre-formatted list of directives.
- """
- ret = []
- if lst:
- maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX)
- for i, kv in enumerate(lst):
- if kv is None:
- ret.append(urwid.Text(""))
- else:
- if isinstance(kv[1], urwid.Widget):
- v = kv[1]
- elif kv[1] is None:
- v = urwid.Text("")
- else:
- v = urwid.Text([(val, kv[1])])
- ret.append(
- urwid.Columns(
- [
- ("fixed", indent, urwid.Text("")),
- (
- "fixed",
- maxk,
- urwid.Text([(key, kv[0] or "")])
- ),
- v
- ],
- dividechars = 2
- )
- )
- return ret
-
-
-def shortcuts(k):
- if k == " ":
- k = "page down"
- elif k == "ctrl f":
- k = "page down"
- elif k == "ctrl b":
- k = "page up"
- elif k == "j":
- k = "down"
- elif k == "k":
- k = "up"
- return k
-
-
-def fcol(s, attr):
- s = unicode(s)
- return (
- "fixed",
- len(s),
- urwid.Text(
- [
- (attr, s)
- ]
- )
- )
-
-if urwid.util.detected_encoding:
- SYMBOL_REPLAY = u"\u21ba"
- SYMBOL_RETURN = u"\u2190"
- SYMBOL_MARK = u"\u25cf"
-else:
- SYMBOL_REPLAY = u"[r]"
- SYMBOL_RETURN = u"<-"
- SYMBOL_MARK = "[m]"
-
-
-def raw_format_flow(f, focus, extended):
- f = dict(f)
- pile = []
- req = []
- if extended:
- req.append(
- fcol(
- utils.format_timestamp(f["req_timestamp"]),
- "highlight"
- )
- )
- else:
- req.append(fcol(">>" if focus else " ", "focus"))
-
- if f["marked"]:
- req.append(fcol(SYMBOL_MARK, "mark"))
-
- if f["req_is_replay"]:
- req.append(fcol(SYMBOL_REPLAY, "replay"))
- req.append(fcol(f["req_method"], "method"))
-
- preamble = sum(i[1] for i in req) + len(req) - 1
-
- if f["intercepted"] and not f["acked"]:
- uc = "intercept"
- elif f["resp_code"] or f["err_msg"]:
- uc = "text"
- else:
- uc = "title"
-
- url = f["req_url"]
- if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
- url += " " + f["req_http_version"]
- req.append(
- urwid.Text([(uc, url)])
- )
-
- pile.append(urwid.Columns(req, dividechars=1))
-
- resp = []
- resp.append(
- ("fixed", preamble, urwid.Text(""))
- )
-
- if f["resp_code"]:
- codes = {
- 2: "code_200",
- 3: "code_300",
- 4: "code_400",
- 5: "code_500",
- }
- ccol = codes.get(f["resp_code"] / 100, "code_other")
- resp.append(fcol(SYMBOL_RETURN, ccol))
- if f["resp_is_replay"]:
- resp.append(fcol(SYMBOL_REPLAY, "replay"))
- resp.append(fcol(f["resp_code"], ccol))
- if f["intercepted"] and f["resp_code"] and not f["acked"]:
- rc = "intercept"
- else:
- rc = "text"
-
- if f["resp_ctype"]:
- resp.append(fcol(f["resp_ctype"], rc))
- resp.append(fcol(f["resp_clen"], rc))
- resp.append(fcol(f["roundtrip"], rc))
-
- elif f["err_msg"]:
- resp.append(fcol(SYMBOL_RETURN, "error"))
- resp.append(
- urwid.Text([
- (
- "error",
- f["err_msg"]
- )
- ])
- )
- pile.append(urwid.Columns(resp, dividechars=1))
- return urwid.Pile(pile)
-
-
-# Save file to disk
-def save_data(path, data):
- if not path:
- return
- try:
- with file(path, "wb") as f:
- f.write(data)
- except IOError as v:
- signals.status_message.send(message=v.strerror)
-
-
-def ask_save_overwrite(path, data):
- if not path:
- return
- path = os.path.expanduser(path)
- if os.path.exists(path):
- def save_overwrite(k):
- if k == "y":
- save_data(path, data)
-
- signals.status_prompt_onekey.send(
- prompt = "'" + path + "' already exists. Overwrite?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = save_overwrite
- )
- else:
- save_data(path, data)
-
-
-def ask_save_path(prompt, data):
- signals.status_prompt_path.send(
- prompt = prompt,
- callback = ask_save_overwrite,
- args = (data, )
- )
-
-
-def copy_flow_format_data(part, scope, flow):
- if part == "u":
- data = flow.request.url
- else:
- data = ""
- if scope in ("q", "a"):
- if flow.request.content is None or flow.request.content == CONTENT_MISSING:
- return None, "Request content is missing"
- with decoded(flow.request):
- if part == "h":
- data += netlib.http.http1.assemble_request(flow.request)
- elif part == "c":
- data += flow.request.content
- else:
- raise ValueError("Unknown part: {}".format(part))
- if scope == "a" and flow.request.content and flow.response:
- # Add padding between request and response
- data += "\r\n" * 2
- if scope in ("s", "a") and flow.response:
- if flow.response.content is None or flow.response.content == CONTENT_MISSING:
- return None, "Response content is missing"
- with decoded(flow.response):
- if part == "h":
- data += netlib.http.http1.assemble_response(flow.response)
- elif part == "c":
- data += flow.response.content
- else:
- raise ValueError("Unknown part: {}".format(part))
- return data, False
-
-
-def export_prompt(k, flow):
- exporters = {
- "c": flow_export.curl_command,
- "p": flow_export.python_code,
- "r": flow_export.raw_request,
- }
- if k in exporters:
- copy_to_clipboard_or_prompt(exporters[k](flow))
-
-
-def copy_to_clipboard_or_prompt(data):
- # pyperclip calls encode('utf-8') on data to be copied without checking.
- # if data are already encoded that way UnicodeDecodeError is thrown.
- toclip = ""
- try:
- toclip = data.decode('utf-8')
- except (UnicodeDecodeError):
- toclip = data
-
- try:
- pyperclip.copy(toclip)
- except (RuntimeError, UnicodeDecodeError, AttributeError):
- def save(k):
- if k == "y":
- ask_save_path("Save data", data)
- signals.status_prompt_onekey.send(
- prompt = "Cannot copy data to clipboard. Save as file?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = save
- )
-
-
-def copy_flow(part, scope, flow, master, state):
- """
- part: _c_ontent, _h_eaders+content, _u_rl
- scope: _a_ll, re_q_uest, re_s_ponse
- """
- data, err = copy_flow_format_data(part, scope, flow)
-
- if err:
- signals.status_message.send(message=err)
- return
-
- if not data:
- if scope == "q":
- signals.status_message.send(message="No request content to copy.")
- elif scope == "s":
- signals.status_message.send(message="No response content to copy.")
- else:
- signals.status_message.send(message="No contents to copy.")
- return
-
- copy_to_clipboard_or_prompt(data)
-
-
-def ask_copy_part(scope, flow, master, state):
- choices = [
- ("content", "c"),
- ("headers+content", "h")
- ]
- if scope != "s":
- choices.append(("url", "u"))
-
- signals.status_prompt_onekey.send(
- prompt = "Copy",
- keys = choices,
- callback = copy_flow,
- args = (scope, flow, master, state)
- )
-
-
-def ask_save_body(part, master, state, flow):
- """
- Save either the request or the response body to disk. part can either be
- "q" (request), "s" (response) or None (ask user if necessary).
- """
-
- request_has_content = flow.request and flow.request.content
- response_has_content = flow.response and flow.response.content
-
- if part is None:
- # We first need to determine whether we want to save the request or the
- # response content.
- if request_has_content and response_has_content:
- signals.status_prompt_onekey.send(
- prompt = "Save",
- keys = (
- ("request", "q"),
- ("response", "s"),
- ),
- callback = ask_save_body,
- args = (master, state, flow)
- )
- elif response_has_content:
- ask_save_body("s", master, state, flow)
- else:
- ask_save_body("q", master, state, flow)
-
- elif part == "q" and request_has_content:
- ask_save_path(
- "Save request content",
- flow.request.get_decoded_content()
- )
- elif part == "s" and response_has_content:
- ask_save_path(
- "Save response content",
- flow.response.get_decoded_content()
- )
- else:
- signals.status_message.send(message="No content to save.")
-
-
-flowcache = utils.LRUCache(800)
-
-
-def format_flow(f, focus, extended=False, hostheader=False, marked=False):
- d = dict(
- intercepted = f.intercepted,
- acked = f.reply.acked,
-
- req_timestamp = f.request.timestamp_start,
- req_is_replay = f.request.is_replay,
- req_method = f.request.method,
- req_url = f.request.pretty_url if hostheader else f.request.url,
- req_http_version = f.request.http_version,
-
- err_msg = f.error.msg if f.error else None,
- resp_code = f.response.status_code if f.response else None,
-
- marked = marked,
- )
- if f.response:
- if f.response.content:
- contentdesc = netlib.utils.pretty_size(len(f.response.content))
- elif f.response.content == CONTENT_MISSING:
- contentdesc = "[content missing]"
- else:
- contentdesc = "[no content]"
- duration = 0
- if f.response.timestamp_end and f.request.timestamp_start:
- duration = f.response.timestamp_end - f.request.timestamp_start
- roundtrip = utils.pretty_duration(duration)
-
- d.update(dict(
- resp_code = f.response.status_code,
- resp_is_replay = f.response.is_replay,
- resp_clen = contentdesc,
- roundtrip = roundtrip,
- ))
- t = f.response.headers.get("content-type")
- if t:
- d["resp_ctype"] = t.split(";")[0]
- else:
- d["resp_ctype"] = ""
- return flowcache.get(
- raw_format_flow,
- tuple(sorted(d.items())), focus, extended
- )
diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py
deleted file mode 100644
index f4b4262e..00000000
--- a/libmproxy/console/flowdetailview.py
+++ /dev/null
@@ -1,153 +0,0 @@
-from __future__ import absolute_import
-import urwid
-from . import common, searchable
-from .. import utils
-
-
-def maybe_timestamp(base, attr):
- if base and getattr(base, attr):
- return utils.format_timestamp_with_milli(getattr(base, attr))
- else:
- return "active"
-
-
-def flowdetails(state, flow):
- text = []
-
- cc = flow.client_conn
- sc = flow.server_conn
- req = flow.request
- resp = flow.response
-
- if sc is not None:
- text.append(urwid.Text([("head", "Server Connection:")]))
- parts = [
- ["Address", "%s:%s" % sc.address()],
- ]
-
- text.extend(
- common.format_keyvals(parts, key="key", val="text", indent=4)
- )
-
- c = sc.cert
- if c:
- text.append(urwid.Text([("head", "Server Certificate:")]))
- parts = [
- ["Type", "%s, %s bits" % c.keyinfo],
- ["SHA1 digest", c.digest("sha1")],
- ["Valid to", str(c.notafter)],
- ["Valid from", str(c.notbefore)],
- ["Serial", str(c.serial)],
- [
- "Subject",
- urwid.BoxAdapter(
- urwid.ListBox(
- common.format_keyvals(
- c.subject,
- key="highlight",
- val="text"
- )
- ),
- len(c.subject)
- )
- ],
- [
- "Issuer",
- urwid.BoxAdapter(
- urwid.ListBox(
- common.format_keyvals(
- c.issuer, key="highlight", val="text"
- )
- ),
- len(c.issuer)
- )
- ]
- ]
-
- if c.altnames:
- parts.append(
- [
- "Alt names",
- ", ".join(c.altnames)
- ]
- )
- text.extend(
- common.format_keyvals(parts, key="key", val="text", indent=4)
- )
-
- if cc is not None:
- text.append(urwid.Text([("head", "Client Connection:")]))
-
- parts = [
- ["Address", "%s:%s" % cc.address()],
- # ["Requests", "%s"%cc.requestcount],
- ]
-
- text.extend(
- common.format_keyvals(parts, key="key", val="text", indent=4)
- )
-
- parts = []
-
- parts.append(
- [
- "Client conn. established",
- maybe_timestamp(cc, "timestamp_start")
- ]
- )
- parts.append(
- [
- "Server conn. initiated",
- maybe_timestamp(sc, "timestamp_start")
- ]
- )
- parts.append(
- [
- "Server conn. TCP handshake",
- maybe_timestamp(sc, "timestamp_tcp_setup")
- ]
- )
- if sc.ssl_established:
- parts.append(
- [
- "Server conn. SSL handshake",
- maybe_timestamp(sc, "timestamp_ssl_setup")
- ]
- )
- parts.append(
- [
- "Client conn. SSL handshake",
- maybe_timestamp(cc, "timestamp_ssl_setup")
- ]
- )
- parts.append(
- [
- "First request byte",
- maybe_timestamp(req, "timestamp_start")
- ]
- )
- parts.append(
- [
- "Request complete",
- maybe_timestamp(req, "timestamp_end")
- ]
- )
- parts.append(
- [
- "First response byte",
- maybe_timestamp(resp, "timestamp_start")
- ]
- )
- parts.append(
- [
- "Response complete",
- maybe_timestamp(resp, "timestamp_end")
- ]
- )
-
- # sort operations by timestamp
- parts = sorted(parts, key=lambda p: p[1])
-
- text.append(urwid.Text([("head", "Timing:")]))
- text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
- return searchable.Searchable(state, text)
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
deleted file mode 100644
index c2201055..00000000
--- a/libmproxy/console/flowlist.py
+++ /dev/null
@@ -1,397 +0,0 @@
-from __future__ import absolute_import
-import urwid
-
-import netlib.utils
-
-from . import common, signals
-
-
-def _mkhelp():
- text = []
- keys = [
- ("A", "accept all intercepted flows"),
- ("a", "accept this intercepted flow"),
- ("b", "save request/response body"),
- ("C", "clear flow list or eventlog"),
- ("d", "delete flow"),
- ("D", "duplicate flow"),
- ("E", "export"),
- ("e", "toggle eventlog"),
- ("F", "toggle follow flow list"),
- ("l", "set limit filter pattern"),
- ("L", "load saved flows"),
- ("m", "toggle flow mark"),
- ("n", "create a new request"),
- ("P", "copy flow to clipboard"),
- ("r", "replay request"),
- ("U", "unmark all marked flows"),
- ("V", "revert changes to request"),
- ("w", "save flows "),
- ("W", "stream flows to file"),
- ("X", "kill and delete flow, even if it's mid-intercept"),
- ("tab", "tab between eventlog and flow list"),
- ("enter", "view flow"),
- ("|", "run script on this flow"),
- ]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
- return text
-help_context = _mkhelp()
-
-footer = [
- ('heading_key', "?"), ":help ",
-]
-
-
-class EventListBox(urwid.ListBox):
-
- def __init__(self, master):
- self.master = master
- urwid.ListBox.__init__(self, master.eventlist)
-
- def keypress(self, size, key):
- key = common.shortcuts(key)
- if key == "C":
- self.master.clear_events()
- key = None
- elif key == "G":
- self.set_focus(len(self.master.eventlist) - 1)
- elif key == "g":
- self.set_focus(0)
- return urwid.ListBox.keypress(self, size, key)
-
-
-class BodyPile(urwid.Pile):
-
- def __init__(self, master):
- h = urwid.Text("Event log")
- h = urwid.Padding(h, align="left", width=("relative", 100))
-
- self.inactive_header = urwid.AttrWrap(h, "heading_inactive")
- self.active_header = urwid.AttrWrap(h, "heading")
-
- urwid.Pile.__init__(
- self,
- [
- FlowListBox(master),
- urwid.Frame(
- EventListBox(master),
- header = self.inactive_header
- )
- ]
- )
- self.master = master
-
- def keypress(self, size, key):
- if key == "tab":
- self.focus_position = (
- self.focus_position + 1) % len(self.widget_list)
- if self.focus_position == 1:
- self.widget_list[1].header = self.active_header
- else:
- self.widget_list[1].header = self.inactive_header
- key = None
- elif key == "e":
- self.master.toggle_eventlog()
- key = None
-
- # This is essentially a copypasta from urwid.Pile's keypress handler.
- # So much for "closed for modification, but open for extension".
- item_rows = None
- if len(size) == 2:
- item_rows = self.get_item_rows(size, focus = True)
- i = self.widget_list.index(self.focus_item)
- tsize = self.get_item_size(size, i, True, item_rows)
- return self.focus_item.keypress(tsize, key)
-
-
-class ConnectionItem(urwid.WidgetWrap):
-
- def __init__(self, master, state, flow, focus):
- self.master, self.state, self.flow = master, state, flow
- self.f = focus
- w = self.get_text()
- urwid.WidgetWrap.__init__(self, w)
-
- def get_text(self):
- return common.format_flow(
- self.flow,
- self.f,
- hostheader = self.master.showhost,
- marked=self.state.flow_marked(self.flow)
- )
-
- def selectable(self):
- return True
-
- def save_flows_prompt(self, k):
- if k == "a":
- signals.status_prompt_path.send(
- prompt = "Save all flows to",
- callback = self.master.save_flows
- )
- elif k == "m":
- signals.status_prompt_path.send(
- prompt = "Save marked flows to",
- callback = self.master.save_marked_flows
- )
- else:
- signals.status_prompt_path.send(
- prompt = "Save this flow to",
- callback = self.master.save_one_flow,
- args = (self.flow,)
- )
-
- def stop_server_playback_prompt(self, a):
- if a != "n":
- self.master.stop_server_playback()
-
- def server_replay_prompt(self, k):
- if k == "a":
- self.master.start_server_playback(
- [i.copy() for i in self.master.state.view],
- self.master.killextra, self.master.rheaders,
- False, self.master.nopop,
- self.master.options.replay_ignore_params,
- self.master.options.replay_ignore_content,
- self.master.options.replay_ignore_payload_params,
- self.master.options.replay_ignore_host
- )
- elif k == "t":
- self.master.start_server_playback(
- [self.flow.copy()],
- self.master.killextra, self.master.rheaders,
- False, self.master.nopop,
- self.master.options.replay_ignore_params,
- self.master.options.replay_ignore_content,
- self.master.options.replay_ignore_payload_params,
- self.master.options.replay_ignore_host
- )
- else:
- signals.status_prompt_path.send(
- prompt = "Server replay path",
- callback = self.master.server_playback_path
- )
-
- def mouse_event(self, size, event, button, col, row, focus):
- if event == "mouse press" and button == 1:
- if self.flow.request:
- self.master.view_flow(self.flow)
- return True
-
- def keypress(self, xxx_todo_changeme, key):
- (maxcol,) = xxx_todo_changeme
- key = common.shortcuts(key)
- if key == "a":
- self.flow.accept_intercept(self.master)
- signals.flowlist_change.send(self)
- elif key == "d":
- self.flow.kill(self.master)
- self.state.delete_flow(self.flow)
- signals.flowlist_change.send(self)
- elif key == "D":
- f = self.master.duplicate_flow(self.flow)
- self.master.view_flow(f)
- elif key == "m":
- if self.state.flow_marked(self.flow):
- self.state.set_flow_marked(self.flow, False)
- else:
- self.state.set_flow_marked(self.flow, True)
- signals.flowlist_change.send(self)
- elif key == "r":
- r = self.master.replay_request(self.flow)
- if r:
- signals.status_message.send(message=r)
- signals.flowlist_change.send(self)
- elif key == "S":
- if not self.master.server_playback:
- signals.status_prompt_onekey.send(
- prompt = "Server Replay",
- keys = (
- ("all flows", "a"),
- ("this flow", "t"),
- ("file", "f"),
- ),
- callback = self.server_replay_prompt,
- )
- else:
- signals.status_prompt_onekey.send(
- prompt = "Stop current server replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = self.stop_server_playback_prompt,
- )
- elif key == "U":
- for f in self.state.flows:
- self.state.set_flow_marked(f, False)
- signals.flowlist_change.send(self)
- elif key == "V":
- if not self.flow.modified():
- signals.status_message.send(message="Flow not modified.")
- return
- self.state.revert(self.flow)
- signals.flowlist_change.send(self)
- signals.status_message.send(message="Reverted.")
- elif key == "w":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Save",
- keys = (
- ("all flows", "a"),
- ("this flow", "t"),
- ("marked flows", "m"),
- ),
- callback = self.save_flows_prompt,
- )
- elif key == "X":
- self.flow.kill(self.master)
- elif key == "enter":
- if self.flow.request:
- self.master.view_flow(self.flow)
- elif key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
- elif key == "P":
- common.ask_copy_part("a", self.flow, self.master, self.state)
- elif key == "E":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export",
- keys = (
- ("as curl command", "c"),
- ("as python code", "p"),
- ("as raw request", "r"),
- ),
- callback = common.export_prompt,
- args = (self.flow,)
- )
- elif key == "b":
- common.ask_save_body(None, self.master, self.state, self.flow)
- else:
- return key
-
-
-class FlowListWalker(urwid.ListWalker):
-
- def __init__(self, master, state):
- self.master, self.state = master, state
- signals.flowlist_change.connect(self.sig_flowlist_change)
-
- def sig_flowlist_change(self, sender):
- self._modified()
-
- def get_focus(self):
- f, i = self.state.get_focus()
- f = ConnectionItem(self.master, self.state, f, True) if f else None
- return f, i
-
- def set_focus(self, focus):
- ret = self.state.set_focus(focus)
- return ret
-
- def get_next(self, pos):
- f, i = self.state.get_next(pos)
- f = ConnectionItem(self.master, self.state, f, False) if f else None
- return f, i
-
- def get_prev(self, pos):
- f, i = self.state.get_prev(pos)
- f = ConnectionItem(self.master, self.state, f, False) if f else None
- return f, i
-
-
-class FlowListBox(urwid.ListBox):
-
- def __init__(self, master):
- self.master = master
- urwid.ListBox.__init__(
- self,
- FlowListWalker(master, master.state)
- )
-
- def get_method_raw(self, k):
- if k:
- self.get_url(k)
-
- def get_method(self, k):
- if k == "e":
- signals.status_prompt.send(
- self,
- prompt = "Method",
- text = "",
- callback = self.get_method_raw
- )
- else:
- method = ""
- for i in common.METHOD_OPTIONS:
- if i[1] == k:
- method = i[0].upper()
- self.get_url(method)
-
- def get_url(self, method):
- signals.status_prompt.send(
- prompt = "URL",
- text = "http://www.example.com/",
- callback = self.new_request,
- args = (method,)
- )
-
- def new_request(self, url, method):
- parts = netlib.utils.parse_url(str(url))
- if not parts:
- signals.status_message.send(message="Invalid Url")
- return
- scheme, host, port, path = parts
- f = self.master.create_request(method, scheme, host, port, path)
- self.master.view_flow(f)
-
- def keypress(self, size, key):
- key = common.shortcuts(key)
- if key == "A":
- self.master.accept_all()
- signals.flowlist_change.send(self)
- elif key == "C":
- self.master.clear_flows()
- elif key == "e":
- self.master.toggle_eventlog()
- elif key == "g":
- self.master.state.set_focus(0)
- signals.flowlist_change.send(self)
- elif key == "G":
- self.master.state.set_focus(self.master.state.flow_count())
- signals.flowlist_change.send(self)
- elif key == "l":
- signals.status_prompt.send(
- prompt = "Limit",
- text = self.master.state.limit_txt,
- callback = self.master.set_limit
- )
- elif key == "L":
- signals.status_prompt_path.send(
- self,
- prompt = "Load flows",
- callback = self.master.load_flows_callback
- )
- elif key == "n":
- signals.status_prompt_onekey.send(
- prompt = "Method",
- keys = common.METHOD_OPTIONS,
- callback = self.get_method
- )
- elif key == "F":
- self.master.toggle_follow_flows()
- elif key == "W":
- if self.master.stream:
- self.master.stop_stream()
- else:
- signals.status_prompt_path.send(
- self,
- prompt = "Stream flows to",
- callback = self.master.start_stream_to_path
- )
- else:
- return urwid.ListBox.keypress(self, size, key)
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
deleted file mode 100644
index d2b98b68..00000000
--- a/libmproxy/console/flowview.py
+++ /dev/null
@@ -1,711 +0,0 @@
-from __future__ import absolute_import, division
-import os
-import traceback
-import sys
-
-import math
-import urwid
-
-from netlib import odict
-from netlib.http import CONTENT_MISSING, Headers
-from . import common, grideditor, signals, searchable, tabs
-from . import flowdetailview
-from .. import utils, controller, contentviews
-from ..models import HTTPRequest, HTTPResponse, decoded
-from ..exceptions import ContentViewException
-
-
-class SearchError(Exception):
- pass
-
-
-def _mkhelp():
- text = []
- keys = [
- ("A", "accept all intercepted flows"),
- ("a", "accept this intercepted flow"),
- ("b", "save request/response body"),
- ("D", "duplicate flow"),
- ("d", "delete flow"),
- ("E", "export"),
- ("e", "edit request/response"),
- ("f", "load full body data"),
- ("m", "change body display mode for this entity"),
- (None,
- common.highlight_key("automatic", "a") +
- [("text", ": automatic detection")]
- ),
- (None,
- common.highlight_key("hex", "e") +
- [("text", ": Hex")]
- ),
- (None,
- common.highlight_key("html", "h") +
- [("text", ": HTML")]
- ),
- (None,
- common.highlight_key("image", "i") +
- [("text", ": Image")]
- ),
- (None,
- common.highlight_key("javascript", "j") +
- [("text", ": JavaScript")]
- ),
- (None,
- common.highlight_key("json", "s") +
- [("text", ": JSON")]
- ),
- (None,
- common.highlight_key("urlencoded", "u") +
- [("text", ": URL-encoded data")]
- ),
- (None,
- common.highlight_key("raw", "r") +
- [("text", ": raw data")]
- ),
- (None,
- common.highlight_key("xml", "x") +
- [("text", ": XML")]
- ),
- ("M", "change default body display mode"),
- ("p", "previous flow"),
- ("P", "copy response(content/headers) to clipboard"),
- ("r", "replay request"),
- ("V", "revert changes to request"),
- ("v", "view body in external viewer"),
- ("w", "save all flows matching current limit"),
- ("W", "save this flow"),
- ("x", "delete body"),
- ("z", "encode/decode a request/response"),
- ("tab", "next tab"),
- ("h, l", "previous tab, next tab"),
- ("space", "next flow"),
- ("|", "run script on this flow"),
- ("/", "search (case sensitive)"),
- ("n", "repeat search forward"),
- ("N", "repeat search backwards"),
- ]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
- return text
-help_context = _mkhelp()
-
-footer = [
- ('heading_key', "?"), ":help ",
- ('heading_key', "q"), ":back ",
-]
-
-
-class FlowViewHeader(urwid.WidgetWrap):
-
- def __init__(self, master, f):
- self.master, self.flow = master, f
- self._w = common.format_flow(
- f,
- False,
- extended=True,
- hostheader=self.master.showhost
- )
- signals.flow_change.connect(self.sig_flow_change)
-
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
- self._w = common.format_flow(
- flow,
- False,
- extended=True,
- hostheader=self.master.showhost
- )
-
-
-cache = utils.LRUCache(200)
-
-TAB_REQ = 0
-TAB_RESP = 1
-
-
-class FlowView(tabs.Tabs):
- highlight_color = "focusfield"
-
- def __init__(self, master, state, flow, tab_offset):
- self.master, self.state, self.flow = master, state, flow
- tabs.Tabs.__init__(self,
- [
- (self.tab_request, self.view_request),
- (self.tab_response, self.view_response),
- (self.tab_details, self.view_details),
- ],
- tab_offset
- )
- self.show()
- self.last_displayed_body = None
- signals.flow_change.connect(self.sig_flow_change)
-
- def tab_request(self):
- if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
- return "Request intercepted"
- else:
- return "Request"
-
- def tab_response(self):
- if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
- return "Response intercepted"
- else:
- return "Response"
-
- def tab_details(self):
- return "Detail"
-
- def view_request(self):
- return self.conn_text(self.flow.request)
-
- def view_response(self):
- return self.conn_text(self.flow.response)
-
- def view_details(self):
- return flowdetailview.flowdetails(self.state, self.flow)
-
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
- self.show()
-
- def content_view(self, viewmode, message):
- if message.content == CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
- return msg, body
- else:
- full = self.state.get_flow_setting(
- self.flow,
- (self.tab_offset, "fullcontents"),
- False
- )
- if full:
- limit = sys.maxsize
- else:
- limit = contentviews.VIEW_CUTOFF
- return cache.get(
- self._get_content_view,
- viewmode,
- message,
- limit,
- (bytes(message.headers), message.content) # Cache invalidation
- )
-
- def _get_content_view(self, viewmode, message, max_lines, _):
-
- try:
- description, lines = contentviews.get_content_view(
- viewmode, message.content, headers=message.headers
- )
- except ContentViewException:
- s = "Content viewer failed: \n" + traceback.format_exc()
- signals.add_event(s, "error")
- description, lines = contentviews.get_content_view(
- contentviews.get("Raw"), message.content, headers=message.headers
- )
- description = description.replace("Raw", "Couldn't parse: falling back to Raw")
-
- # Give hint that you have to tab for the response.
- if description == "No content" and isinstance(message, HTTPRequest):
- description = "No request content (press tab to view response)"
-
- # If the users has a wide terminal, he gets fewer lines; this should not be an issue.
- chars_per_line = 80
- max_chars = max_lines * chars_per_line
- total_chars = 0
- text_objects = []
- for line in lines:
- txt = []
- for (style, text) in line:
- if total_chars + len(text) > max_chars:
- text = text[:max_chars - total_chars]
- txt.append((style, text))
- total_chars += len(text)
- if total_chars == max_chars:
- break
-
- # round up to the next line.
- total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line)
-
- text_objects.append(urwid.Text(txt))
- if total_chars == max_chars:
- text_objects.append(urwid.Text([
- ("highlight", "Stopped displaying data after %d lines. Press " % max_lines),
- ("key", "f"),
- ("highlight", " to load all data.")
- ]))
- break
-
- return description, text_objects
-
- def viewmode_get(self):
- override = self.state.get_flow_setting(
- self.flow,
- (self.tab_offset, "prettyview")
- )
- return self.state.default_body_view if override is None else override
-
- def conn_text(self, conn):
- if conn:
- txt = common.format_keyvals(
- [(h + ":", v) for (h, v) in conn.headers.fields],
- key = "header",
- val = "text"
- )
- viewmode = self.viewmode_get()
- msg, body = self.content_view(viewmode, conn)
-
- cols = [
- urwid.Text(
- [
- ("heading", msg),
- ]
- ),
- urwid.Text(
- [
- " ",
- ('heading', "["),
- ('heading_key', "m"),
- ('heading', (":%s]" % viewmode.name)),
- ],
- align="right"
- )
- ]
- title = urwid.AttrWrap(urwid.Columns(cols), "heading")
-
- txt.append(title)
- txt.extend(body)
- else:
- txt = [
- urwid.Text(""),
- urwid.Text(
- [
- ("highlight", "No response. Press "),
- ("key", "e"),
- ("highlight", " and edit any aspect to add one."),
- ]
- )
- ]
- return searchable.Searchable(self.state, txt)
-
- def set_method_raw(self, m):
- if m:
- self.flow.request.method = m
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_method(self, m):
- if m == "e":
- signals.status_prompt.send(
- prompt = "Method",
- text = self.flow.request.method,
- callback = self.set_method_raw
- )
- else:
- for i in common.METHOD_OPTIONS:
- if i[1] == m:
- self.flow.request.method = i[0].upper()
- signals.flow_change.send(self, flow = self.flow)
-
- def set_url(self, url):
- request = self.flow.request
- try:
- request.url = str(url)
- except ValueError:
- return "Invalid URL."
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_code(self, code):
- response = self.flow.response
- try:
- response.status_code = int(code)
- except ValueError:
- return None
- import BaseHTTPServer
- if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses:
- response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[
- int(code)][0]
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_msg(self, msg):
- response = self.flow.response
- response.msg = msg
- signals.flow_change.send(self, flow = self.flow)
-
- def set_headers(self, fields, conn):
- conn.headers = Headers(fields)
- signals.flow_change.send(self, flow = self.flow)
-
- def set_query(self, lst, conn):
- conn.set_query(odict.ODict(lst))
- signals.flow_change.send(self, flow = self.flow)
-
- def set_path_components(self, lst, conn):
- conn.set_path_components(lst)
- signals.flow_change.send(self, flow = self.flow)
-
- def set_form(self, lst, conn):
- conn.set_form_urlencoded(odict.ODict(lst))
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_form(self, conn):
- self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(
- self.master,
- conn.get_form_urlencoded().lst,
- self.set_form,
- conn
- )
- )
-
- def edit_form_confirm(self, key, conn):
- if key == "y":
- self.edit_form(conn)
-
- def set_cookies(self, lst, conn):
- od = odict.ODict(lst)
- conn.set_cookies(od)
- signals.flow_change.send(self, flow = self.flow)
-
- def set_setcookies(self, data, conn):
- conn.set_cookies(data)
- signals.flow_change.send(self, flow = self.flow)
-
- def edit(self, part):
- if self.tab_offset == TAB_REQ:
- message = self.flow.request
- else:
- if not self.flow.response:
- self.flow.response = HTTPResponse(
- self.flow.request.http_version,
- 200, "OK", Headers(), ""
- )
- self.flow.response.reply = controller.DummyReply()
- message = self.flow.response
-
- self.flow.backup()
- if message == self.flow.request and part == "c":
- self.master.view_grideditor(
- grideditor.CookieEditor(
- self.master,
- message.get_cookies().lst,
- self.set_cookies,
- message
- )
- )
- if message == self.flow.response and part == "c":
- self.master.view_grideditor(
- grideditor.SetCookieEditor(
- self.master,
- message.get_cookies(),
- self.set_setcookies,
- message
- )
- )
- if part == "r":
- with decoded(message):
- # Fix an issue caused by some editors when editing a
- # request/response body. Many editors make it hard to save a
- # file without a terminating newline on the last line. When
- # editing message bodies, this can cause problems. For now, I
- # just strip the newlines off the end of the body when we return
- # from an editor.
- c = self.master.spawn_editor(message.content or "")
- message.content = c.rstrip("\n")
- elif part == "f":
- if not message.get_form_urlencoded() and message.content:
- signals.status_prompt_onekey.send(
- prompt = "Existing body is not a URL-encoded form. Clear and edit?",
- keys = [
- ("yes", "y"),
- ("no", "n"),
- ],
- callback = self.edit_form_confirm,
- args = (message,)
- )
- else:
- self.edit_form(message)
- elif part == "h":
- self.master.view_grideditor(
- grideditor.HeaderEditor(
- self.master,
- message.headers.fields,
- self.set_headers,
- message
- )
- )
- elif part == "p":
- p = message.get_path_components()
- self.master.view_grideditor(
- grideditor.PathEditor(
- self.master,
- p,
- self.set_path_components,
- message
- )
- )
- elif part == "q":
- self.master.view_grideditor(
- grideditor.QueryEditor(
- self.master,
- message.get_query().lst,
- self.set_query, message
- )
- )
- elif part == "u":
- signals.status_prompt.send(
- prompt = "URL",
- text = message.url,
- callback = self.set_url
- )
- elif part == "m":
- signals.status_prompt_onekey.send(
- prompt = "Method",
- keys = common.METHOD_OPTIONS,
- callback = self.edit_method
- )
- elif part == "o":
- signals.status_prompt.send(
- prompt = "Code",
- text = str(message.status_code),
- callback = self.set_resp_code
- )
- elif part == "m":
- signals.status_prompt.send(
- prompt = "Message",
- text = message.msg,
- callback = self.set_resp_msg
- )
- signals.flow_change.send(self, flow = self.flow)
-
- def _view_nextprev_flow(self, np, flow):
- try:
- idx = self.state.view.index(flow)
- except IndexError:
- return
- if np == "next":
- new_flow, new_idx = self.state.get_next(idx)
- else:
- new_flow, new_idx = self.state.get_prev(idx)
- if new_flow is None:
- signals.status_message.send(message="No more flows!")
- else:
- signals.pop_view_state.send(self)
- self.master.view_flow(new_flow, self.tab_offset)
-
- def view_next_flow(self, flow):
- return self._view_nextprev_flow("next", flow)
-
- def view_prev_flow(self, flow):
- return self._view_nextprev_flow("prev", flow)
-
- def change_this_display_mode(self, t):
- self.state.add_flow_setting(
- self.flow,
- (self.tab_offset, "prettyview"),
- contentviews.get_by_shortcut(t)
- )
- signals.flow_change.send(self, flow = self.flow)
-
- def delete_body(self, t):
- if t == "m":
- val = CONTENT_MISSING
- else:
- val = None
- if self.tab_offset == TAB_REQ:
- self.flow.request.content = val
- else:
- self.flow.response.content = val
- signals.flow_change.send(self, flow = self.flow)
-
- def keypress(self, size, key):
- key = super(self.__class__, self).keypress(size, key)
-
- if key == " ":
- self.view_next_flow(self.flow)
- return
-
- key = common.shortcuts(key)
- if self.tab_offset == TAB_REQ:
- conn = self.flow.request
- elif self.tab_offset == TAB_RESP:
- conn = self.flow.response
- else:
- conn = None
-
- if key in ("up", "down", "page up", "page down"):
- # Why doesn't this just work??
- self._w.keypress(size, key)
- elif key == "a":
- self.flow.accept_intercept(self.master)
- signals.flow_change.send(self, flow = self.flow)
- elif key == "A":
- self.master.accept_all()
- signals.flow_change.send(self, flow = self.flow)
- elif key == "d":
- if self.state.flow_count() == 1:
- self.master.view_flowlist()
- elif self.state.view.index(self.flow) == len(self.state.view) - 1:
- self.view_prev_flow(self.flow)
- else:
- self.view_next_flow(self.flow)
- f = self.flow
- f.kill(self.master)
- self.state.delete_flow(f)
- elif key == "D":
- f = self.master.duplicate_flow(self.flow)
- self.master.view_flow(f)
- signals.status_message.send(message="Duplicated.")
- elif key == "p":
- self.view_prev_flow(self.flow)
- elif key == "r":
- r = self.master.replay_request(self.flow)
- if r:
- signals.status_message.send(message=r)
- signals.flow_change.send(self, flow = self.flow)
- elif key == "V":
- if not self.flow.modified():
- signals.status_message.send(message="Flow not modified.")
- return
- self.state.revert(self.flow)
- signals.flow_change.send(self, flow = self.flow)
- signals.status_message.send(message="Reverted.")
- elif key == "W":
- signals.status_prompt_path.send(
- prompt = "Save this flow",
- callback = self.master.save_one_flow,
- args = (self.flow,)
- )
- elif key == "E":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export",
- keys = (
- ("as curl command", "c"),
- ("as python code", "p"),
- ("as raw request", "r"),
- ),
- callback = common.export_prompt,
- args = (self.flow,)
- )
- elif key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
-
- if not conn and key in set(list("befgmxvz")):
- signals.status_message.send(
- message = "Tab to the request or response",
- expire = 1
- )
- elif conn:
- if key == "b":
- if self.tab_offset == TAB_REQ:
- common.ask_save_body(
- "q", self.master, self.state, self.flow
- )
- else:
- common.ask_save_body(
- "s", self.master, self.state, self.flow
- )
- elif key == "e":
- if self.tab_offset == TAB_REQ:
- signals.status_prompt_onekey.send(
- prompt = "Edit request",
- keys = (
- ("cookies", "c"),
- ("query", "q"),
- ("path", "p"),
- ("url", "u"),
- ("header", "h"),
- ("form", "f"),
- ("raw body", "r"),
- ("method", "m"),
- ),
- callback = self.edit
- )
- else:
- signals.status_prompt_onekey.send(
- prompt = "Edit response",
- keys = (
- ("cookies", "c"),
- ("code", "o"),
- ("message", "m"),
- ("header", "h"),
- ("raw body", "r"),
- ),
- callback = self.edit
- )
- key = None
- elif key == "f":
- signals.status_message.send(message="Loading all body data...")
- self.state.add_flow_setting(
- self.flow,
- (self.tab_offset, "fullcontents"),
- True
- )
- signals.flow_change.send(self, flow = self.flow)
- signals.status_message.send(message="")
- elif key == "P":
- if self.tab_offset == TAB_REQ:
- scope = "q"
- else:
- scope = "s"
- common.ask_copy_part(scope, self.flow, self.master, self.state)
- elif key == "m":
- p = list(contentviews.view_prompts)
- p.insert(0, ("Clear", "C"))
- signals.status_prompt_onekey.send(
- self,
- prompt = "Display mode",
- keys = p,
- callback = self.change_this_display_mode
- )
- key = None
- elif key == "x":
- signals.status_prompt_onekey.send(
- prompt = "Delete body",
- keys = (
- ("completely", "c"),
- ("mark as missing", "m"),
- ),
- callback = self.delete_body
- )
- key = None
- elif key == "v":
- if conn.content:
- t = conn.headers.get("content-type")
- if "EDITOR" in os.environ or "PAGER" in os.environ:
- self.master.spawn_external_viewer(conn.content, t)
- else:
- signals.status_message.send(
- message = "Error! Set $EDITOR or $PAGER."
- )
- elif key == "z":
- self.flow.backup()
- e = conn.headers.get("content-encoding", "identity")
- if e != "identity":
- if not conn.decode():
- signals.status_message.send(
- message = "Could not decode - invalid data?"
- )
- else:
- signals.status_prompt_onekey.send(
- prompt = "Select encoding: ",
- keys = (
- ("gzip", "z"),
- ("deflate", "d"),
- ),
- callback = self.encode_callback,
- args = (conn,)
- )
- signals.flow_change.send(self, flow = self.flow)
- return key
-
- def encode_callback(self, key, conn):
- encoding_map = {
- "z": "gzip",
- "d": "deflate",
- }
- conn.encode(encoding_map[key])
- signals.flow_change.send(self, flow = self.flow)
diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py
deleted file mode 100644
index a11c962c..00000000
--- a/libmproxy/console/grideditor.py
+++ /dev/null
@@ -1,716 +0,0 @@
-from __future__ import absolute_import
-
-import copy
-import re
-import os
-import urwid
-
-from netlib import odict
-from netlib.http import user_agents
-
-from . import common, signals
-from .. import utils, filt, script
-
-
-FOOTER = [
- ('heading_key', "enter"), ":edit ",
- ('heading_key', "q"), ":back ",
-]
-FOOTER_EDITING = [
- ('heading_key', "esc"), ":stop editing ",
-]
-
-
-class TextColumn:
- subeditor = None
-
- def __init__(self, heading):
- self.heading = heading
-
- def text(self, obj):
- return SEscaped(obj or "")
-
- def blank(self):
- return ""
-
- def keypress(self, key, editor):
- if key == "r":
- if editor.walker.get_current_value() is not None:
- signals.status_prompt_path.send(
- self,
- prompt = "Read file",
- callback = editor.read_file
- )
- elif key == "R":
- if editor.walker.get_current_value() is not None:
- signals.status_prompt_path.send(
- editor,
- prompt = "Read unescaped file",
- callback = editor.read_file,
- args = (True,)
- )
- elif key == "e":
- o = editor.walker.get_current_value()
- if o is not None:
- n = editor.master.spawn_editor(o.encode("string-escape"))
- n = utils.clean_hanging_newline(n)
- editor.walker.set_current_value(n, False)
- editor.walker._modified()
- elif key in ["enter"]:
- editor.walker.start_edit()
- else:
- return key
-
-
-class SubgridColumn:
-
- def __init__(self, heading, subeditor):
- self.heading = heading
- self.subeditor = subeditor
-
- def text(self, obj):
- p = http_cookies._format_pairs(obj, sep="\n")
- return urwid.Text(p)
-
- def blank(self):
- return []
-
- def keypress(self, key, editor):
- if key in "rRe":
- signals.status_message.send(
- self,
- message = "Press enter to edit this field.",
- expire = 1000
- )
- return
- elif key in ["enter"]:
- editor.master.view_grideditor(
- self.subeditor(
- editor.master,
- editor.walker.get_current_value(),
- editor.set_subeditor_value,
- editor.walker.focus,
- editor.walker.focus_col
- )
- )
- else:
- return key
-
-
-class SEscaped(urwid.WidgetWrap):
-
- def __init__(self, txt):
- txt = txt.encode("string-escape")
- w = urwid.Text(txt, wrap="any")
- urwid.WidgetWrap.__init__(self, w)
-
- def get_text(self):
- return self._w.get_text()[0]
-
- def keypress(self, size, key):
- return key
-
- def selectable(self):
- return True
-
-
-class SEdit(urwid.WidgetWrap):
-
- def __init__(self, txt):
- txt = txt.encode("string-escape")
- w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
- w = urwid.AttrWrap(w, "editfield")
- urwid.WidgetWrap.__init__(self, w)
-
- def get_text(self):
- return self._w.get_text()[0].strip()
-
- def selectable(self):
- return True
-
-
-class GridRow(urwid.WidgetWrap):
-
- def __init__(self, focused, editing, editor, values):
- self.focused, self.editing, self.editor = focused, editing, editor
-
- errors = values[1]
- self.fields = []
- for i, v in enumerate(values[0]):
- if focused == i and editing:
- self.editing = SEdit(v)
- self.fields.append(self.editing)
- else:
- w = self.editor.columns[i].text(v)
- if focused == i:
- if i in errors:
- w = urwid.AttrWrap(w, "focusfield_error")
- else:
- w = urwid.AttrWrap(w, "focusfield")
- elif i in errors:
- w = urwid.AttrWrap(w, "field_error")
- self.fields.append(w)
-
- fspecs = self.fields[:]
- if len(self.fields) > 1:
- fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
- w = urwid.Columns(
- fspecs,
- dividechars = 2
- )
- if focused is not None:
- w.set_focus_column(focused)
- urwid.WidgetWrap.__init__(self, w)
-
- def get_edit_value(self):
- return self.editing.get_text()
-
- def keypress(self, s, k):
- if self.editing:
- w = self._w.column_widths(s)[self.focused]
- k = self.editing.keypress((w,), k)
- return k
-
- def selectable(self):
- return True
-
-
-class GridWalker(urwid.ListWalker):
-
- """
- Stores rows as a list of (rows, errors) tuples, where rows is a list
- and errors is a set with an entry of each offset in rows that is an
- error.
- """
-
- def __init__(self, lst, editor):
- self.lst = [(i, set([])) for i in lst]
- self.editor = editor
- self.focus = 0
- self.focus_col = 0
- self.editing = False
-
- def _modified(self):
- self.editor.show_empty_msg()
- return urwid.ListWalker._modified(self)
-
- def add_value(self, lst):
- self.lst.append((lst[:], set([])))
- self._modified()
-
- def get_current_value(self):
- if self.lst:
- return self.lst[self.focus][0][self.focus_col]
-
- def set_current_value(self, val, unescaped):
- if not unescaped:
- try:
- val = val.decode("string-escape")
- except ValueError:
- signals.status_message.send(
- self,
- message = "Invalid Python-style string encoding.",
- expire = 1000
- )
- return
- errors = self.lst[self.focus][1]
- emsg = self.editor.is_error(self.focus_col, val)
- if emsg:
- signals.status_message.send(message = emsg, expire = 1)
- errors.add(self.focus_col)
- else:
- errors.discard(self.focus_col)
- self.set_value(val, self.focus, self.focus_col, errors)
-
- def set_value(self, val, focus, focus_col, errors=None):
- if not errors:
- errors = set([])
- row = list(self.lst[focus][0])
- row[focus_col] = val
- self.lst[focus] = [tuple(row), errors]
- self._modified()
-
- def delete_focus(self):
- if self.lst:
- del self.lst[self.focus]
- self.focus = min(len(self.lst) - 1, self.focus)
- self._modified()
-
- def _insert(self, pos):
- self.focus = pos
- self.lst.insert(
- self.focus,
- [
- [c.blank() for c in self.editor.columns], set([])
- ]
- )
- self.focus_col = 0
- self.start_edit()
-
- def insert(self):
- return self._insert(self.focus)
-
- def add(self):
- return self._insert(min(self.focus + 1, len(self.lst)))
-
- def start_edit(self):
- col = self.editor.columns[self.focus_col]
- if self.lst and not col.subeditor:
- self.editing = GridRow(
- self.focus_col, True, self.editor, self.lst[self.focus]
- )
- self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
- self._modified()
-
- def stop_edit(self):
- if self.editing:
- self.editor.master.loop.widget.footer.update(FOOTER)
- self.set_current_value(self.editing.get_edit_value(), False)
- self.editing = False
- self._modified()
-
- def left(self):
- self.focus_col = max(self.focus_col - 1, 0)
- self._modified()
-
- def right(self):
- self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
- self._modified()
-
- def tab_next(self):
- self.stop_edit()
- if self.focus_col < len(self.editor.columns) - 1:
- self.focus_col += 1
- elif self.focus != len(self.lst) - 1:
- self.focus_col = 0
- self.focus += 1
- self._modified()
-
- def get_focus(self):
- if self.editing:
- return self.editing, self.focus
- elif self.lst:
- return GridRow(
- self.focus_col,
- False,
- self.editor,
- self.lst[self.focus]
- ), self.focus
- else:
- return None, None
-
- def set_focus(self, focus):
- self.stop_edit()
- self.focus = focus
- self._modified()
-
- def get_next(self, pos):
- if pos + 1 >= len(self.lst):
- return None, None
- return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
-
- def get_prev(self, pos):
- if pos - 1 < 0:
- return None, None
- return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
-
-
-class GridListBox(urwid.ListBox):
-
- def __init__(self, lw):
- urwid.ListBox.__init__(self, lw)
-
-
-FIRST_WIDTH_MAX = 40
-FIRST_WIDTH_MIN = 20
-
-
-class GridEditor(urwid.WidgetWrap):
- title = None
- columns = None
-
- def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
- value = self.data_in(copy.deepcopy(value))
- self.master, self.value, self.callback = master, value, callback
- self.cb_args, self.cb_kwargs = cb_args, cb_kwargs
-
- first_width = 20
- if value:
- for r in value:
- assert len(r) == len(self.columns)
- first_width = max(len(r), first_width)
- self.first_width = min(first_width, FIRST_WIDTH_MAX)
-
- title = urwid.Text(self.title)
- title = urwid.Padding(title, align="left", width=("relative", 100))
- title = urwid.AttrWrap(title, "heading")
-
- headings = []
- for i, col in enumerate(self.columns):
- c = urwid.Text(col.heading)
- if i == 0 and len(self.columns) > 1:
- headings.append(("fixed", first_width + 2, c))
- else:
- headings.append(c)
- h = urwid.Columns(
- headings,
- dividechars = 2
- )
- h = urwid.AttrWrap(h, "heading")
-
- self.walker = GridWalker(self.value, self)
- self.lb = GridListBox(self.walker)
- self._w = urwid.Frame(
- self.lb,
- header = urwid.Pile([title, h])
- )
- self.master.loop.widget.footer.update("")
- self.show_empty_msg()
-
- def show_empty_msg(self):
- if self.walker.lst:
- self._w.set_footer(None)
- else:
- self._w.set_footer(
- urwid.Text(
- [
- ("highlight", "No values. Press "),
- ("key", "a"),
- ("highlight", " to add some."),
- ]
- )
- )
-
- def encode(self, s):
- if not self.encoding:
- return s
- try:
- return s.encode(self.encoding)
- except ValueError:
- return None
-
- def read_file(self, p, unescaped=False):
- if p:
- try:
- p = os.path.expanduser(p)
- d = file(p, "rb").read()
- self.walker.set_current_value(d, unescaped)
- self.walker._modified()
- except IOError as v:
- return str(v)
-
- def set_subeditor_value(self, val, focus, focus_col):
- self.walker.set_value(val, focus, focus_col)
-
- def keypress(self, size, key):
- if self.walker.editing:
- if key in ["esc"]:
- self.walker.stop_edit()
- elif key == "tab":
- pf, pfc = self.walker.focus, self.walker.focus_col
- self.walker.tab_next()
- if self.walker.focus == pf and self.walker.focus_col != pfc:
- self.walker.start_edit()
- else:
- self._w.keypress(size, key)
- return None
-
- key = common.shortcuts(key)
- column = self.columns[self.walker.focus_col]
- if key in ["q", "esc"]:
- res = []
- for i in self.walker.lst:
- if not i[1] and any([x for x in i[0]]):
- res.append(i[0])
- self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
- signals.pop_view_state.send(self)
- elif key == "g":
- self.walker.set_focus(0)
- elif key == "G":
- self.walker.set_focus(len(self.walker.lst) - 1)
- elif key in ["h", "left"]:
- self.walker.left()
- elif key in ["l", "right"]:
- self.walker.right()
- elif key == "tab":
- self.walker.tab_next()
- elif key == "a":
- self.walker.add()
- elif key == "A":
- self.walker.insert()
- elif key == "d":
- self.walker.delete_focus()
- elif column.keypress(key, self) and not self.handle_key(key):
- return self._w.keypress(size, key)
-
- def data_out(self, data):
- """
- Called on raw list data, before data is returned through the
- callback.
- """
- return data
-
- def data_in(self, data):
- """
- Called to prepare provided data.
- """
- return data
-
- def is_error(self, col, val):
- """
- Return False, or a string error message.
- """
- return False
-
- def handle_key(self, key):
- return False
-
- def make_help(self):
- text = []
- text.append(urwid.Text([("text", "Editor control:\n")]))
- keys = [
- ("A", "insert row before cursor"),
- ("a", "add row after cursor"),
- ("d", "delete row"),
- ("e", "spawn external editor on current field"),
- ("q", "save changes and exit editor"),
- ("r", "read value from file"),
- ("R", "read unescaped value from file"),
- ("esc", "save changes and exit editor"),
- ("tab", "next field"),
- ("enter", "edit field"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(
- urwid.Text(
- [
- "\n",
- ("text", "Values are escaped Python-style strings.\n"),
- ]
- )
- )
- return text
-
-
-class QueryEditor(GridEditor):
- title = "Editing query"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
-
-class HeaderEditor(GridEditor):
- title = "Editing headers"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
- def make_help(self):
- h = GridEditor.make_help(self)
- text = []
- text.append(urwid.Text([("text", "Special keys:\n")]))
- keys = [
- ("U", "add User-Agent header"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(urwid.Text([("text", "\n")]))
- text.extend(h)
- return text
-
- def set_user_agent(self, k):
- ua = user_agents.get_by_shortcut(k)
- if ua:
- self.walker.add_value(
- [
- "User-Agent",
- ua[2]
- ]
- )
-
- def handle_key(self, key):
- if key == "U":
- signals.status_prompt_onekey.send(
- prompt = "Add User-Agent header:",
- keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
- callback = self.set_user_agent,
- )
- return True
-
-
-class URLEncodedFormEditor(GridEditor):
- title = "Editing URL-encoded form"
- columns = [
- TextColumn("Key"),
- TextColumn("Value")
- ]
-
-
-class ReplaceEditor(GridEditor):
- title = "Editing replacement patterns"
- columns = [
- TextColumn("Filter"),
- TextColumn("Regex"),
- TextColumn("Replacement"),
- ]
-
- def is_error(self, col, val):
- if col == 0:
- if not filt.parse(val):
- return "Invalid filter specification."
- elif col == 1:
- try:
- re.compile(val)
- except re.error:
- return "Invalid regular expression."
- return False
-
-
-class SetHeadersEditor(GridEditor):
- title = "Editing header set patterns"
- columns = [
- TextColumn("Filter"),
- TextColumn("Header"),
- TextColumn("Value"),
- ]
-
- def is_error(self, col, val):
- if col == 0:
- if not filt.parse(val):
- return "Invalid filter specification"
- return False
-
- def make_help(self):
- h = GridEditor.make_help(self)
- text = []
- text.append(urwid.Text([("text", "Special keys:\n")]))
- keys = [
- ("U", "add User-Agent header"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
- text.append(urwid.Text([("text", "\n")]))
- text.extend(h)
- return text
-
- def set_user_agent(self, k):
- ua = user_agents.get_by_shortcut(k)
- if ua:
- self.walker.add_value(
- [
- ".*",
- "User-Agent",
- ua[2]
- ]
- )
-
- def handle_key(self, key):
- if key == "U":
- signals.status_prompt_onekey.send(
- prompt = "Add User-Agent header:",
- keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
- callback = self.set_user_agent,
- )
- return True
-
-
-class PathEditor(GridEditor):
- title = "Editing URL path components"
- columns = [
- TextColumn("Component"),
- ]
-
- def data_in(self, data):
- return [[i] for i in data]
-
- def data_out(self, data):
- return [i[0] for i in data]
-
-
-class ScriptEditor(GridEditor):
- title = "Editing scripts"
- columns = [
- TextColumn("Command"),
- ]
-
- def is_error(self, col, val):
- try:
- script.Script.parse_command(val)
- except script.ScriptException as v:
- return str(v)
-
-
-class HostPatternEditor(GridEditor):
- title = "Editing host patterns"
- columns = [
- TextColumn("Regex (matched on hostname:port / ip:port)")
- ]
-
- def is_error(self, col, val):
- try:
- re.compile(val, re.IGNORECASE)
- except re.error as e:
- return "Invalid regex: %s" % str(e)
-
- def data_in(self, data):
- return [[i] for i in data]
-
- def data_out(self, data):
- return [i[0] for i in data]
-
-
-class CookieEditor(GridEditor):
- title = "Editing request Cookie header"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- ]
-
-
-class CookieAttributeEditor(GridEditor):
- title = "Editing Set-Cookie attributes"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- ]
-
- def data_out(self, data):
- ret = []
- for i in data:
- if not i[1]:
- ret.append([i[0], None])
- else:
- ret.append(i)
- return ret
-
-
-class SetCookieEditor(GridEditor):
- title = "Editing response SetCookie header"
- columns = [
- TextColumn("Name"),
- TextColumn("Value"),
- SubgridColumn("Attributes", CookieAttributeEditor),
- ]
-
- def data_in(self, data):
- flattened = []
- for k, v in data.items():
- flattened.append([k, v[0], v[1].lst])
- return flattened
-
- def data_out(self, data):
- vals = []
- for i in data:
- vals.append(
- [
- i[0],
- [i[1], odict.ODictCaseless(i[2])]
- ]
- )
- return odict.ODict(vals)
diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py
deleted file mode 100644
index 0c264ebf..00000000
--- a/libmproxy/console/help.py
+++ /dev/null
@@ -1,117 +0,0 @@
-from __future__ import absolute_import
-
-import urwid
-
-from . import common, signals
-from .. import filt, version
-
-footer = [
- ("heading", 'mitmproxy v%s ' % version.VERSION),
- ('heading_key', "q"), ":back ",
-]
-
-
-class HelpView(urwid.ListBox):
-
- def __init__(self, help_context):
- self.help_context = help_context or []
- urwid.ListBox.__init__(
- self,
- self.helptext()
- )
-
- def helptext(self):
- text = []
- text.append(urwid.Text([("head", "This view:\n")]))
- text.extend(self.help_context)
-
- text.append(urwid.Text([("head", "\n\nMovement:\n")]))
- keys = [
- ("j, k", "down, up"),
- ("h, l", "left, right (in some contexts)"),
- ("g, G", "go to beginning, end"),
- ("space", "page down"),
- ("pg up/down", "page up/down"),
- ("ctrl+b/ctrl+f", "page up/down"),
- ("arrows", "up, down, left, right"),
- ]
- text.extend(
- common.format_keyvals(
- keys,
- key="key",
- val="text",
- indent=4))
-
- text.append(urwid.Text([("head", "\n\nGlobal keys:\n")]))
- keys = [
- ("c", "client replay of HTTP requests"),
- ("i", "set interception pattern"),
- ("o", "options"),
- ("q", "quit / return to previous page"),
- ("Q", "quit without confirm prompt"),
- ("S", "server replay of HTTP responses"),
- ]
- text.extend(
- common.format_keyvals(keys, key="key", val="text", indent=4)
- )
-
- text.append(urwid.Text([("head", "\n\nFilter expressions:\n")]))
- f = []
- for i in filt.filt_unary:
- f.append(
- ("~%s" % i.code, i.help)
- )
- for i in filt.filt_rex:
- f.append(
- ("~%s regex" % i.code, i.help)
- )
- for i in filt.filt_int:
- f.append(
- ("~%s int" % i.code, i.help)
- )
- f.sort()
- f.extend(
- [
- ("!", "unary not"),
- ("&", "and"),
- ("|", "or"),
- ("(...)", "grouping"),
- ]
- )
- text.extend(common.format_keyvals(f, key="key", val="text", indent=4))
-
- text.append(
- urwid.Text(
- [
- "\n",
- ("text", " Regexes are Python-style.\n"),
- ("text", " Regexes can be specified as quoted strings.\n"),
- ("text", " Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"),
- ("text", " Expressions with no operators are regex matches against URL.\n"),
- ("text", " Default binary operator is &.\n"),
- ("head", "\n Examples:\n"),
- ]
- )
- )
- examples = [
- ("google\.com", "Url containing \"google.com"),
- ("~q ~b test", "Requests where body contains \"test\""),
- ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
- ]
- text.extend(
- common.format_keyvals(examples, key="key", val="text", indent=4)
- )
- return text
-
- def keypress(self, size, key):
- key = common.shortcuts(key)
- if key == "q":
- signals.pop_view_state.send(self)
- return None
- elif key == "?":
- key = None
- elif key == "g":
- self.set_focus(0)
- elif key == "G":
- self.set_focus(len(self.body.contents))
- return urwid.ListBox.keypress(self, size, key)
diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py
deleted file mode 100644
index 5c9e0cc9..00000000
--- a/libmproxy/console/options.py
+++ /dev/null
@@ -1,271 +0,0 @@
-import urwid
-
-from .. import contentviews
-from . import common, signals, grideditor
-from . import select, palettes
-
-footer = [
- ('heading_key', "enter/space"), ":toggle ",
- ('heading_key', "C"), ":clear all ",
-]
-
-
-def _mkhelp():
- text = []
- keys = [
- ("enter/space", "activate option"),
- ("C", "clear all options"),
- ]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
- return text
-help_context = _mkhelp()
-
-
-class Options(urwid.WidgetWrap):
-
- def __init__(self, master):
- self.master = master
- self.lb = select.Select(
- [
- select.Heading("Traffic Manipulation"),
- select.Option(
- "Header Set Patterns",
- "H",
- lambda: master.setheaders.count(),
- self.setheaders
- ),
- select.Option(
- "Ignore Patterns",
- "I",
- lambda: master.server.config.check_ignore,
- self.ignorepatterns
- ),
- select.Option(
- "Replacement Patterns",
- "R",
- lambda: master.replacehooks.count(),
- self.replacepatterns
- ),
- select.Option(
- "Scripts",
- "S",
- lambda: master.scripts,
- self.scripts
- ),
-
- select.Heading("Interface"),
- select.Option(
- "Default Display Mode",
- "M",
- self.has_default_displaymode,
- self.default_displaymode
- ),
- select.Option(
- "Palette",
- "P",
- lambda: self.master.palette != palettes.DEFAULT,
- self.palette
- ),
- select.Option(
- "Show Host",
- "w",
- lambda: master.showhost,
- self.toggle_showhost
- ),
-
- select.Heading("Network"),
- select.Option(
- "No Upstream Certs",
- "U",
- lambda: master.server.config.no_upstream_cert,
- self.toggle_upstream_cert
- ),
- select.Option(
- "TCP Proxying",
- "T",
- lambda: master.server.config.check_tcp,
- self.tcp_proxy
- ),
-
- select.Heading("Utility"),
- select.Option(
- "Anti-Cache",
- "a",
- lambda: master.anticache,
- self.toggle_anticache
- ),
- select.Option(
- "Anti-Compression",
- "o",
- lambda: master.anticomp,
- self.toggle_anticomp
- ),
- select.Option(
- "Kill Extra",
- "x",
- lambda: master.killextra,
- self.toggle_killextra
- ),
- select.Option(
- "No Refresh",
- "f",
- lambda: not master.refresh_server_playback,
- self.toggle_refresh_server_playback
- ),
- select.Option(
- "Sticky Auth",
- "A",
- lambda: master.stickyauth_txt,
- self.sticky_auth
- ),
- select.Option(
- "Sticky Cookies",
- "t",
- lambda: master.stickycookie_txt,
- self.sticky_cookie
- ),
- ]
- )
- title = urwid.Text("Options")
- title = urwid.Padding(title, align="left", width=("relative", 100))
- title = urwid.AttrWrap(title, "heading")
- self._w = urwid.Frame(
- self.lb,
- header = title
- )
- self.master.loop.widget.footer.update("")
- signals.update_settings.connect(self.sig_update_settings)
-
- def sig_update_settings(self, sender):
- self.lb.walker._modified()
-
- def keypress(self, size, key):
- if key == "C":
- self.clearall()
- return None
- return super(self.__class__, self).keypress(size, key)
-
- def clearall(self):
- self.master.anticache = False
- self.master.anticomp = False
- self.master.killextra = False
- self.master.showhost = False
- self.master.refresh_server_playback = True
- self.master.server.config.no_upstream_cert = False
- self.master.setheaders.clear()
- self.master.replacehooks.clear()
- self.master.set_ignore_filter([])
- self.master.set_tcp_filter([])
- self.master.scripts = []
- self.master.set_stickyauth(None)
- self.master.set_stickycookie(None)
- self.master.state.default_body_view = contentviews.get("Auto")
-
- signals.update_settings.send(self)
- signals.status_message.send(
- message = "All select.Options cleared",
- expire = 1
- )
-
- def toggle_anticache(self):
- self.master.anticache = not self.master.anticache
-
- def toggle_anticomp(self):
- self.master.anticomp = not self.master.anticomp
-
- def toggle_killextra(self):
- self.master.killextra = not self.master.killextra
-
- def toggle_showhost(self):
- self.master.showhost = not self.master.showhost
-
- def toggle_refresh_server_playback(self):
- self.master.refresh_server_playback = not self.master.refresh_server_playback
-
- def toggle_upstream_cert(self):
- self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert
- signals.update_settings.send(self)
-
- def setheaders(self):
- def _set(*args, **kwargs):
- self.master.setheaders.set(*args, **kwargs)
- signals.update_settings.send(self)
- self.master.view_grideditor(
- grideditor.SetHeadersEditor(
- self.master,
- self.master.setheaders.get_specs(),
- _set
- )
- )
-
- def ignorepatterns(self):
- def _set(ignore):
- self.master.set_ignore_filter(ignore)
- signals.update_settings.send(self)
- self.master.view_grideditor(
- grideditor.HostPatternEditor(
- self.master,
- self.master.get_ignore_filter(),
- _set
- )
- )
-
- def replacepatterns(self):
- def _set(*args, **kwargs):
- self.master.replacehooks.set(*args, **kwargs)
- signals.update_settings.send(self)
- self.master.view_grideditor(
- grideditor.ReplaceEditor(
- self.master,
- self.master.replacehooks.get_specs(),
- _set
- )
- )
-
- def scripts(self):
- self.master.view_grideditor(
- grideditor.ScriptEditor(
- self.master,
- [[i.command] for i in self.master.scripts],
- self.master.edit_scripts
- )
- )
-
- def default_displaymode(self):
- signals.status_prompt_onekey.send(
- prompt = "Global default display mode",
- keys = contentviews.view_prompts,
- callback = self.master.change_default_display_mode
- )
-
- def has_default_displaymode(self):
- return self.master.state.default_body_view.name != "Auto"
-
- def tcp_proxy(self):
- def _set(tcp):
- self.master.set_tcp_filter(tcp)
- signals.update_settings.send(self)
- self.master.view_grideditor(
- grideditor.HostPatternEditor(
- self.master,
- self.master.get_tcp_filter(),
- _set
- )
- )
-
- def sticky_auth(self):
- signals.status_prompt.send(
- prompt = "Sticky auth filter",
- text = self.master.stickyauth_txt,
- callback = self.master.set_stickyauth
- )
-
- def sticky_cookie(self):
- signals.status_prompt.send(
- prompt = "Sticky cookie filter",
- text = self.master.stickycookie_txt,
- callback = self.master.set_stickycookie
- )
-
- def palette(self):
- self.master.view_palette_picker()
diff --git a/libmproxy/console/palettepicker.py b/libmproxy/console/palettepicker.py
deleted file mode 100644
index 51ad0606..00000000
--- a/libmproxy/console/palettepicker.py
+++ /dev/null
@@ -1,82 +0,0 @@
-import urwid
-
-from . import select, common, palettes, signals
-
-footer = [
- ('heading_key', "enter/space"), ":select",
-]
-
-
-def _mkhelp():
- text = []
- keys = [
- ("enter/space", "select"),
- ]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
- return text
-help_context = _mkhelp()
-
-
-class PalettePicker(urwid.WidgetWrap):
-
- def __init__(self, master):
- self.master = master
- low, high = [], []
- for k, v in palettes.palettes.items():
- if v.high:
- high.append(k)
- else:
- low.append(k)
- high.sort()
- low.sort()
-
- options = [
- select.Heading("High Colour")
- ]
-
- def mkopt(name):
- return select.Option(
- i,
- None,
- lambda: self.master.palette == name,
- lambda: self.select(name)
- )
-
- for i in high:
- options.append(mkopt(i))
- options.append(select.Heading("Low Colour"))
- for i in low:
- options.append(mkopt(i))
-
- options.extend(
- [
- select.Heading("Options"),
- select.Option(
- "Transparent",
- "T",
- lambda: master.palette_transparent,
- self.toggle_palette_transparent
- )
- ]
- )
-
- self.lb = select.Select(options)
- title = urwid.Text("Palettes")
- title = urwid.Padding(title, align="left", width=("relative", 100))
- title = urwid.AttrWrap(title, "heading")
- self._w = urwid.Frame(
- self.lb,
- header = title
- )
- signals.update_settings.connect(self.sig_update_settings)
-
- def sig_update_settings(self, sender):
- self.lb.walker._modified()
-
- def select(self, name):
- self.master.set_palette(name)
-
- def toggle_palette_transparent(self):
- self.master.palette_transparent = not self.master.palette_transparent
- self.master.set_palette(self.master.palette)
- signals.update_settings.send(self)
diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py
deleted file mode 100644
index bd370181..00000000
--- a/libmproxy/console/palettes.py
+++ /dev/null
@@ -1,326 +0,0 @@
-# Low-color themes should ONLY use the standard foreground and background
-# colours listed here:
-#
-# http://urwid.org/manual/displayattributes.html
-#
-
-
-class Palette:
- _fields = [
- 'background',
- 'title',
-
- # Status bar & heading
- 'heading', 'heading_key', 'heading_inactive',
-
- # Help
- 'key', 'head', 'text',
-
- # Options
- 'option_selected', 'option_active', 'option_active_selected',
- 'option_selected_key',
-
- # List and Connections
- 'method', 'focus',
- 'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
- 'error',
- 'header', 'highlight', 'intercept', 'replay', 'mark',
-
- # Hex view
- 'offset',
-
- # Grid Editor
- 'focusfield', 'focusfield_error', 'field_error', 'editfield',
- ]
- high = None
-
- def palette(self, transparent):
- l = []
- highback, lowback = None, None
- if not transparent:
- if self.high and self.high.get("background"):
- highback = self.high["background"][1]
- lowback = self.low["background"][1]
-
- for i in self._fields:
- if transparent and i == "background":
- l.append(["background", "default", "default"])
- else:
- v = [i]
- low = list(self.low[i])
- if lowback and low[1] == "default":
- low[1] = lowback
- v.extend(low)
- if self.high and i in self.high:
- v.append(None)
- high = list(self.high[i])
- if highback and high[1] == "default":
- high[1] = highback
- v.extend(high)
- elif highback and self.low[i][1] == "default":
- high = [None, low[0], highback]
- v.extend(high)
- l.append(tuple(v))
- return l
-
-
-class LowDark(Palette):
-
- """
- Low-color dark background
- """
- low = dict(
- background = ('white', 'black'),
- title = ('white,bold', 'default'),
-
- # Status bar & heading
- heading = ('white', 'dark blue'),
- heading_key = ('light cyan', 'dark blue'),
- heading_inactive = ('dark gray', 'light gray'),
-
- # Help
- key = ('light cyan', 'default'),
- head = ('white,bold', 'default'),
- text = ('light gray', 'default'),
-
- # Options
- option_selected = ('black', 'light gray'),
- option_selected_key = ('light cyan', 'light gray'),
- option_active = ('light red', 'default'),
- option_active_selected = ('light red', 'light gray'),
-
- # List and Connections
- method = ('dark cyan', 'default'),
- focus = ('yellow', 'default'),
-
- code_200 = ('dark green', 'default'),
- code_300 = ('light blue', 'default'),
- code_400 = ('light red', 'default'),
- code_500 = ('light red', 'default'),
- code_other = ('dark red', 'default'),
-
- error = ('light red', 'default'),
-
- header = ('dark cyan', 'default'),
- highlight = ('white,bold', 'default'),
- intercept = ('brown', 'default'),
- replay = ('light green', 'default'),
- mark = ('light red', 'default'),
-
- # Hex view
- offset = ('dark cyan', 'default'),
-
- # Grid Editor
- focusfield = ('black', 'light gray'),
- focusfield_error = ('dark red', 'light gray'),
- field_error = ('dark red', 'default'),
- editfield = ('white', 'default'),
- )
-
-
-class Dark(LowDark):
- high = dict(
- heading_inactive = ('g58', 'g11'),
- intercept = ('#f60', 'default'),
-
- option_selected = ('g85', 'g45'),
- option_selected_key = ('light cyan', 'g50'),
- option_active_selected = ('light red', 'g50'),
- )
-
-
-class LowLight(Palette):
-
- """
- Low-color light background
- """
- low = dict(
- background = ('black', 'white'),
- title = ('dark magenta', 'default'),
-
- # Status bar & heading
- heading = ('white', 'black'),
- heading_key = ('dark blue', 'black'),
- heading_inactive = ('black', 'light gray'),
-
- # Help
- key = ('dark blue', 'default'),
- head = ('black', 'default'),
- text = ('dark gray', 'default'),
-
- # Options
- option_selected = ('black', 'light gray'),
- option_selected_key = ('dark blue', 'light gray'),
- option_active = ('light red', 'default'),
- option_active_selected = ('light red', 'light gray'),
-
- # List and Connections
- method = ('dark cyan', 'default'),
- focus = ('black', 'default'),
-
- code_200 = ('dark green', 'default'),
- code_300 = ('light blue', 'default'),
- code_400 = ('dark red', 'default'),
- code_500 = ('dark red', 'default'),
- code_other = ('light red', 'default'),
-
- error = ('light red', 'default'),
-
- header = ('dark blue', 'default'),
- highlight = ('black,bold', 'default'),
- intercept = ('brown', 'default'),
- replay = ('dark green', 'default'),
- mark = ('dark red', 'default'),
-
- # Hex view
- offset = ('dark blue', 'default'),
-
- # Grid Editor
- focusfield = ('black', 'light gray'),
- focusfield_error = ('dark red', 'light gray'),
- field_error = ('dark red', 'black'),
- editfield = ('black', 'default'),
- )
-
-
-class Light(LowLight):
- high = dict(
- background = ('black', 'g100'),
- heading = ('g99', '#08f'),
- heading_key = ('#0ff,bold', '#08f'),
- heading_inactive = ('g35', 'g85'),
- replay = ('#0a0,bold', 'default'),
-
- option_selected = ('black', 'g85'),
- option_selected_key = ('dark blue', 'g85'),
- option_active_selected = ('light red', 'g85'),
- )
-
-
-# Solarized palette in Urwid-style terminal high-colour offsets
-# See: http://ethanschoonover.com/solarized
-sol_base03 = "h234"
-sol_base02 = "h235"
-sol_base01 = "h240"
-sol_base00 = "h241"
-sol_base0 = "h244"
-sol_base1 = "h245"
-sol_base2 = "h254"
-sol_base3 = "h230"
-sol_yellow = "h136"
-sol_orange = "h166"
-sol_red = "h160"
-sol_magenta = "h125"
-sol_violet = "h61"
-sol_blue = "h33"
-sol_cyan = "h37"
-sol_green = "h64"
-
-
-class SolarizedLight(LowLight):
- high = dict(
- background = (sol_base00, sol_base3),
- title = (sol_cyan, 'default'),
- text = (sol_base00, 'default'),
-
- # Status bar & heading
- heading = (sol_base2, sol_base02),
- heading_key = (sol_blue, sol_base03),
- heading_inactive = (sol_base03, sol_base1),
-
- # Help
- key = (sol_blue, 'default',),
- head = (sol_base00, 'default'),
-
- # Options
- option_selected = (sol_base03, sol_base2),
- option_selected_key = (sol_blue, sol_base2),
- option_active = (sol_orange, 'default'),
- option_active_selected = (sol_orange, sol_base2),
-
- # List and Connections
- method = (sol_cyan, 'default'),
- focus = (sol_base01, 'default'),
-
- code_200 = (sol_green, 'default'),
- code_300 = (sol_blue, 'default'),
- code_400 = (sol_orange, 'default',),
- code_500 = (sol_red, 'default'),
- code_other = (sol_magenta, 'default'),
-
- error = (sol_red, 'default'),
-
- header = (sol_blue, 'default'),
- highlight = (sol_base01, 'default'),
- intercept = (sol_red, 'default',),
- replay = (sol_green, 'default',),
-
- # Hex view
- offset = (sol_cyan, 'default'),
-
- # Grid Editor
- focusfield = (sol_base00, sol_base2),
- focusfield_error = (sol_red, sol_base2),
- field_error = (sol_red, 'default'),
- editfield = (sol_base01, 'default'),
- )
-
-
-class SolarizedDark(LowDark):
- high = dict(
- background = (sol_base2, sol_base03),
- title = (sol_blue, 'default'),
- text = (sol_base1, 'default'),
-
- # Status bar & heading
- heading = (sol_base2, sol_base01),
- heading_key = (sol_blue + ",bold", sol_base01),
- heading_inactive = (sol_base1, sol_base02),
-
- # Help
- key = (sol_blue, 'default',),
- head = (sol_base2, 'default'),
-
- # Options
- option_selected = (sol_base03, sol_base00),
- option_selected_key = (sol_blue, sol_base00),
- option_active = (sol_orange, 'default'),
- option_active_selected = (sol_orange, sol_base00),
-
- # List and Connections
- method = (sol_cyan, 'default'),
- focus = (sol_base1, 'default'),
-
- code_200 = (sol_green, 'default'),
- code_300 = (sol_blue, 'default'),
- code_400 = (sol_orange, 'default',),
- code_500 = (sol_red, 'default'),
- code_other = (sol_magenta, 'default'),
-
- error = (sol_red, 'default'),
-
- header = (sol_blue, 'default'),
- highlight = (sol_base01, 'default'),
- intercept = (sol_red, 'default',),
- replay = (sol_green, 'default',),
-
- # Hex view
- offset = (sol_cyan, 'default'),
-
- # Grid Editor
- focusfield = (sol_base0, sol_base02),
- focusfield_error = (sol_red, sol_base02),
- field_error = (sol_red, 'default'),
- editfield = (sol_base1, 'default'),
- )
-
-
-DEFAULT = "dark"
-palettes = {
- "lowlight": LowLight(),
- "lowdark": LowDark(),
- "light": Light(),
- "dark": Dark(),
- "solarized_light": SolarizedLight(),
- "solarized_dark": SolarizedDark(),
-}
diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py
deleted file mode 100644
index 4447070b..00000000
--- a/libmproxy/console/pathedit.py
+++ /dev/null
@@ -1,71 +0,0 @@
-import glob
-import os.path
-
-import urwid
-
-
-class _PathCompleter:
-
- def __init__(self, _testing=False):
- """
- _testing: disables reloading of the lookup table to make testing
- possible.
- """
- self.lookup, self.offset = None, None
- self.final = None
- self._testing = _testing
-
- def reset(self):
- self.lookup = None
- self.offset = -1
-
- def complete(self, txt):
- """
- Returns the next completion for txt, or None if there is no
- completion.
- """
- path = os.path.expanduser(txt)
- if not self.lookup:
- if not self._testing:
- # Lookup is a set of (display value, actual value) tuples.
- self.lookup = []
- if os.path.isdir(path):
- files = glob.glob(os.path.join(path, "*"))
- prefix = txt
- else:
- files = glob.glob(path + "*")
- prefix = os.path.dirname(txt)
- prefix = prefix or "./"
- for f in files:
- display = os.path.join(prefix, os.path.basename(f))
- if os.path.isdir(f):
- display += "/"
- self.lookup.append((display, f))
- if not self.lookup:
- self.final = path
- return path
- self.lookup.sort()
- self.offset = -1
- self.lookup.append((txt, txt))
- self.offset += 1
- if self.offset >= len(self.lookup):
- self.offset = 0
- ret = self.lookup[self.offset]
- self.final = ret[1]
- return ret[0]
-
-
-class PathEdit(urwid.Edit, _PathCompleter):
-
- def __init__(self, *args, **kwargs):
- urwid.Edit.__init__(self, *args, **kwargs)
- _PathCompleter.__init__(self)
-
- def keypress(self, size, key):
- if key == "tab":
- comp = self.complete(self.get_edit_text())
- self.set_edit_text(comp)
- self.set_edit_pos(len(comp))
- else:
- self.reset()
- return urwid.Edit.keypress(self, size, key)
diff --git a/libmproxy/console/searchable.py b/libmproxy/console/searchable.py
deleted file mode 100644
index cff1f0a1..00000000
--- a/libmproxy/console/searchable.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import urwid
-
-from . import signals
-
-
-class Highlight(urwid.AttrMap):
-
- def __init__(self, t):
- urwid.AttrMap.__init__(
- self,
- urwid.Text(t.text),
- "focusfield",
- )
- self.backup = t
-
-
-class Searchable(urwid.ListBox):
-
- def __init__(self, state, contents):
- self.walker = urwid.SimpleFocusListWalker(contents)
- urwid.ListBox.__init__(self, self.walker)
- self.state = state
- self.search_offset = 0
- self.current_highlight = None
- self.search_term = None
-
- def keypress(self, size, key):
- if key == "/":
- signals.status_prompt.send(
- prompt = "Search for",
- text = "",
- callback = self.set_search
- )
- elif key == "n":
- self.find_next(False)
- elif key == "N":
- self.find_next(True)
- elif key == "g":
- self.set_focus(0)
- self.walker._modified()
- elif key == "G":
- self.set_focus(len(self.walker) - 1)
- self.walker._modified()
- else:
- return super(self.__class__, self).keypress(size, key)
-
- def set_search(self, text):
- self.state.last_search = text
- self.search_term = text or None
- self.find_next(False)
-
- def set_highlight(self, offset):
- if self.current_highlight is not None:
- old = self.body[self.current_highlight]
- self.body[self.current_highlight] = old.backup
- if offset is None:
- self.current_highlight = None
- else:
- self.body[offset] = Highlight(self.body[offset])
- self.current_highlight = offset
-
- def get_text(self, w):
- if isinstance(w, urwid.Text):
- return w.text
- elif isinstance(w, Highlight):
- return w.backup.text
- else:
- return None
-
- def find_next(self, backwards):
- if not self.search_term:
- if self.state.last_search:
- self.search_term = self.state.last_search
- else:
- self.set_highlight(None)
- return
- # Start search at focus + 1
- if backwards:
- rng = xrange(len(self.body) - 1, -1, -1)
- else:
- rng = xrange(1, len(self.body) + 1)
- for i in rng:
- off = (self.focus_position + i) % len(self.body)
- w = self.body[off]
- txt = self.get_text(w)
- if txt and self.search_term in txt:
- self.set_highlight(off)
- self.set_focus(off, coming_from="above")
- self.body._modified()
- return
- else:
- self.set_highlight(None)
- signals.status_message.send(message="Search not found.", expire=1)
diff --git a/libmproxy/console/select.py b/libmproxy/console/select.py
deleted file mode 100644
index 928a7ca5..00000000
--- a/libmproxy/console/select.py
+++ /dev/null
@@ -1,120 +0,0 @@
-import urwid
-
-from . import common
-
-
-class _OptionWidget(urwid.WidgetWrap):
-
- def __init__(self, option, text, shortcut, active, focus):
- self.option = option
- textattr = "text"
- keyattr = "key"
- if focus and active:
- textattr = "option_active_selected"
- keyattr = "option_selected_key"
- elif focus:
- textattr = "option_selected"
- keyattr = "option_selected_key"
- elif active:
- textattr = "option_active"
- if shortcut:
- text = common.highlight_key(
- text,
- shortcut,
- textattr = textattr,
- keyattr = keyattr
- )
- opt = urwid.Text(text, align="left")
- opt = urwid.AttrWrap(opt, textattr)
- opt = urwid.Padding(opt, align = "center", width = 40)
- urwid.WidgetWrap.__init__(self, opt)
-
- def keypress(self, size, key):
- return key
-
- def selectable(self):
- return True
-
-
-class OptionWalker(urwid.ListWalker):
-
- def __init__(self, options):
- urwid.ListWalker.__init__(self)
- self.options = options
- self.focus = 0
-
- def set_focus(self, pos):
- self.focus = pos
-
- def get_focus(self):
- return self.options[self.focus].render(True), self.focus
-
- def get_next(self, pos):
- if pos >= len(self.options) - 1:
- return None, None
- return self.options[pos + 1].render(False), pos + 1
-
- def get_prev(self, pos):
- if pos <= 0:
- return None, None
- return self.options[pos - 1].render(False), pos - 1
-
-
-class Heading:
-
- def __init__(self, text):
- self.text = text
-
- def render(self, focus):
- opt = urwid.Text("\n" + self.text, align="left")
- opt = urwid.AttrWrap(opt, "title")
- opt = urwid.Padding(opt, align = "center", width = 40)
- return opt
-
-
-_neg = lambda: False
-
-
-class Option:
-
- def __init__(self, text, shortcut, getstate=None, activate=None):
- self.text = text
- self.shortcut = shortcut
- self.getstate = getstate or _neg
- self.activate = activate or _neg
-
- def render(self, focus):
- return _OptionWidget(
- self,
- self.text,
- self.shortcut,
- self.getstate(),
- focus)
-
-
-class Select(urwid.ListBox):
-
- def __init__(self, options):
- self.walker = OptionWalker(options)
- urwid.ListBox.__init__(
- self,
- self.walker
- )
- self.options = options
- self.keymap = {}
- for i in options:
- if hasattr(i, "shortcut") and i.shortcut:
- if i.shortcut in self.keymap:
- raise ValueError("Duplicate shortcut key: %s" % i.shortcut)
- self.keymap[i.shortcut] = i
-
- def keypress(self, size, key):
- if key == "enter" or key == " ":
- self.get_focus()[0].option.activate()
- return None
- key = common.shortcuts(key)
- if key in self.keymap:
- self.keymap[key].activate()
- self.set_focus(self.options.index(self.keymap[key]))
- return None
- return super(self.__class__, self).keypress(size, key)
diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py
deleted file mode 100644
index 6a439bf3..00000000
--- a/libmproxy/console/signals.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import blinker
-
-# Show a status message in the action bar
-sig_add_event = blinker.Signal()
-
-
-def add_event(e, level):
- sig_add_event.send(
- None,
- e=e,
- level=level
- )
-
-# Show a status message in the action bar
-status_message = blinker.Signal()
-
-# Prompt for input
-status_prompt = blinker.Signal()
-
-# Prompt for a path
-status_prompt_path = blinker.Signal()
-
-# Prompt for a single keystroke
-status_prompt_onekey = blinker.Signal()
-
-# Call a callback in N seconds
-call_in = blinker.Signal()
-
-# Focus the body, footer or header of the main window
-focus = blinker.Signal()
-
-# Fired when settings change
-update_settings = blinker.Signal()
-
-# Fired when a flow changes
-flow_change = blinker.Signal()
-
-# Fired when the flow list or focus changes
-flowlist_change = blinker.Signal()
-
-# Pop and push view state onto a stack
-pop_view_state = blinker.Signal()
-push_view_state = blinker.Signal()
diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py
deleted file mode 100644
index 4cc63a54..00000000
--- a/libmproxy/console/statusbar.py
+++ /dev/null
@@ -1,258 +0,0 @@
-import os.path
-
-import urwid
-
-import netlib.utils
-from . import pathedit, signals, common
-
-
-class ActionBar(urwid.WidgetWrap):
-
- def __init__(self):
- urwid.WidgetWrap.__init__(self, None)
- self.clear()
- signals.status_message.connect(self.sig_message)
- signals.status_prompt.connect(self.sig_prompt)
- signals.status_prompt_path.connect(self.sig_path_prompt)
- signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
-
- self.last_path = ""
-
- self.prompting = False
- self.onekey = False
- self.pathprompt = False
-
- def sig_message(self, sender, message, expire=None):
- w = urwid.Text(message)
- self._w = w
- self.prompting = False
- if expire:
- def cb(*args):
- if w == self._w:
- self.clear()
- signals.call_in.send(seconds=expire, callback=cb)
-
- def prep_prompt(self, p):
- return p.strip() + ": "
-
- def sig_prompt(self, sender, prompt, text, callback, args=()):
- signals.focus.send(self, section="footer")
- self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
- self.prompting = (callback, args)
-
- def sig_path_prompt(self, sender, prompt, callback, args=()):
- signals.focus.send(self, section="footer")
- self._w = pathedit.PathEdit(
- self.prep_prompt(prompt),
- os.path.dirname(self.last_path)
- )
- self.pathprompt = True
- self.prompting = (callback, args)
-
- def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
- """
- Keys are a set of (word, key) tuples. The appropriate key in the
- word is highlighted.
- """
- signals.focus.send(self, section="footer")
- prompt = [prompt, " ("]
- mkup = []
- for i, e in enumerate(keys):
- mkup.extend(common.highlight_key(e[0], e[1]))
- if i < len(keys) - 1:
- mkup.append(",")
- prompt.extend(mkup)
- prompt.append(")? ")
- self.onekey = set(i[1] for i in keys)
- self._w = urwid.Edit(prompt, "")
- self.prompting = (callback, args)
-
- def selectable(self):
- return True
-
- def keypress(self, size, k):
- if self.prompting:
- if k == "esc":
- self.prompt_done()
- elif self.onekey:
- if k == "enter":
- self.prompt_done()
- elif k in self.onekey:
- self.prompt_execute(k)
- elif k == "enter":
- self.prompt_execute(self._w.get_edit_text())
- else:
- if common.is_keypress(k):
- self._w.keypress(size, k)
- else:
- return k
-
- def clear(self):
- self._w = urwid.Text("")
- self.prompting = False
-
- def prompt_done(self):
- self.prompting = False
- self.onekey = False
- self.pathprompt = False
- signals.status_message.send(message="")
- signals.focus.send(self, section="body")
-
- def prompt_execute(self, txt):
- if self.pathprompt:
- self.last_path = txt
- p, args = self.prompting
- self.prompt_done()
- msg = p(txt, *args)
- if msg:
- signals.status_message.send(message=msg, expire=1)
-
-
-class StatusBar(urwid.WidgetWrap):
-
- def __init__(self, master, helptext):
- self.master, self.helptext = master, helptext
- self.ab = ActionBar()
- self.ib = urwid.WidgetWrap(urwid.Text(""))
- self._w = urwid.Pile([self.ib, self.ab])
- signals.update_settings.connect(self.sig_update_settings)
- signals.flowlist_change.connect(self.sig_update_settings)
- self.redraw()
-
- def sig_update_settings(self, sender):
- self.redraw()
-
- def keypress(self, *args, **kwargs):
- return self.ab.keypress(*args, **kwargs)
-
- def get_status(self):
- r = []
-
- if self.master.setheaders.count():
- r.append("[")
- r.append(("heading_key", "H"))
- r.append("eaders]")
- if self.master.replacehooks.count():
- r.append("[")
- r.append(("heading_key", "R"))
- r.append("eplacing]")
- if self.master.client_playback:
- r.append("[")
- r.append(("heading_key", "cplayback"))
- r.append(":%s to go]" % self.master.client_playback.count())
- if self.master.server_playback:
- r.append("[")
- r.append(("heading_key", "splayback"))
- if self.master.nopop:
- r.append(":%s in file]" % self.master.server_playback.count())
- else:
- r.append(":%s to go]" % self.master.server_playback.count())
- if self.master.get_ignore_filter():
- r.append("[")
- r.append(("heading_key", "I"))
- r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
- if self.master.get_tcp_filter():
- r.append("[")
- r.append(("heading_key", "T"))
- r.append("CP:%d]" % len(self.master.get_tcp_filter()))
- if self.master.state.intercept_txt:
- r.append("[")
- r.append(("heading_key", "i"))
- r.append(":%s]" % self.master.state.intercept_txt)
- if self.master.state.limit_txt:
- r.append("[")
- r.append(("heading_key", "l"))
- r.append(":%s]" % self.master.state.limit_txt)
- if self.master.stickycookie_txt:
- r.append("[")
- r.append(("heading_key", "t"))
- r.append(":%s]" % self.master.stickycookie_txt)
- if self.master.stickyauth_txt:
- r.append("[")
- r.append(("heading_key", "u"))
- r.append(":%s]" % self.master.stickyauth_txt)
- if self.master.state.default_body_view.name != "Auto":
- r.append("[")
- r.append(("heading_key", "M"))
- r.append(":%s]" % self.master.state.default_body_view.name)
-
- opts = []
- if self.master.anticache:
- opts.append("anticache")
- if self.master.anticomp:
- opts.append("anticomp")
- if self.master.showhost:
- opts.append("showhost")
- if not self.master.refresh_server_playback:
- opts.append("norefresh")
- if self.master.killextra:
- opts.append("killextra")
- if self.master.server.config.no_upstream_cert:
- opts.append("no-upstream-cert")
- if self.master.state.follow_focus:
- opts.append("following")
- if self.master.stream_large_bodies:
- opts.append(
- "stream:%s" % netlib.utils.pretty_size(
- self.master.stream_large_bodies.max_size
- )
- )
-
- if opts:
- r.append("[%s]" % (":".join(opts)))
-
- if self.master.server.config.mode in ["reverse", "upstream"]:
- dst = self.master.server.config.upstream_server
- r.append("[dest:%s]" % netlib.utils.unparse_url(
- dst.scheme,
- dst.address.host,
- dst.address.port
- ))
- if self.master.scripts:
- r.append("[")
- r.append(("heading_key", "s"))
- r.append("cripts:%s]" % len(self.master.scripts))
- # r.append("[lt:%0.3f]"%self.master.looptime)
-
- if self.master.stream:
- r.append("[W:%s]" % self.master.stream_path)
-
- return r
-
- def redraw(self):
- fc = self.master.state.flow_count()
- if self.master.state.focus is None:
- offset = 0
- else:
- offset = min(self.master.state.focus + 1, fc)
- t = [
- ('heading', ("[%s/%s]" % (offset, fc)).ljust(9))
- ]
-
- if self.master.server.bound:
- host = self.master.server.address.host
- if host == "0.0.0.0":
- host = "*"
- boundaddr = "[%s:%s]" % (host, self.master.server.address.port)
- else:
- boundaddr = ""
- t.extend(self.get_status())
- status = urwid.AttrWrap(urwid.Columns([
- urwid.Text(t),
- urwid.Text(
- [
- self.helptext,
- boundaddr
- ],
- align="right"
- ),
- ]), "heading")
- self.ib._w = status
-
- def update(self, text):
- self.helptext = text
- self.redraw()
- self.master.loop.draw_screen()
-
- def selectable(self):
- return True
diff --git a/libmproxy/console/tabs.py b/libmproxy/console/tabs.py
deleted file mode 100644
index b5423038..00000000
--- a/libmproxy/console/tabs.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import urwid
-
-
-class Tab(urwid.WidgetWrap):
-
- def __init__(self, offset, content, attr, onclick):
- """
- onclick is called on click with the tab offset as argument
- """
- p = urwid.Text(content, align="center")
- p = urwid.Padding(p, align="center", width=("relative", 100))
- p = urwid.AttrWrap(p, attr)
- urwid.WidgetWrap.__init__(self, p)
- self.offset = offset
- self.onclick = onclick
-
- def mouse_event(self, size, event, button, col, row, focus):
- if event == "mouse press" and button == 1:
- self.onclick(self.offset)
- return True
-
-
-class Tabs(urwid.WidgetWrap):
-
- def __init__(self, tabs, tab_offset=0):
- urwid.WidgetWrap.__init__(self, "")
- self.tab_offset = tab_offset
- self.tabs = tabs
- self.show()
-
- def change_tab(self, offset):
- self.tab_offset = offset
- self.show()
-
- def keypress(self, size, key):
- n = len(self.tabs)
- if key in ["tab", "l"]:
- self.change_tab((self.tab_offset + 1) % n)
- elif key == "h":
- self.change_tab((self.tab_offset - 1) % n)
- return self._w.keypress(size, key)
-
- def show(self):
- headers = []
- for i in range(len(self.tabs)):
- txt = self.tabs[i][0]()
- if i == self.tab_offset:
- headers.append(
- Tab(
- i,
- txt,
- "heading",
- self.change_tab
- )
- )
- else:
- headers.append(
- Tab(
- i,
- txt,
- "heading_inactive",
- self.change_tab
- )
- )
- headers = urwid.Columns(headers, dividechars=1)
- self._w = urwid.Frame(
- body = self.tabs[self.tab_offset][1](),
- header = headers
- )
- self._w.set_focus("body")
diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py
deleted file mode 100644
index 47c284e4..00000000
--- a/libmproxy/console/window.py
+++ /dev/null
@@ -1,90 +0,0 @@
-import urwid
-from . import signals
-
-
-class Window(urwid.Frame):
-
- def __init__(self, master, body, header, footer, helpctx):
- urwid.Frame.__init__(
- self,
- urwid.AttrWrap(body, "background"),
- header = urwid.AttrWrap(header, "background") if header else None,
- footer = urwid.AttrWrap(footer, "background") if footer else None
- )
- self.master = master
- self.helpctx = helpctx
- signals.focus.connect(self.sig_focus)
-
- def sig_focus(self, sender, section):
- self.focus_position = section
-
- def mouse_event(self, *args, **kwargs):
- # args: (size, event, button, col, row)
- k = super(self.__class__, self).mouse_event(*args, **kwargs)
- if not k:
- if args[1] == "mouse drag":
- signals.status_message.send(
- message = "Hold down shift, alt or ctrl to select text.",
- expire = 1
- )
- elif args[1] == "mouse press" and args[2] == 4:
- self.keypress(args[0], "up")
- elif args[1] == "mouse press" and args[2] == 5:
- self.keypress(args[0], "down")
- else:
- return False
- return True
-
- def keypress(self, size, k):
- k = super(self.__class__, self).keypress(size, k)
- if k == "?":
- self.master.view_help(self.helpctx)
- elif k == "c":
- if not self.master.client_playback:
- signals.status_prompt_path.send(
- self,
- prompt = "Client replay",
- callback = self.master.client_playback_path
- )
- else:
- signals.status_prompt_onekey.send(
- self,
- prompt = "Stop current client replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = self.master.stop_client_playback_prompt,
- )
- elif k == "i":
- signals.status_prompt.send(
- self,
- prompt = "Intercept filter",
- text = self.master.state.intercept_txt,
- callback = self.master.set_intercept
- )
- elif k == "o":
- self.master.view_options()
- elif k == "Q":
- raise urwid.ExitMainLoop
- elif k == "q":
- signals.pop_view_state.send(self)
- elif k == "S":
- if not self.master.server_playback:
- signals.status_prompt_path.send(
- self,
- prompt = "Server replay path",
- callback = self.master.server_playback_path
- )
- else:
- signals.status_prompt_onekey.send(
- self,
- prompt = "Stop current server replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = self.master.stop_server_playback_prompt,
- )
- else:
- return k