diff options
author | Maximilian Hils <git@maximilianhils.com> | 2016-02-15 14:58:46 +0100 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2016-02-15 14:58:46 +0100 |
commit | 33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04 (patch) | |
tree | 31914a601302579ff817504019296fd7e9e46765 /libmproxy/console | |
parent | 36f34f701991b5d474c005ec45e3b66e20f326a8 (diff) | |
download | mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.gz mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.bz2 mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.zip |
move mitmproxy
Diffstat (limited to 'libmproxy/console')
-rw-r--r-- | libmproxy/console/__init__.py | 744 | ||||
-rw-r--r-- | libmproxy/console/common.py | 444 | ||||
-rw-r--r-- | libmproxy/console/flowdetailview.py | 153 | ||||
-rw-r--r-- | libmproxy/console/flowlist.py | 397 | ||||
-rw-r--r-- | libmproxy/console/flowview.py | 711 | ||||
-rw-r--r-- | libmproxy/console/grideditor.py | 716 | ||||
-rw-r--r-- | libmproxy/console/help.py | 117 | ||||
-rw-r--r-- | libmproxy/console/options.py | 271 | ||||
-rw-r--r-- | libmproxy/console/palettepicker.py | 82 | ||||
-rw-r--r-- | libmproxy/console/palettes.py | 326 | ||||
-rw-r--r-- | libmproxy/console/pathedit.py | 71 | ||||
-rw-r--r-- | libmproxy/console/searchable.py | 93 | ||||
-rw-r--r-- | libmproxy/console/select.py | 120 | ||||
-rw-r--r-- | libmproxy/console/signals.py | 43 | ||||
-rw-r--r-- | libmproxy/console/statusbar.py | 258 | ||||
-rw-r--r-- | libmproxy/console/tabs.py | 70 | ||||
-rw-r--r-- | libmproxy/console/window.py | 90 |
17 files changed, 0 insertions, 4706 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py deleted file mode 100644 index e739ec61..00000000 --- a/libmproxy/console/__init__.py +++ /dev/null @@ -1,744 +0,0 @@ -from __future__ import absolute_import - -import mailcap -import mimetypes -import tempfile -import os -import os.path -import shlex -import signal -import stat -import subprocess -import sys -import traceback -import urwid -import weakref - -from .. import controller, flow, script, contentviews -from . import flowlist, flowview, help, window, signals, options -from . import grideditor, palettes, statusbar, palettepicker - -EVENTLOG_SIZE = 500 - - -class ConsoleState(flow.State): - - def __init__(self): - flow.State.__init__(self) - self.focus = None - self.follow_focus = None - self.default_body_view = contentviews.get("Auto") - self.flowsettings = weakref.WeakKeyDictionary() - self.last_search = None - - def __setattr__(self, name, value): - self.__dict__[name] = value - signals.update_settings.send(self) - - def add_flow_setting(self, flow, key, value): - d = self.flowsettings.setdefault(flow, {}) - d[key] = value - - def get_flow_setting(self, flow, key, default=None): - d = self.flowsettings.get(flow, {}) - return d.get(key, default) - - def add_flow(self, f): - super(ConsoleState, self).add_flow(f) - if self.focus is None: - self.set_focus(0) - elif self.follow_focus: - self.set_focus(len(self.view) - 1) - self.set_flow_marked(f, False) - return f - - def update_flow(self, f): - super(ConsoleState, self).update_flow(f) - if self.focus is None: - self.set_focus(0) - return f - - def set_limit(self, limit): - ret = flow.State.set_limit(self, limit) - self.set_focus(self.focus) - return ret - - def get_focus(self): - if not self.view or self.focus is None: - return None, None - return self.view[self.focus], self.focus - - def set_focus(self, idx): - if self.view: - if idx >= len(self.view): - idx = len(self.view) - 1 - elif idx < 0: - idx = 0 - self.focus = idx - else: - self.focus = None - - def set_focus_flow(self, f): - self.set_focus(self.view.index(f)) - - def get_from_pos(self, pos): - if len(self.view) <= pos or pos < 0: - return None, None - return self.view[pos], pos - - def get_next(self, pos): - return self.get_from_pos(pos + 1) - - def get_prev(self, pos): - return self.get_from_pos(pos - 1) - - def delete_flow(self, f): - if f in self.view and self.view.index(f) <= self.focus: - self.focus -= 1 - if self.focus < 0: - self.focus = None - ret = flow.State.delete_flow(self, f) - self.set_focus(self.focus) - return ret - - def clear(self): - marked_flows = [] - for f in self.flows: - if self.flow_marked(f): - marked_flows.append(f) - - super(ConsoleState, self).clear() - - for f in marked_flows: - self.add_flow(f) - self.set_flow_marked(f, True) - - if len(self.flows.views) == 0: - self.focus = None - else: - self.focus = 0 - self.set_focus(self.focus) - - def flow_marked(self, flow): - return self.get_flow_setting(flow, "marked", False) - - def set_flow_marked(self, flow, marked): - self.add_flow_setting(flow, "marked", marked) - - -class Options(object): - attributes = [ - "app", - "app_domain", - "app_ip", - "anticache", - "anticomp", - "client_replay", - "eventlog", - "follow", - "keepserving", - "kill", - "intercept", - "limit", - "no_server", - "refresh_server_playback", - "rfile", - "scripts", - "showhost", - "replacements", - "rheaders", - "setheaders", - "server_replay", - "stickycookie", - "stickyauth", - "stream_large_bodies", - "verbosity", - "wfile", - "nopop", - "palette", - "palette_transparent", - "no_mouse" - ] - - def __init__(self, **kwargs): - for k, v in kwargs.items(): - setattr(self, k, v) - for i in self.attributes: - if not hasattr(self, i): - setattr(self, i, None) - - -class ConsoleMaster(flow.FlowMaster): - palette = [] - - def __init__(self, server, options): - flow.FlowMaster.__init__(self, server, ConsoleState()) - self.stream_path = None - self.options = options - - for i in options.replacements: - self.replacehooks.add(*i) - - for i in options.setheaders: - self.setheaders.add(*i) - - r = self.set_intercept(options.intercept) - if r: - print >> sys.stderr, "Intercept error:", r - sys.exit(1) - - if options.limit: - self.set_limit(options.limit) - - r = self.set_stickycookie(options.stickycookie) - if r: - print >> sys.stderr, "Sticky cookies error:", r - sys.exit(1) - - r = self.set_stickyauth(options.stickyauth) - if r: - print >> sys.stderr, "Sticky auth error:", r - sys.exit(1) - - self.set_stream_large_bodies(options.stream_large_bodies) - - self.refresh_server_playback = options.refresh_server_playback - self.anticache = options.anticache - self.anticomp = options.anticomp - self.killextra = options.kill - self.rheaders = options.rheaders - self.nopop = options.nopop - self.showhost = options.showhost - self.palette = options.palette - self.palette_transparent = options.palette_transparent - - self.eventlog = options.eventlog - self.eventlist = urwid.SimpleListWalker([]) - self.follow = options.follow - - if options.client_replay: - self.client_playback_path(options.client_replay) - - if options.server_replay: - self.server_playback_path(options.server_replay) - - if options.scripts: - for i in options.scripts: - err = self.load_script(i) - if err: - print >> sys.stderr, "Script load error:", err - sys.exit(1) - - if options.outfile: - err = self.start_stream_to_path( - options.outfile[0], - options.outfile[1] - ) - if err: - print >> sys.stderr, "Stream file error:", err - sys.exit(1) - - self.view_stack = [] - - if options.app: - self.start_app(self.options.app_host, self.options.app_port) - signals.call_in.connect(self.sig_call_in) - signals.pop_view_state.connect(self.sig_pop_view_state) - signals.push_view_state.connect(self.sig_push_view_state) - signals.sig_add_event.connect(self.sig_add_event) - - def __setattr__(self, name, value): - self.__dict__[name] = value - signals.update_settings.send(self) - - def load_script(self, command, use_reloader=True): - # We default to using the reloader in the console ui. - super(ConsoleMaster, self).load_script(command, use_reloader) - - def sig_add_event(self, sender, e, level): - needed = dict(error=0, info=1, debug=2).get(level, 1) - if self.options.verbosity < needed: - return - - if level == "error": - e = urwid.Text(("error", str(e))) - else: - e = urwid.Text(str(e)) - self.eventlist.append(e) - if len(self.eventlist) > EVENTLOG_SIZE: - self.eventlist.pop(0) - self.eventlist.set_focus(len(self.eventlist) - 1) - - def add_event(self, e, level): - signals.add_event(e, level) - - def sig_call_in(self, sender, seconds, callback, args=()): - def cb(*_): - return callback(*args) - self.loop.set_alarm_in(seconds, cb) - - def sig_pop_view_state(self, sender): - if len(self.view_stack) > 1: - self.view_stack.pop() - self.loop.widget = self.view_stack[-1] - else: - signals.status_prompt_onekey.send( - self, - prompt = "Quit", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = self.quit, - ) - - def sig_push_view_state(self, sender, window): - self.view_stack.append(window) - self.loop.widget = window - self.loop.draw_screen() - - def _run_script_method(self, method, s, f): - status, val = s.run(method, f) - if val: - if status: - signals.add_event("Method %s return: %s" % (method, val), "debug") - else: - signals.add_event( - "Method %s error: %s" % - (method, val[1]), "error") - - def run_script_once(self, command, f): - if not command: - return - signals.add_event("Running script on flow: %s" % command, "debug") - - try: - s = script.Script(command, script.ScriptContext(self)) - except script.ScriptException as v: - signals.status_message.send( - message = "Error loading script." - ) - signals.add_event("Error loading script:\n%s" % v.args[0], "error") - return - - if f.request: - self._run_script_method("request", s, f) - if f.response: - self._run_script_method("response", s, f) - if f.error: - self._run_script_method("error", s, f) - s.unload() - signals.flow_change.send(self, flow = f) - - def set_script(self, command): - if not command: - return - ret = self.load_script(command) - if ret: - signals.status_message.send(message=ret) - - def toggle_eventlog(self): - self.eventlog = not self.eventlog - signals.pop_view_state.send(self) - self.view_flowlist() - - def _readflows(self, path): - """ - Utitility function that reads a list of flows - or prints an error to the UI if that fails. - Returns - - None, if there was an error. - - a list of flows, otherwise. - """ - try: - return flow.read_flows_from_paths(path) - except flow.FlowReadError as e: - signals.status_message.send(message=e.strerror) - - def client_playback_path(self, path): - if not isinstance(path, list): - path = [path] - flows = self._readflows(path) - if flows: - self.start_client_playback(flows, False) - - def server_playback_path(self, path): - if not isinstance(path, list): - path = [path] - flows = self._readflows(path) - if flows: - self.start_server_playback( - flows, - self.killextra, self.rheaders, - False, self.nopop, - self.options.replay_ignore_params, - self.options.replay_ignore_content, - self.options.replay_ignore_payload_params, - self.options.replay_ignore_host - ) - - def spawn_editor(self, data): - fd, name = tempfile.mkstemp('', "mproxy") - os.write(fd, data) - os.close(fd) - c = os.environ.get("EDITOR") - # if no EDITOR is set, assume 'vi' - if not c: - c = "vi" - cmd = shlex.split(c) - cmd.append(name) - self.ui.stop() - try: - subprocess.call(cmd) - except: - signals.status_message.send( - message = "Can't start editor: %s" % " ".join(c) - ) - else: - data = open(name, "rb").read() - self.ui.start() - os.unlink(name) - return data - - def spawn_external_viewer(self, data, contenttype): - if contenttype: - contenttype = contenttype.split(";")[0] - ext = mimetypes.guess_extension(contenttype) or "" - else: - ext = "" - fd, name = tempfile.mkstemp(ext, "mproxy") - os.write(fd, data) - os.close(fd) - - # read-only to remind the user that this is a view function - os.chmod(name, stat.S_IREAD) - - cmd = None - shell = False - - if contenttype: - c = mailcap.getcaps() - cmd, _ = mailcap.findmatch(c, contenttype, filename=name) - if cmd: - shell = True - if not cmd: - # hm which one should get priority? - c = os.environ.get("PAGER") or os.environ.get("EDITOR") - if not c: - c = "less" - cmd = shlex.split(c) - cmd.append(name) - self.ui.stop() - try: - subprocess.call(cmd, shell=shell) - except: - signals.status_message.send( - message="Can't start external viewer: %s" % " ".join(c) - ) - self.ui.start() - os.unlink(name) - - def set_palette(self, name): - self.palette = name - self.ui.register_palette( - palettes.palettes[name].palette(self.palette_transparent) - ) - self.ui.clear() - - def ticker(self, *userdata): - changed = self.tick(self.masterq, timeout=0) - if changed: - self.loop.draw_screen() - signals.update_settings.send() - self.loop.set_alarm_in(0.01, self.ticker) - - def run(self): - self.ui = urwid.raw_display.Screen() - self.ui.set_terminal_properties(256) - self.set_palette(self.palette) - self.loop = urwid.MainLoop( - urwid.SolidFill("x"), - screen = self.ui, - handle_mouse = not self.options.no_mouse, - ) - - self.server.start_slave( - controller.Slave, - controller.Channel(self.masterq, self.should_exit) - ) - - if self.options.rfile: - ret = self.load_flows_path(self.options.rfile) - if ret and self.state.flow_count(): - signals.add_event( - "File truncated or corrupted. " - "Loaded as many flows as possible.", - "error" - ) - elif ret and not self.state.flow_count(): - self.shutdown() - print >> sys.stderr, "Could not load file:", ret - sys.exit(1) - - self.loop.set_alarm_in(0.01, self.ticker) - - # It's not clear why we need to handle this explicitly - without this, - # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it - # out. - def exit(s, f): - raise urwid.ExitMainLoop - signal.signal(signal.SIGINT, exit) - - self.loop.set_alarm_in( - 0.0001, - lambda *args: self.view_flowlist() - ) - - try: - self.loop.run() - except Exception: - self.loop.stop() - sys.stdout.flush() - print >> sys.stderr, traceback.format_exc() - print >> sys.stderr, "mitmproxy has crashed!" - print >> sys.stderr, "Please lodge a bug report at:" - print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy" - print >> sys.stderr, "Shutting down..." - sys.stderr.flush() - self.shutdown() - - def view_help(self, helpctx): - signals.push_view_state.send( - self, - window = window.Window( - self, - help.HelpView(helpctx), - None, - statusbar.StatusBar(self, help.footer), - None - ) - ) - - def view_options(self): - for i in self.view_stack: - if isinstance(i["body"], options.Options): - return - signals.push_view_state.send( - self, - window = window.Window( - self, - options.Options(self), - None, - statusbar.StatusBar(self, options.footer), - options.help_context, - ) - ) - - def view_palette_picker(self): - signals.push_view_state.send( - self, - window = window.Window( - self, - palettepicker.PalettePicker(self), - None, - statusbar.StatusBar(self, palettepicker.footer), - palettepicker.help_context, - ) - ) - - def view_grideditor(self, ge): - signals.push_view_state.send( - self, - window = window.Window( - self, - ge, - None, - statusbar.StatusBar(self, grideditor.FOOTER), - ge.make_help() - ) - ) - - def view_flowlist(self): - if self.ui.started: - self.ui.clear() - if self.state.follow_focus: - self.state.set_focus(self.state.flow_count()) - - if self.eventlog: - body = flowlist.BodyPile(self) - else: - body = flowlist.FlowListBox(self) - - if self.follow: - self.toggle_follow_flows() - - signals.push_view_state.send( - self, - window = window.Window( - self, - body, - None, - statusbar.StatusBar(self, flowlist.footer), - flowlist.help_context - ) - ) - - def view_flow(self, flow, tab_offset=0): - self.state.set_focus_flow(flow) - signals.push_view_state.send( - self, - window = window.Window( - self, - flowview.FlowView(self, self.state, flow, tab_offset), - flowview.FlowViewHeader(self, flow), - statusbar.StatusBar(self, flowview.footer), - flowview.help_context - ) - ) - - def _write_flows(self, path, flows): - if not path: - return - path = os.path.expanduser(path) - try: - f = file(path, "wb") - fw = flow.FlowWriter(f) - for i in flows: - fw.add(i) - f.close() - except IOError as v: - signals.status_message.send(message=v.strerror) - - def save_one_flow(self, path, flow): - return self._write_flows(path, [flow]) - - def save_flows(self, path): - return self._write_flows(path, self.state.view) - - def save_marked_flows(self, path): - marked_flows = [] - for f in self.state.view: - if self.state.flow_marked(f): - marked_flows.append(f) - return self._write_flows(path, marked_flows) - - def load_flows_callback(self, path): - if not path: - return - ret = self.load_flows_path(path) - return ret or "Flows loaded from %s" % path - - def load_flows_path(self, path): - reterr = None - try: - flow.FlowMaster.load_flows_file(self, path) - except flow.FlowReadError as v: - reterr = str(v) - signals.flowlist_change.send(self) - return reterr - - def accept_all(self): - self.state.accept_all(self) - - def set_limit(self, txt): - v = self.state.set_limit(txt) - signals.flowlist_change.send(self) - return v - - def set_intercept(self, txt): - return self.state.set_intercept(txt) - - def change_default_display_mode(self, t): - v = contentviews.get_by_shortcut(t) - self.state.default_body_view = v - self.refresh_focus() - - def edit_scripts(self, scripts): - commands = [x[0] for x in scripts] # remove outer array - if commands == [s.command for s in self.scripts]: - return - - self.unload_scripts() - for command in commands: - self.load_script(command) - signals.update_settings.send(self) - - def stop_client_playback_prompt(self, a): - if a != "n": - self.stop_client_playback() - - def stop_server_playback_prompt(self, a): - if a != "n": - self.stop_server_playback() - - def quit(self, a): - if a != "n": - raise urwid.ExitMainLoop - - def shutdown(self): - self.state.killall(self) - flow.FlowMaster.shutdown(self) - - def clear_flows(self): - self.state.clear() - signals.flowlist_change.send(self) - - def toggle_follow_flows(self): - # toggle flow follow - self.state.follow_focus = not self.state.follow_focus - # jump to most recent flow if follow is now on - if self.state.follow_focus: - self.state.set_focus(self.state.flow_count()) - signals.flowlist_change.send(self) - - def delete_flow(self, f): - self.state.delete_flow(f) - signals.flowlist_change.send(self) - - def refresh_focus(self): - if self.state.view: - signals.flow_change.send( - self, - flow = self.state.view[self.state.focus] - ) - - def process_flow(self, f): - if self.state.intercept and f.match( - self.state.intercept) and not f.request.is_replay: - f.intercept(self) - else: - # check if flow was intercepted within an inline script by flow.intercept() - if f.intercepted: - f.intercept(self) - else: - f.reply() - signals.flowlist_change.send(self) - signals.flow_change.send(self, flow = f) - - def clear_events(self): - self.eventlist[:] = [] - - # Handlers - def handle_error(self, f): - f = flow.FlowMaster.handle_error(self, f) - if f: - self.process_flow(f) - return f - - def handle_request(self, f): - f = flow.FlowMaster.handle_request(self, f) - if f: - self.process_flow(f) - return f - - def handle_response(self, f): - f = flow.FlowMaster.handle_response(self, f) - if f: - self.process_flow(f) - return f - - def handle_script_change(self, script): - if super(ConsoleMaster, self).handle_script_change(script): - signals.status_message.send(message='"{}" reloaded.'.format(script.filename)) - else: - signals.status_message.send(message='Error reloading "{}".'.format(script.filename)) diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py deleted file mode 100644 index c29ffddc..00000000 --- a/libmproxy/console/common.py +++ /dev/null @@ -1,444 +0,0 @@ -from __future__ import absolute_import - -import urwid -import urwid.util -import os - -from netlib.http import CONTENT_MISSING -import netlib.utils - -from .. import utils -from .. import flow_export -from ..models import decoded -from . import signals - - -try: - import pyperclip -except: - pyperclip = False - - -VIEW_FLOW_REQUEST = 0 -VIEW_FLOW_RESPONSE = 1 - -METHOD_OPTIONS = [ - ("get", "g"), - ("post", "p"), - ("put", "u"), - ("head", "h"), - ("trace", "t"), - ("delete", "d"), - ("options", "o"), - ("edit raw", "e"), -] - - -def is_keypress(k): - """ - Is this input event a keypress? - """ - if isinstance(k, basestring): - return True - - -def highlight_key(str, key, textattr="text", keyattr="key"): - l = [] - parts = str.split(key, 1) - if parts[0]: - l.append((textattr, parts[0])) - l.append((keyattr, key)) - if parts[1]: - l.append((textattr, parts[1])) - return l - - -KEY_MAX = 30 - - -def format_keyvals(lst, key="key", val="text", indent=0): - """ - Format a list of (key, value) tuples. - - If key is None, it's treated specially: - - We assume a sub-value, and add an extra indent. - - The value is treated as a pre-formatted list of directives. - """ - ret = [] - if lst: - maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX) - for i, kv in enumerate(lst): - if kv is None: - ret.append(urwid.Text("")) - else: - if isinstance(kv[1], urwid.Widget): - v = kv[1] - elif kv[1] is None: - v = urwid.Text("") - else: - v = urwid.Text([(val, kv[1])]) - ret.append( - urwid.Columns( - [ - ("fixed", indent, urwid.Text("")), - ( - "fixed", - maxk, - urwid.Text([(key, kv[0] or "")]) - ), - v - ], - dividechars = 2 - ) - ) - return ret - - -def shortcuts(k): - if k == " ": - k = "page down" - elif k == "ctrl f": - k = "page down" - elif k == "ctrl b": - k = "page up" - elif k == "j": - k = "down" - elif k == "k": - k = "up" - return k - - -def fcol(s, attr): - s = unicode(s) - return ( - "fixed", - len(s), - urwid.Text( - [ - (attr, s) - ] - ) - ) - -if urwid.util.detected_encoding: - SYMBOL_REPLAY = u"\u21ba" - SYMBOL_RETURN = u"\u2190" - SYMBOL_MARK = u"\u25cf" -else: - SYMBOL_REPLAY = u"[r]" - SYMBOL_RETURN = u"<-" - SYMBOL_MARK = "[m]" - - -def raw_format_flow(f, focus, extended): - f = dict(f) - pile = [] - req = [] - if extended: - req.append( - fcol( - utils.format_timestamp(f["req_timestamp"]), - "highlight" - ) - ) - else: - req.append(fcol(">>" if focus else " ", "focus")) - - if f["marked"]: - req.append(fcol(SYMBOL_MARK, "mark")) - - if f["req_is_replay"]: - req.append(fcol(SYMBOL_REPLAY, "replay")) - req.append(fcol(f["req_method"], "method")) - - preamble = sum(i[1] for i in req) + len(req) - 1 - - if f["intercepted"] and not f["acked"]: - uc = "intercept" - elif f["resp_code"] or f["err_msg"]: - uc = "text" - else: - uc = "title" - - url = f["req_url"] - if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): - url += " " + f["req_http_version"] - req.append( - urwid.Text([(uc, url)]) - ) - - pile.append(urwid.Columns(req, dividechars=1)) - - resp = [] - resp.append( - ("fixed", preamble, urwid.Text("")) - ) - - if f["resp_code"]: - codes = { - 2: "code_200", - 3: "code_300", - 4: "code_400", - 5: "code_500", - } - ccol = codes.get(f["resp_code"] / 100, "code_other") - resp.append(fcol(SYMBOL_RETURN, ccol)) - if f["resp_is_replay"]: - resp.append(fcol(SYMBOL_REPLAY, "replay")) - resp.append(fcol(f["resp_code"], ccol)) - if f["intercepted"] and f["resp_code"] and not f["acked"]: - rc = "intercept" - else: - rc = "text" - - if f["resp_ctype"]: - resp.append(fcol(f["resp_ctype"], rc)) - resp.append(fcol(f["resp_clen"], rc)) - resp.append(fcol(f["roundtrip"], rc)) - - elif f["err_msg"]: - resp.append(fcol(SYMBOL_RETURN, "error")) - resp.append( - urwid.Text([ - ( - "error", - f["err_msg"] - ) - ]) - ) - pile.append(urwid.Columns(resp, dividechars=1)) - return urwid.Pile(pile) - - -# Save file to disk -def save_data(path, data): - if not path: - return - try: - with file(path, "wb") as f: - f.write(data) - except IOError as v: - signals.status_message.send(message=v.strerror) - - -def ask_save_overwrite(path, data): - if not path: - return - path = os.path.expanduser(path) - if os.path.exists(path): - def save_overwrite(k): - if k == "y": - save_data(path, data) - - signals.status_prompt_onekey.send( - prompt = "'" + path + "' already exists. Overwrite?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = save_overwrite - ) - else: - save_data(path, data) - - -def ask_save_path(prompt, data): - signals.status_prompt_path.send( - prompt = prompt, - callback = ask_save_overwrite, - args = (data, ) - ) - - -def copy_flow_format_data(part, scope, flow): - if part == "u": - data = flow.request.url - else: - data = "" - if scope in ("q", "a"): - if flow.request.content is None or flow.request.content == CONTENT_MISSING: - return None, "Request content is missing" - with decoded(flow.request): - if part == "h": - data += netlib.http.http1.assemble_request(flow.request) - elif part == "c": - data += flow.request.content - else: - raise ValueError("Unknown part: {}".format(part)) - if scope == "a" and flow.request.content and flow.response: - # Add padding between request and response - data += "\r\n" * 2 - if scope in ("s", "a") and flow.response: - if flow.response.content is None or flow.response.content == CONTENT_MISSING: - return None, "Response content is missing" - with decoded(flow.response): - if part == "h": - data += netlib.http.http1.assemble_response(flow.response) - elif part == "c": - data += flow.response.content - else: - raise ValueError("Unknown part: {}".format(part)) - return data, False - - -def export_prompt(k, flow): - exporters = { - "c": flow_export.curl_command, - "p": flow_export.python_code, - "r": flow_export.raw_request, - } - if k in exporters: - copy_to_clipboard_or_prompt(exporters[k](flow)) - - -def copy_to_clipboard_or_prompt(data): - # pyperclip calls encode('utf-8') on data to be copied without checking. - # if data are already encoded that way UnicodeDecodeError is thrown. - toclip = "" - try: - toclip = data.decode('utf-8') - except (UnicodeDecodeError): - toclip = data - - try: - pyperclip.copy(toclip) - except (RuntimeError, UnicodeDecodeError, AttributeError): - def save(k): - if k == "y": - ask_save_path("Save data", data) - signals.status_prompt_onekey.send( - prompt = "Cannot copy data to clipboard. Save as file?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = save - ) - - -def copy_flow(part, scope, flow, master, state): - """ - part: _c_ontent, _h_eaders+content, _u_rl - scope: _a_ll, re_q_uest, re_s_ponse - """ - data, err = copy_flow_format_data(part, scope, flow) - - if err: - signals.status_message.send(message=err) - return - - if not data: - if scope == "q": - signals.status_message.send(message="No request content to copy.") - elif scope == "s": - signals.status_message.send(message="No response content to copy.") - else: - signals.status_message.send(message="No contents to copy.") - return - - copy_to_clipboard_or_prompt(data) - - -def ask_copy_part(scope, flow, master, state): - choices = [ - ("content", "c"), - ("headers+content", "h") - ] - if scope != "s": - choices.append(("url", "u")) - - signals.status_prompt_onekey.send( - prompt = "Copy", - keys = choices, - callback = copy_flow, - args = (scope, flow, master, state) - ) - - -def ask_save_body(part, master, state, flow): - """ - Save either the request or the response body to disk. part can either be - "q" (request), "s" (response) or None (ask user if necessary). - """ - - request_has_content = flow.request and flow.request.content - response_has_content = flow.response and flow.response.content - - if part is None: - # We first need to determine whether we want to save the request or the - # response content. - if request_has_content and response_has_content: - signals.status_prompt_onekey.send( - prompt = "Save", - keys = ( - ("request", "q"), - ("response", "s"), - ), - callback = ask_save_body, - args = (master, state, flow) - ) - elif response_has_content: - ask_save_body("s", master, state, flow) - else: - ask_save_body("q", master, state, flow) - - elif part == "q" and request_has_content: - ask_save_path( - "Save request content", - flow.request.get_decoded_content() - ) - elif part == "s" and response_has_content: - ask_save_path( - "Save response content", - flow.response.get_decoded_content() - ) - else: - signals.status_message.send(message="No content to save.") - - -flowcache = utils.LRUCache(800) - - -def format_flow(f, focus, extended=False, hostheader=False, marked=False): - d = dict( - intercepted = f.intercepted, - acked = f.reply.acked, - - req_timestamp = f.request.timestamp_start, - req_is_replay = f.request.is_replay, - req_method = f.request.method, - req_url = f.request.pretty_url if hostheader else f.request.url, - req_http_version = f.request.http_version, - - err_msg = f.error.msg if f.error else None, - resp_code = f.response.status_code if f.response else None, - - marked = marked, - ) - if f.response: - if f.response.content: - contentdesc = netlib.utils.pretty_size(len(f.response.content)) - elif f.response.content == CONTENT_MISSING: - contentdesc = "[content missing]" - else: - contentdesc = "[no content]" - duration = 0 - if f.response.timestamp_end and f.request.timestamp_start: - duration = f.response.timestamp_end - f.request.timestamp_start - roundtrip = utils.pretty_duration(duration) - - d.update(dict( - resp_code = f.response.status_code, - resp_is_replay = f.response.is_replay, - resp_clen = contentdesc, - roundtrip = roundtrip, - )) - t = f.response.headers.get("content-type") - if t: - d["resp_ctype"] = t.split(";")[0] - else: - d["resp_ctype"] = "" - return flowcache.get( - raw_format_flow, - tuple(sorted(d.items())), focus, extended - ) diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py deleted file mode 100644 index f4b4262e..00000000 --- a/libmproxy/console/flowdetailview.py +++ /dev/null @@ -1,153 +0,0 @@ -from __future__ import absolute_import -import urwid -from . import common, searchable -from .. import utils - - -def maybe_timestamp(base, attr): - if base and getattr(base, attr): - return utils.format_timestamp_with_milli(getattr(base, attr)) - else: - return "active" - - -def flowdetails(state, flow): - text = [] - - cc = flow.client_conn - sc = flow.server_conn - req = flow.request - resp = flow.response - - if sc is not None: - text.append(urwid.Text([("head", "Server Connection:")])) - parts = [ - ["Address", "%s:%s" % sc.address()], - ] - - text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) - ) - - c = sc.cert - if c: - text.append(urwid.Text([("head", "Server Certificate:")])) - parts = [ - ["Type", "%s, %s bits" % c.keyinfo], - ["SHA1 digest", c.digest("sha1")], - ["Valid to", str(c.notafter)], - ["Valid from", str(c.notbefore)], - ["Serial", str(c.serial)], - [ - "Subject", - urwid.BoxAdapter( - urwid.ListBox( - common.format_keyvals( - c.subject, - key="highlight", - val="text" - ) - ), - len(c.subject) - ) - ], - [ - "Issuer", - urwid.BoxAdapter( - urwid.ListBox( - common.format_keyvals( - c.issuer, key="highlight", val="text" - ) - ), - len(c.issuer) - ) - ] - ] - - if c.altnames: - parts.append( - [ - "Alt names", - ", ".join(c.altnames) - ] - ) - text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) - ) - - if cc is not None: - text.append(urwid.Text([("head", "Client Connection:")])) - - parts = [ - ["Address", "%s:%s" % cc.address()], - # ["Requests", "%s"%cc.requestcount], - ] - - text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) - ) - - parts = [] - - parts.append( - [ - "Client conn. established", - maybe_timestamp(cc, "timestamp_start") - ] - ) - parts.append( - [ - "Server conn. initiated", - maybe_timestamp(sc, "timestamp_start") - ] - ) - parts.append( - [ - "Server conn. TCP handshake", - maybe_timestamp(sc, "timestamp_tcp_setup") - ] - ) - if sc.ssl_established: - parts.append( - [ - "Server conn. SSL handshake", - maybe_timestamp(sc, "timestamp_ssl_setup") - ] - ) - parts.append( - [ - "Client conn. SSL handshake", - maybe_timestamp(cc, "timestamp_ssl_setup") - ] - ) - parts.append( - [ - "First request byte", - maybe_timestamp(req, "timestamp_start") - ] - ) - parts.append( - [ - "Request complete", - maybe_timestamp(req, "timestamp_end") - ] - ) - parts.append( - [ - "First response byte", - maybe_timestamp(resp, "timestamp_start") - ] - ) - parts.append( - [ - "Response complete", - maybe_timestamp(resp, "timestamp_end") - ] - ) - - # sort operations by timestamp - parts = sorted(parts, key=lambda p: p[1]) - - text.append(urwid.Text([("head", "Timing:")])) - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) - return searchable.Searchable(state, text) diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py deleted file mode 100644 index c2201055..00000000 --- a/libmproxy/console/flowlist.py +++ /dev/null @@ -1,397 +0,0 @@ -from __future__ import absolute_import -import urwid - -import netlib.utils - -from . import common, signals - - -def _mkhelp(): - text = [] - keys = [ - ("A", "accept all intercepted flows"), - ("a", "accept this intercepted flow"), - ("b", "save request/response body"), - ("C", "clear flow list or eventlog"), - ("d", "delete flow"), - ("D", "duplicate flow"), - ("E", "export"), - ("e", "toggle eventlog"), - ("F", "toggle follow flow list"), - ("l", "set limit filter pattern"), - ("L", "load saved flows"), - ("m", "toggle flow mark"), - ("n", "create a new request"), - ("P", "copy flow to clipboard"), - ("r", "replay request"), - ("U", "unmark all marked flows"), - ("V", "revert changes to request"), - ("w", "save flows "), - ("W", "stream flows to file"), - ("X", "kill and delete flow, even if it's mid-intercept"), - ("tab", "tab between eventlog and flow list"), - ("enter", "view flow"), - ("|", "run script on this flow"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - -footer = [ - ('heading_key', "?"), ":help ", -] - - -class EventListBox(urwid.ListBox): - - def __init__(self, master): - self.master = master - urwid.ListBox.__init__(self, master.eventlist) - - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "C": - self.master.clear_events() - key = None - elif key == "G": - self.set_focus(len(self.master.eventlist) - 1) - elif key == "g": - self.set_focus(0) - return urwid.ListBox.keypress(self, size, key) - - -class BodyPile(urwid.Pile): - - def __init__(self, master): - h = urwid.Text("Event log") - h = urwid.Padding(h, align="left", width=("relative", 100)) - - self.inactive_header = urwid.AttrWrap(h, "heading_inactive") - self.active_header = urwid.AttrWrap(h, "heading") - - urwid.Pile.__init__( - self, - [ - FlowListBox(master), - urwid.Frame( - EventListBox(master), - header = self.inactive_header - ) - ] - ) - self.master = master - - def keypress(self, size, key): - if key == "tab": - self.focus_position = ( - self.focus_position + 1) % len(self.widget_list) - if self.focus_position == 1: - self.widget_list[1].header = self.active_header - else: - self.widget_list[1].header = self.inactive_header - key = None - elif key == "e": - self.master.toggle_eventlog() - key = None - - # This is essentially a copypasta from urwid.Pile's keypress handler. - # So much for "closed for modification, but open for extension". - item_rows = None - if len(size) == 2: - item_rows = self.get_item_rows(size, focus = True) - i = self.widget_list.index(self.focus_item) - tsize = self.get_item_size(size, i, True, item_rows) - return self.focus_item.keypress(tsize, key) - - -class ConnectionItem(urwid.WidgetWrap): - - def __init__(self, master, state, flow, focus): - self.master, self.state, self.flow = master, state, flow - self.f = focus - w = self.get_text() - urwid.WidgetWrap.__init__(self, w) - - def get_text(self): - return common.format_flow( - self.flow, - self.f, - hostheader = self.master.showhost, - marked=self.state.flow_marked(self.flow) - ) - - def selectable(self): - return True - - def save_flows_prompt(self, k): - if k == "a": - signals.status_prompt_path.send( - prompt = "Save all flows to", - callback = self.master.save_flows - ) - elif k == "m": - signals.status_prompt_path.send( - prompt = "Save marked flows to", - callback = self.master.save_marked_flows - ) - else: - signals.status_prompt_path.send( - prompt = "Save this flow to", - callback = self.master.save_one_flow, - args = (self.flow,) - ) - - def stop_server_playback_prompt(self, a): - if a != "n": - self.master.stop_server_playback() - - def server_replay_prompt(self, k): - if k == "a": - self.master.start_server_playback( - [i.copy() for i in self.master.state.view], - self.master.killextra, self.master.rheaders, - False, self.master.nopop, - self.master.options.replay_ignore_params, - self.master.options.replay_ignore_content, - self.master.options.replay_ignore_payload_params, - self.master.options.replay_ignore_host - ) - elif k == "t": - self.master.start_server_playback( - [self.flow.copy()], - self.master.killextra, self.master.rheaders, - False, self.master.nopop, - self.master.options.replay_ignore_params, - self.master.options.replay_ignore_content, - self.master.options.replay_ignore_payload_params, - self.master.options.replay_ignore_host - ) - else: - signals.status_prompt_path.send( - prompt = "Server replay path", - callback = self.master.server_playback_path - ) - - def mouse_event(self, size, event, button, col, row, focus): - if event == "mouse press" and button == 1: - if self.flow.request: - self.master.view_flow(self.flow) - return True - - def keypress(self, xxx_todo_changeme, key): - (maxcol,) = xxx_todo_changeme - key = common.shortcuts(key) - if key == "a": - self.flow.accept_intercept(self.master) - signals.flowlist_change.send(self) - elif key == "d": - self.flow.kill(self.master) - self.state.delete_flow(self.flow) - signals.flowlist_change.send(self) - elif key == "D": - f = self.master.duplicate_flow(self.flow) - self.master.view_flow(f) - elif key == "m": - if self.state.flow_marked(self.flow): - self.state.set_flow_marked(self.flow, False) - else: - self.state.set_flow_marked(self.flow, True) - signals.flowlist_change.send(self) - elif key == "r": - r = self.master.replay_request(self.flow) - if r: - signals.status_message.send(message=r) - signals.flowlist_change.send(self) - elif key == "S": - if not self.master.server_playback: - signals.status_prompt_onekey.send( - prompt = "Server Replay", - keys = ( - ("all flows", "a"), - ("this flow", "t"), - ("file", "f"), - ), - callback = self.server_replay_prompt, - ) - else: - signals.status_prompt_onekey.send( - prompt = "Stop current server replay?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = self.stop_server_playback_prompt, - ) - elif key == "U": - for f in self.state.flows: - self.state.set_flow_marked(f, False) - signals.flowlist_change.send(self) - elif key == "V": - if not self.flow.modified(): - signals.status_message.send(message="Flow not modified.") - return - self.state.revert(self.flow) - signals.flowlist_change.send(self) - signals.status_message.send(message="Reverted.") - elif key == "w": - signals.status_prompt_onekey.send( - self, - prompt = "Save", - keys = ( - ("all flows", "a"), - ("this flow", "t"), - ("marked flows", "m"), - ), - callback = self.save_flows_prompt, - ) - elif key == "X": - self.flow.kill(self.master) - elif key == "enter": - if self.flow.request: - self.master.view_flow(self.flow) - elif key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - elif key == "P": - common.ask_copy_part("a", self.flow, self.master, self.state) - elif key == "E": - signals.status_prompt_onekey.send( - self, - prompt = "Export", - keys = ( - ("as curl command", "c"), - ("as python code", "p"), - ("as raw request", "r"), - ), - callback = common.export_prompt, - args = (self.flow,) - ) - elif key == "b": - common.ask_save_body(None, self.master, self.state, self.flow) - else: - return key - - -class FlowListWalker(urwid.ListWalker): - - def __init__(self, master, state): - self.master, self.state = master, state - signals.flowlist_change.connect(self.sig_flowlist_change) - - def sig_flowlist_change(self, sender): - self._modified() - - def get_focus(self): - f, i = self.state.get_focus() - f = ConnectionItem(self.master, self.state, f, True) if f else None - return f, i - - def set_focus(self, focus): - ret = self.state.set_focus(focus) - return ret - - def get_next(self, pos): - f, i = self.state.get_next(pos) - f = ConnectionItem(self.master, self.state, f, False) if f else None - return f, i - - def get_prev(self, pos): - f, i = self.state.get_prev(pos) - f = ConnectionItem(self.master, self.state, f, False) if f else None - return f, i - - -class FlowListBox(urwid.ListBox): - - def __init__(self, master): - self.master = master - urwid.ListBox.__init__( - self, - FlowListWalker(master, master.state) - ) - - def get_method_raw(self, k): - if k: - self.get_url(k) - - def get_method(self, k): - if k == "e": - signals.status_prompt.send( - self, - prompt = "Method", - text = "", - callback = self.get_method_raw - ) - else: - method = "" - for i in common.METHOD_OPTIONS: - if i[1] == k: - method = i[0].upper() - self.get_url(method) - - def get_url(self, method): - signals.status_prompt.send( - prompt = "URL", - text = "http://www.example.com/", - callback = self.new_request, - args = (method,) - ) - - def new_request(self, url, method): - parts = netlib.utils.parse_url(str(url)) - if not parts: - signals.status_message.send(message="Invalid Url") - return - scheme, host, port, path = parts - f = self.master.create_request(method, scheme, host, port, path) - self.master.view_flow(f) - - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "A": - self.master.accept_all() - signals.flowlist_change.send(self) - elif key == "C": - self.master.clear_flows() - elif key == "e": - self.master.toggle_eventlog() - elif key == "g": - self.master.state.set_focus(0) - signals.flowlist_change.send(self) - elif key == "G": - self.master.state.set_focus(self.master.state.flow_count()) - signals.flowlist_change.send(self) - elif key == "l": - signals.status_prompt.send( - prompt = "Limit", - text = self.master.state.limit_txt, - callback = self.master.set_limit - ) - elif key == "L": - signals.status_prompt_path.send( - self, - prompt = "Load flows", - callback = self.master.load_flows_callback - ) - elif key == "n": - signals.status_prompt_onekey.send( - prompt = "Method", - keys = common.METHOD_OPTIONS, - callback = self.get_method - ) - elif key == "F": - self.master.toggle_follow_flows() - elif key == "W": - if self.master.stream: - self.master.stop_stream() - else: - signals.status_prompt_path.send( - self, - prompt = "Stream flows to", - callback = self.master.start_stream_to_path - ) - else: - return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py deleted file mode 100644 index d2b98b68..00000000 --- a/libmproxy/console/flowview.py +++ /dev/null @@ -1,711 +0,0 @@ -from __future__ import absolute_import, division -import os -import traceback -import sys - -import math -import urwid - -from netlib import odict -from netlib.http import CONTENT_MISSING, Headers -from . import common, grideditor, signals, searchable, tabs -from . import flowdetailview -from .. import utils, controller, contentviews -from ..models import HTTPRequest, HTTPResponse, decoded -from ..exceptions import ContentViewException - - -class SearchError(Exception): - pass - - -def _mkhelp(): - text = [] - keys = [ - ("A", "accept all intercepted flows"), - ("a", "accept this intercepted flow"), - ("b", "save request/response body"), - ("D", "duplicate flow"), - ("d", "delete flow"), - ("E", "export"), - ("e", "edit request/response"), - ("f", "load full body data"), - ("m", "change body display mode for this entity"), - (None, - common.highlight_key("automatic", "a") + - [("text", ": automatic detection")] - ), - (None, - common.highlight_key("hex", "e") + - [("text", ": Hex")] - ), - (None, - common.highlight_key("html", "h") + - [("text", ": HTML")] - ), - (None, - common.highlight_key("image", "i") + - [("text", ": Image")] - ), - (None, - common.highlight_key("javascript", "j") + - [("text", ": JavaScript")] - ), - (None, - common.highlight_key("json", "s") + - [("text", ": JSON")] - ), - (None, - common.highlight_key("urlencoded", "u") + - [("text", ": URL-encoded data")] - ), - (None, - common.highlight_key("raw", "r") + - [("text", ": raw data")] - ), - (None, - common.highlight_key("xml", "x") + - [("text", ": XML")] - ), - ("M", "change default body display mode"), - ("p", "previous flow"), - ("P", "copy response(content/headers) to clipboard"), - ("r", "replay request"), - ("V", "revert changes to request"), - ("v", "view body in external viewer"), - ("w", "save all flows matching current limit"), - ("W", "save this flow"), - ("x", "delete body"), - ("z", "encode/decode a request/response"), - ("tab", "next tab"), - ("h, l", "previous tab, next tab"), - ("space", "next flow"), - ("|", "run script on this flow"), - ("/", "search (case sensitive)"), - ("n", "repeat search forward"), - ("N", "repeat search backwards"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - -footer = [ - ('heading_key', "?"), ":help ", - ('heading_key', "q"), ":back ", -] - - -class FlowViewHeader(urwid.WidgetWrap): - - def __init__(self, master, f): - self.master, self.flow = master, f - self._w = common.format_flow( - f, - False, - extended=True, - hostheader=self.master.showhost - ) - signals.flow_change.connect(self.sig_flow_change) - - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self._w = common.format_flow( - flow, - False, - extended=True, - hostheader=self.master.showhost - ) - - -cache = utils.LRUCache(200) - -TAB_REQ = 0 -TAB_RESP = 1 - - -class FlowView(tabs.Tabs): - highlight_color = "focusfield" - - def __init__(self, master, state, flow, tab_offset): - self.master, self.state, self.flow = master, state, flow - tabs.Tabs.__init__(self, - [ - (self.tab_request, self.view_request), - (self.tab_response, self.view_response), - (self.tab_details, self.view_details), - ], - tab_offset - ) - self.show() - self.last_displayed_body = None - signals.flow_change.connect(self.sig_flow_change) - - def tab_request(self): - if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: - return "Request intercepted" - else: - return "Request" - - def tab_response(self): - if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: - return "Response intercepted" - else: - return "Response" - - def tab_details(self): - return "Detail" - - def view_request(self): - return self.conn_text(self.flow.request) - - def view_response(self): - return self.conn_text(self.flow.response) - - def view_details(self): - return flowdetailview.flowdetails(self.state, self.flow) - - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self.show() - - def content_view(self, viewmode, message): - if message.content == CONTENT_MISSING: - msg, body = "", [urwid.Text([("error", "[content missing]")])] - return msg, body - else: - full = self.state.get_flow_setting( - self.flow, - (self.tab_offset, "fullcontents"), - False - ) - if full: - limit = sys.maxsize - else: - limit = contentviews.VIEW_CUTOFF - return cache.get( - self._get_content_view, - viewmode, - message, - limit, - (bytes(message.headers), message.content) # Cache invalidation - ) - - def _get_content_view(self, viewmode, message, max_lines, _): - - try: - description, lines = contentviews.get_content_view( - viewmode, message.content, headers=message.headers - ) - except ContentViewException: - s = "Content viewer failed: \n" + traceback.format_exc() - signals.add_event(s, "error") - description, lines = contentviews.get_content_view( - contentviews.get("Raw"), message.content, headers=message.headers - ) - description = description.replace("Raw", "Couldn't parse: falling back to Raw") - - # Give hint that you have to tab for the response. - if description == "No content" and isinstance(message, HTTPRequest): - description = "No request content (press tab to view response)" - - # If the users has a wide terminal, he gets fewer lines; this should not be an issue. - chars_per_line = 80 - max_chars = max_lines * chars_per_line - total_chars = 0 - text_objects = [] - for line in lines: - txt = [] - for (style, text) in line: - if total_chars + len(text) > max_chars: - text = text[:max_chars - total_chars] - txt.append((style, text)) - total_chars += len(text) - if total_chars == max_chars: - break - - # round up to the next line. - total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line) - - text_objects.append(urwid.Text(txt)) - if total_chars == max_chars: - text_objects.append(urwid.Text([ - ("highlight", "Stopped displaying data after %d lines. Press " % max_lines), - ("key", "f"), - ("highlight", " to load all data.") - ])) - break - - return description, text_objects - - def viewmode_get(self): - override = self.state.get_flow_setting( - self.flow, - (self.tab_offset, "prettyview") - ) - return self.state.default_body_view if override is None else override - - def conn_text(self, conn): - if conn: - txt = common.format_keyvals( - [(h + ":", v) for (h, v) in conn.headers.fields], - key = "header", - val = "text" - ) - viewmode = self.viewmode_get() - msg, body = self.content_view(viewmode, conn) - - cols = [ - urwid.Text( - [ - ("heading", msg), - ] - ), - urwid.Text( - [ - " ", - ('heading', "["), - ('heading_key', "m"), - ('heading', (":%s]" % viewmode.name)), - ], - align="right" - ) - ] - title = urwid.AttrWrap(urwid.Columns(cols), "heading") - - txt.append(title) - txt.extend(body) - else: - txt = [ - urwid.Text(""), - urwid.Text( - [ - ("highlight", "No response. Press "), - ("key", "e"), - ("highlight", " and edit any aspect to add one."), - ] - ) - ] - return searchable.Searchable(self.state, txt) - - def set_method_raw(self, m): - if m: - self.flow.request.method = m - signals.flow_change.send(self, flow = self.flow) - - def edit_method(self, m): - if m == "e": - signals.status_prompt.send( - prompt = "Method", - text = self.flow.request.method, - callback = self.set_method_raw - ) - else: - for i in common.METHOD_OPTIONS: - if i[1] == m: - self.flow.request.method = i[0].upper() - signals.flow_change.send(self, flow = self.flow) - - def set_url(self, url): - request = self.flow.request - try: - request.url = str(url) - except ValueError: - return "Invalid URL." - signals.flow_change.send(self, flow = self.flow) - - def set_resp_code(self, code): - response = self.flow.response - try: - response.status_code = int(code) - except ValueError: - return None - import BaseHTTPServer - if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses: - response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[ - int(code)][0] - signals.flow_change.send(self, flow = self.flow) - - def set_resp_msg(self, msg): - response = self.flow.response - response.msg = msg - signals.flow_change.send(self, flow = self.flow) - - def set_headers(self, fields, conn): - conn.headers = Headers(fields) - signals.flow_change.send(self, flow = self.flow) - - def set_query(self, lst, conn): - conn.set_query(odict.ODict(lst)) - signals.flow_change.send(self, flow = self.flow) - - def set_path_components(self, lst, conn): - conn.set_path_components(lst) - signals.flow_change.send(self, flow = self.flow) - - def set_form(self, lst, conn): - conn.set_form_urlencoded(odict.ODict(lst)) - signals.flow_change.send(self, flow = self.flow) - - def edit_form(self, conn): - self.master.view_grideditor( - grideditor.URLEncodedFormEditor( - self.master, - conn.get_form_urlencoded().lst, - self.set_form, - conn - ) - ) - - def edit_form_confirm(self, key, conn): - if key == "y": - self.edit_form(conn) - - def set_cookies(self, lst, conn): - od = odict.ODict(lst) - conn.set_cookies(od) - signals.flow_change.send(self, flow = self.flow) - - def set_setcookies(self, data, conn): - conn.set_cookies(data) - signals.flow_change.send(self, flow = self.flow) - - def edit(self, part): - if self.tab_offset == TAB_REQ: - message = self.flow.request - else: - if not self.flow.response: - self.flow.response = HTTPResponse( - self.flow.request.http_version, - 200, "OK", Headers(), "" - ) - self.flow.response.reply = controller.DummyReply() - message = self.flow.response - - self.flow.backup() - if message == self.flow.request and part == "c": - self.master.view_grideditor( - grideditor.CookieEditor( - self.master, - message.get_cookies().lst, - self.set_cookies, - message - ) - ) - if message == self.flow.response and part == "c": - self.master.view_grideditor( - grideditor.SetCookieEditor( - self.master, - message.get_cookies(), - self.set_setcookies, - message - ) - ) - if part == "r": - with decoded(message): - # Fix an issue caused by some editors when editing a - # request/response body. Many editors make it hard to save a - # file without a terminating newline on the last line. When - # editing message bodies, this can cause problems. For now, I - # just strip the newlines off the end of the body when we return - # from an editor. - c = self.master.spawn_editor(message.content or "") - message.content = c.rstrip("\n") - elif part == "f": - if not message.get_form_urlencoded() and message.content: - signals.status_prompt_onekey.send( - prompt = "Existing body is not a URL-encoded form. Clear and edit?", - keys = [ - ("yes", "y"), - ("no", "n"), - ], - callback = self.edit_form_confirm, - args = (message,) - ) - else: - self.edit_form(message) - elif part == "h": - self.master.view_grideditor( - grideditor.HeaderEditor( - self.master, - message.headers.fields, - self.set_headers, - message - ) - ) - elif part == "p": - p = message.get_path_components() - self.master.view_grideditor( - grideditor.PathEditor( - self.master, - p, - self.set_path_components, - message - ) - ) - elif part == "q": - self.master.view_grideditor( - grideditor.QueryEditor( - self.master, - message.get_query().lst, - self.set_query, message - ) - ) - elif part == "u": - signals.status_prompt.send( - prompt = "URL", - text = message.url, - callback = self.set_url - ) - elif part == "m": - signals.status_prompt_onekey.send( - prompt = "Method", - keys = common.METHOD_OPTIONS, - callback = self.edit_method - ) - elif part == "o": - signals.status_prompt.send( - prompt = "Code", - text = str(message.status_code), - callback = self.set_resp_code - ) - elif part == "m": - signals.status_prompt.send( - prompt = "Message", - text = message.msg, - callback = self.set_resp_msg - ) - signals.flow_change.send(self, flow = self.flow) - - def _view_nextprev_flow(self, np, flow): - try: - idx = self.state.view.index(flow) - except IndexError: - return - if np == "next": - new_flow, new_idx = self.state.get_next(idx) - else: - new_flow, new_idx = self.state.get_prev(idx) - if new_flow is None: - signals.status_message.send(message="No more flows!") - else: - signals.pop_view_state.send(self) - self.master.view_flow(new_flow, self.tab_offset) - - def view_next_flow(self, flow): - return self._view_nextprev_flow("next", flow) - - def view_prev_flow(self, flow): - return self._view_nextprev_flow("prev", flow) - - def change_this_display_mode(self, t): - self.state.add_flow_setting( - self.flow, - (self.tab_offset, "prettyview"), - contentviews.get_by_shortcut(t) - ) - signals.flow_change.send(self, flow = self.flow) - - def delete_body(self, t): - if t == "m": - val = CONTENT_MISSING - else: - val = None - if self.tab_offset == TAB_REQ: - self.flow.request.content = val - else: - self.flow.response.content = val - signals.flow_change.send(self, flow = self.flow) - - def keypress(self, size, key): - key = super(self.__class__, self).keypress(size, key) - - if key == " ": - self.view_next_flow(self.flow) - return - - key = common.shortcuts(key) - if self.tab_offset == TAB_REQ: - conn = self.flow.request - elif self.tab_offset == TAB_RESP: - conn = self.flow.response - else: - conn = None - - if key in ("up", "down", "page up", "page down"): - # Why doesn't this just work?? - self._w.keypress(size, key) - elif key == "a": - self.flow.accept_intercept(self.master) - signals.flow_change.send(self, flow = self.flow) - elif key == "A": - self.master.accept_all() - signals.flow_change.send(self, flow = self.flow) - elif key == "d": - if self.state.flow_count() == 1: - self.master.view_flowlist() - elif self.state.view.index(self.flow) == len(self.state.view) - 1: - self.view_prev_flow(self.flow) - else: - self.view_next_flow(self.flow) - f = self.flow - f.kill(self.master) - self.state.delete_flow(f) - elif key == "D": - f = self.master.duplicate_flow(self.flow) - self.master.view_flow(f) - signals.status_message.send(message="Duplicated.") - elif key == "p": - self.view_prev_flow(self.flow) - elif key == "r": - r = self.master.replay_request(self.flow) - if r: - signals.status_message.send(message=r) - signals.flow_change.send(self, flow = self.flow) - elif key == "V": - if not self.flow.modified(): - signals.status_message.send(message="Flow not modified.") - return - self.state.revert(self.flow) - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="Reverted.") - elif key == "W": - signals.status_prompt_path.send( - prompt = "Save this flow", - callback = self.master.save_one_flow, - args = (self.flow,) - ) - elif key == "E": - signals.status_prompt_onekey.send( - self, - prompt = "Export", - keys = ( - ("as curl command", "c"), - ("as python code", "p"), - ("as raw request", "r"), - ), - callback = common.export_prompt, - args = (self.flow,) - ) - elif key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - - if not conn and key in set(list("befgmxvz")): - signals.status_message.send( - message = "Tab to the request or response", - expire = 1 - ) - elif conn: - if key == "b": - if self.tab_offset == TAB_REQ: - common.ask_save_body( - "q", self.master, self.state, self.flow - ) - else: - common.ask_save_body( - "s", self.master, self.state, self.flow - ) - elif key == "e": - if self.tab_offset == TAB_REQ: - signals.status_prompt_onekey.send( - prompt = "Edit request", - keys = ( - ("cookies", "c"), - ("query", "q"), - ("path", "p"), - ("url", "u"), - ("header", "h"), - ("form", "f"), - ("raw body", "r"), - ("method", "m"), - ), - callback = self.edit - ) - else: - signals.status_prompt_onekey.send( - prompt = "Edit response", - keys = ( - ("cookies", "c"), - ("code", "o"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - callback = self.edit - ) - key = None - elif key == "f": - signals.status_message.send(message="Loading all body data...") - self.state.add_flow_setting( - self.flow, - (self.tab_offset, "fullcontents"), - True - ) - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="") - elif key == "P": - if self.tab_offset == TAB_REQ: - scope = "q" - else: - scope = "s" - common.ask_copy_part(scope, self.flow, self.master, self.state) - elif key == "m": - p = list(contentviews.view_prompts) - p.insert(0, ("Clear", "C")) - signals.status_prompt_onekey.send( - self, - prompt = "Display mode", - keys = p, - callback = self.change_this_display_mode - ) - key = None - elif key == "x": - signals.status_prompt_onekey.send( - prompt = "Delete body", - keys = ( - ("completely", "c"), - ("mark as missing", "m"), - ), - callback = self.delete_body - ) - key = None - elif key == "v": - if conn.content: - t = conn.headers.get("content-type") - if "EDITOR" in os.environ or "PAGER" in os.environ: - self.master.spawn_external_viewer(conn.content, t) - else: - signals.status_message.send( - message = "Error! Set $EDITOR or $PAGER." - ) - elif key == "z": - self.flow.backup() - e = conn.headers.get("content-encoding", "identity") - if e != "identity": - if not conn.decode(): - signals.status_message.send( - message = "Could not decode - invalid data?" - ) - else: - signals.status_prompt_onekey.send( - prompt = "Select encoding: ", - keys = ( - ("gzip", "z"), - ("deflate", "d"), - ), - callback = self.encode_callback, - args = (conn,) - ) - signals.flow_change.send(self, flow = self.flow) - return key - - def encode_callback(self, key, conn): - encoding_map = { - "z": "gzip", - "d": "deflate", - } - conn.encode(encoding_map[key]) - signals.flow_change.send(self, flow = self.flow) diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py deleted file mode 100644 index a11c962c..00000000 --- a/libmproxy/console/grideditor.py +++ /dev/null @@ -1,716 +0,0 @@ -from __future__ import absolute_import - -import copy -import re -import os -import urwid - -from netlib import odict -from netlib.http import user_agents - -from . import common, signals -from .. import utils, filt, script - - -FOOTER = [ - ('heading_key', "enter"), ":edit ", - ('heading_key', "q"), ":back ", -] -FOOTER_EDITING = [ - ('heading_key', "esc"), ":stop editing ", -] - - -class TextColumn: - subeditor = None - - def __init__(self, heading): - self.heading = heading - - def text(self, obj): - return SEscaped(obj or "") - - def blank(self): - return "" - - def keypress(self, key, editor): - if key == "r": - if editor.walker.get_current_value() is not None: - signals.status_prompt_path.send( - self, - prompt = "Read file", - callback = editor.read_file - ) - elif key == "R": - if editor.walker.get_current_value() is not None: - signals.status_prompt_path.send( - editor, - prompt = "Read unescaped file", - callback = editor.read_file, - args = (True,) - ) - elif key == "e": - o = editor.walker.get_current_value() - if o is not None: - n = editor.master.spawn_editor(o.encode("string-escape")) - n = utils.clean_hanging_newline(n) - editor.walker.set_current_value(n, False) - editor.walker._modified() - elif key in ["enter"]: - editor.walker.start_edit() - else: - return key - - -class SubgridColumn: - - def __init__(self, heading, subeditor): - self.heading = heading - self.subeditor = subeditor - - def text(self, obj): - p = http_cookies._format_pairs(obj, sep="\n") - return urwid.Text(p) - - def blank(self): - return [] - - def keypress(self, key, editor): - if key in "rRe": - signals.status_message.send( - self, - message = "Press enter to edit this field.", - expire = 1000 - ) - return - elif key in ["enter"]: - editor.master.view_grideditor( - self.subeditor( - editor.master, - editor.walker.get_current_value(), - editor.set_subeditor_value, - editor.walker.focus, - editor.walker.focus_col - ) - ) - else: - return key - - -class SEscaped(urwid.WidgetWrap): - - def __init__(self, txt): - txt = txt.encode("string-escape") - w = urwid.Text(txt, wrap="any") - urwid.WidgetWrap.__init__(self, w) - - def get_text(self): - return self._w.get_text()[0] - - def keypress(self, size, key): - return key - - def selectable(self): - return True - - -class SEdit(urwid.WidgetWrap): - - def __init__(self, txt): - txt = txt.encode("string-escape") - w = urwid.Edit(edit_text=txt, wrap="any", multiline=True) - w = urwid.AttrWrap(w, "editfield") - urwid.WidgetWrap.__init__(self, w) - - def get_text(self): - return self._w.get_text()[0].strip() - - def selectable(self): - return True - - -class GridRow(urwid.WidgetWrap): - - def __init__(self, focused, editing, editor, values): - self.focused, self.editing, self.editor = focused, editing, editor - - errors = values[1] - self.fields = [] - for i, v in enumerate(values[0]): - if focused == i and editing: - self.editing = SEdit(v) - self.fields.append(self.editing) - else: - w = self.editor.columns[i].text(v) - if focused == i: - if i in errors: - w = urwid.AttrWrap(w, "focusfield_error") - else: - w = urwid.AttrWrap(w, "focusfield") - elif i in errors: - w = urwid.AttrWrap(w, "field_error") - self.fields.append(w) - - fspecs = self.fields[:] - if len(self.fields) > 1: - fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0]) - w = urwid.Columns( - fspecs, - dividechars = 2 - ) - if focused is not None: - w.set_focus_column(focused) - urwid.WidgetWrap.__init__(self, w) - - def get_edit_value(self): - return self.editing.get_text() - - def keypress(self, s, k): - if self.editing: - w = self._w.column_widths(s)[self.focused] - k = self.editing.keypress((w,), k) - return k - - def selectable(self): - return True - - -class GridWalker(urwid.ListWalker): - - """ - Stores rows as a list of (rows, errors) tuples, where rows is a list - and errors is a set with an entry of each offset in rows that is an - error. - """ - - def __init__(self, lst, editor): - self.lst = [(i, set([])) for i in lst] - self.editor = editor - self.focus = 0 - self.focus_col = 0 - self.editing = False - - def _modified(self): - self.editor.show_empty_msg() - return urwid.ListWalker._modified(self) - - def add_value(self, lst): - self.lst.append((lst[:], set([]))) - self._modified() - - def get_current_value(self): - if self.lst: - return self.lst[self.focus][0][self.focus_col] - - def set_current_value(self, val, unescaped): - if not unescaped: - try: - val = val.decode("string-escape") - except ValueError: - signals.status_message.send( - self, - message = "Invalid Python-style string encoding.", - expire = 1000 - ) - return - errors = self.lst[self.focus][1] - emsg = self.editor.is_error(self.focus_col, val) - if emsg: - signals.status_message.send(message = emsg, expire = 1) - errors.add(self.focus_col) - else: - errors.discard(self.focus_col) - self.set_value(val, self.focus, self.focus_col, errors) - - def set_value(self, val, focus, focus_col, errors=None): - if not errors: - errors = set([]) - row = list(self.lst[focus][0]) - row[focus_col] = val - self.lst[focus] = [tuple(row), errors] - self._modified() - - def delete_focus(self): - if self.lst: - del self.lst[self.focus] - self.focus = min(len(self.lst) - 1, self.focus) - self._modified() - - def _insert(self, pos): - self.focus = pos - self.lst.insert( - self.focus, - [ - [c.blank() for c in self.editor.columns], set([]) - ] - ) - self.focus_col = 0 - self.start_edit() - - def insert(self): - return self._insert(self.focus) - - def add(self): - return self._insert(min(self.focus + 1, len(self.lst))) - - def start_edit(self): - col = self.editor.columns[self.focus_col] - if self.lst and not col.subeditor: - self.editing = GridRow( - self.focus_col, True, self.editor, self.lst[self.focus] - ) - self.editor.master.loop.widget.footer.update(FOOTER_EDITING) - self._modified() - - def stop_edit(self): - if self.editing: - self.editor.master.loop.widget.footer.update(FOOTER) - self.set_current_value(self.editing.get_edit_value(), False) - self.editing = False - self._modified() - - def left(self): - self.focus_col = max(self.focus_col - 1, 0) - self._modified() - - def right(self): - self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) - self._modified() - - def tab_next(self): - self.stop_edit() - if self.focus_col < len(self.editor.columns) - 1: - self.focus_col += 1 - elif self.focus != len(self.lst) - 1: - self.focus_col = 0 - self.focus += 1 - self._modified() - - def get_focus(self): - if self.editing: - return self.editing, self.focus - elif self.lst: - return GridRow( - self.focus_col, - False, - self.editor, - self.lst[self.focus] - ), self.focus - else: - return None, None - - def set_focus(self, focus): - self.stop_edit() - self.focus = focus - self._modified() - - def get_next(self, pos): - if pos + 1 >= len(self.lst): - return None, None - return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 - - def get_prev(self, pos): - if pos - 1 < 0: - return None, None - return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 - - -class GridListBox(urwid.ListBox): - - def __init__(self, lw): - urwid.ListBox.__init__(self, lw) - - -FIRST_WIDTH_MAX = 40 -FIRST_WIDTH_MIN = 20 - - -class GridEditor(urwid.WidgetWrap): - title = None - columns = None - - def __init__(self, master, value, callback, *cb_args, **cb_kwargs): - value = self.data_in(copy.deepcopy(value)) - self.master, self.value, self.callback = master, value, callback - self.cb_args, self.cb_kwargs = cb_args, cb_kwargs - - first_width = 20 - if value: - for r in value: - assert len(r) == len(self.columns) - first_width = max(len(r), first_width) - self.first_width = min(first_width, FIRST_WIDTH_MAX) - - title = urwid.Text(self.title) - title = urwid.Padding(title, align="left", width=("relative", 100)) - title = urwid.AttrWrap(title, "heading") - - headings = [] - for i, col in enumerate(self.columns): - c = urwid.Text(col.heading) - if i == 0 and len(self.columns) > 1: - headings.append(("fixed", first_width + 2, c)) - else: - headings.append(c) - h = urwid.Columns( - headings, - dividechars = 2 - ) - h = urwid.AttrWrap(h, "heading") - - self.walker = GridWalker(self.value, self) - self.lb = GridListBox(self.walker) - self._w = urwid.Frame( - self.lb, - header = urwid.Pile([title, h]) - ) - self.master.loop.widget.footer.update("") - self.show_empty_msg() - - def show_empty_msg(self): - if self.walker.lst: - self._w.set_footer(None) - else: - self._w.set_footer( - urwid.Text( - [ - ("highlight", "No values. Press "), - ("key", "a"), - ("highlight", " to add some."), - ] - ) - ) - - def encode(self, s): - if not self.encoding: - return s - try: - return s.encode(self.encoding) - except ValueError: - return None - - def read_file(self, p, unescaped=False): - if p: - try: - p = os.path.expanduser(p) - d = file(p, "rb").read() - self.walker.set_current_value(d, unescaped) - self.walker._modified() - except IOError as v: - return str(v) - - def set_subeditor_value(self, val, focus, focus_col): - self.walker.set_value(val, focus, focus_col) - - def keypress(self, size, key): - if self.walker.editing: - if key in ["esc"]: - self.walker.stop_edit() - elif key == "tab": - pf, pfc = self.walker.focus, self.walker.focus_col - self.walker.tab_next() - if self.walker.focus == pf and self.walker.focus_col != pfc: - self.walker.start_edit() - else: - self._w.keypress(size, key) - return None - - key = common.shortcuts(key) - column = self.columns[self.walker.focus_col] - if key in ["q", "esc"]: - res = [] - for i in self.walker.lst: - if not i[1] and any([x for x in i[0]]): - res.append(i[0]) - self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) - signals.pop_view_state.send(self) - elif key == "g": - self.walker.set_focus(0) - elif key == "G": - self.walker.set_focus(len(self.walker.lst) - 1) - elif key in ["h", "left"]: - self.walker.left() - elif key in ["l", "right"]: - self.walker.right() - elif key == "tab": - self.walker.tab_next() - elif key == "a": - self.walker.add() - elif key == "A": - self.walker.insert() - elif key == "d": - self.walker.delete_focus() - elif column.keypress(key, self) and not self.handle_key(key): - return self._w.keypress(size, key) - - def data_out(self, data): - """ - Called on raw list data, before data is returned through the - callback. - """ - return data - - def data_in(self, data): - """ - Called to prepare provided data. - """ - return data - - def is_error(self, col, val): - """ - Return False, or a string error message. - """ - return False - - def handle_key(self, key): - return False - - def make_help(self): - text = [] - text.append(urwid.Text([("text", "Editor control:\n")])) - keys = [ - ("A", "insert row before cursor"), - ("a", "add row after cursor"), - ("d", "delete row"), - ("e", "spawn external editor on current field"), - ("q", "save changes and exit editor"), - ("r", "read value from file"), - ("R", "read unescaped value from file"), - ("esc", "save changes and exit editor"), - ("tab", "next field"), - ("enter", "edit field"), - ] - text.extend( - common.format_keyvals(keys, key="key", val="text", indent=4) - ) - text.append( - urwid.Text( - [ - "\n", - ("text", "Values are escaped Python-style strings.\n"), - ] - ) - ) - return text - - -class QueryEditor(GridEditor): - title = "Editing query" - columns = [ - TextColumn("Key"), - TextColumn("Value") - ] - - -class HeaderEditor(GridEditor): - title = "Editing headers" - columns = [ - TextColumn("Key"), - TextColumn("Value") - ] - - def make_help(self): - h = GridEditor.make_help(self) - text = [] - text.append(urwid.Text([("text", "Special keys:\n")])) - keys = [ - ("U", "add User-Agent header"), - ] - text.extend( - common.format_keyvals(keys, key="key", val="text", indent=4) - ) - text.append(urwid.Text([("text", "\n")])) - text.extend(h) - return text - - def set_user_agent(self, k): - ua = user_agents.get_by_shortcut(k) - if ua: - self.walker.add_value( - [ - "User-Agent", - ua[2] - ] - ) - - def handle_key(self, key): - if key == "U": - signals.status_prompt_onekey.send( - prompt = "Add User-Agent header:", - keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], - callback = self.set_user_agent, - ) - return True - - -class URLEncodedFormEditor(GridEditor): - title = "Editing URL-encoded form" - columns = [ - TextColumn("Key"), - TextColumn("Value") - ] - - -class ReplaceEditor(GridEditor): - title = "Editing replacement patterns" - columns = [ - TextColumn("Filter"), - TextColumn("Regex"), - TextColumn("Replacement"), - ] - - def is_error(self, col, val): - if col == 0: - if not filt.parse(val): - return "Invalid filter specification." - elif col == 1: - try: - re.compile(val) - except re.error: - return "Invalid regular expression." - return False - - -class SetHeadersEditor(GridEditor): - title = "Editing header set patterns" - columns = [ - TextColumn("Filter"), - TextColumn("Header"), - TextColumn("Value"), - ] - - def is_error(self, col, val): - if col == 0: - if not filt.parse(val): - return "Invalid filter specification" - return False - - def make_help(self): - h = GridEditor.make_help(self) - text = [] - text.append(urwid.Text([("text", "Special keys:\n")])) - keys = [ - ("U", "add User-Agent header"), - ] - text.extend( - common.format_keyvals(keys, key="key", val="text", indent=4) - ) - text.append(urwid.Text([("text", "\n")])) - text.extend(h) - return text - - def set_user_agent(self, k): - ua = user_agents.get_by_shortcut(k) - if ua: - self.walker.add_value( - [ - ".*", - "User-Agent", - ua[2] - ] - ) - - def handle_key(self, key): - if key == "U": - signals.status_prompt_onekey.send( - prompt = "Add User-Agent header:", - keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], - callback = self.set_user_agent, - ) - return True - - -class PathEditor(GridEditor): - title = "Editing URL path components" - columns = [ - TextColumn("Component"), - ] - - def data_in(self, data): - return [[i] for i in data] - - def data_out(self, data): - return [i[0] for i in data] - - -class ScriptEditor(GridEditor): - title = "Editing scripts" - columns = [ - TextColumn("Command"), - ] - - def is_error(self, col, val): - try: - script.Script.parse_command(val) - except script.ScriptException as v: - return str(v) - - -class HostPatternEditor(GridEditor): - title = "Editing host patterns" - columns = [ - TextColumn("Regex (matched on hostname:port / ip:port)") - ] - - def is_error(self, col, val): - try: - re.compile(val, re.IGNORECASE) - except re.error as e: - return "Invalid regex: %s" % str(e) - - def data_in(self, data): - return [[i] for i in data] - - def data_out(self, data): - return [i[0] for i in data] - - -class CookieEditor(GridEditor): - title = "Editing request Cookie header" - columns = [ - TextColumn("Name"), - TextColumn("Value"), - ] - - -class CookieAttributeEditor(GridEditor): - title = "Editing Set-Cookie attributes" - columns = [ - TextColumn("Name"), - TextColumn("Value"), - ] - - def data_out(self, data): - ret = [] - for i in data: - if not i[1]: - ret.append([i[0], None]) - else: - ret.append(i) - return ret - - -class SetCookieEditor(GridEditor): - title = "Editing response SetCookie header" - columns = [ - TextColumn("Name"), - TextColumn("Value"), - SubgridColumn("Attributes", CookieAttributeEditor), - ] - - def data_in(self, data): - flattened = [] - for k, v in data.items(): - flattened.append([k, v[0], v[1].lst]) - return flattened - - def data_out(self, data): - vals = [] - for i in data: - vals.append( - [ - i[0], - [i[1], odict.ODictCaseless(i[2])] - ] - ) - return odict.ODict(vals) diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py deleted file mode 100644 index 0c264ebf..00000000 --- a/libmproxy/console/help.py +++ /dev/null @@ -1,117 +0,0 @@ -from __future__ import absolute_import - -import urwid - -from . import common, signals -from .. import filt, version - -footer = [ - ("heading", 'mitmproxy v%s ' % version.VERSION), - ('heading_key', "q"), ":back ", -] - - -class HelpView(urwid.ListBox): - - def __init__(self, help_context): - self.help_context = help_context or [] - urwid.ListBox.__init__( - self, - self.helptext() - ) - - def helptext(self): - text = [] - text.append(urwid.Text([("head", "This view:\n")])) - text.extend(self.help_context) - - text.append(urwid.Text([("head", "\n\nMovement:\n")])) - keys = [ - ("j, k", "down, up"), - ("h, l", "left, right (in some contexts)"), - ("g, G", "go to beginning, end"), - ("space", "page down"), - ("pg up/down", "page up/down"), - ("ctrl+b/ctrl+f", "page up/down"), - ("arrows", "up, down, left, right"), - ] - text.extend( - common.format_keyvals( - keys, - key="key", - val="text", - indent=4)) - - text.append(urwid.Text([("head", "\n\nGlobal keys:\n")])) - keys = [ - ("c", "client replay of HTTP requests"), - ("i", "set interception pattern"), - ("o", "options"), - ("q", "quit / return to previous page"), - ("Q", "quit without confirm prompt"), - ("S", "server replay of HTTP responses"), - ] - text.extend( - common.format_keyvals(keys, key="key", val="text", indent=4) - ) - - text.append(urwid.Text([("head", "\n\nFilter expressions:\n")])) - f = [] - for i in filt.filt_unary: - f.append( - ("~%s" % i.code, i.help) - ) - for i in filt.filt_rex: - f.append( - ("~%s regex" % i.code, i.help) - ) - for i in filt.filt_int: - f.append( - ("~%s int" % i.code, i.help) - ) - f.sort() - f.extend( - [ - ("!", "unary not"), - ("&", "and"), - ("|", "or"), - ("(...)", "grouping"), - ] - ) - text.extend(common.format_keyvals(f, key="key", val="text", indent=4)) - - text.append( - urwid.Text( - [ - "\n", - ("text", " Regexes are Python-style.\n"), - ("text", " Regexes can be specified as quoted strings.\n"), - ("text", " Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"), - ("text", " Expressions with no operators are regex matches against URL.\n"), - ("text", " Default binary operator is &.\n"), - ("head", "\n Examples:\n"), - ] - ) - ) - examples = [ - ("google\.com", "Url containing \"google.com"), - ("~q ~b test", "Requests where body contains \"test\""), - ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), - ] - text.extend( - common.format_keyvals(examples, key="key", val="text", indent=4) - ) - return text - - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "q": - signals.pop_view_state.send(self) - return None - elif key == "?": - key = None - elif key == "g": - self.set_focus(0) - elif key == "G": - self.set_focus(len(self.body.contents)) - return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py deleted file mode 100644 index 5c9e0cc9..00000000 --- a/libmproxy/console/options.py +++ /dev/null @@ -1,271 +0,0 @@ -import urwid - -from .. import contentviews -from . import common, signals, grideditor -from . import select, palettes - -footer = [ - ('heading_key', "enter/space"), ":toggle ", - ('heading_key', "C"), ":clear all ", -] - - -def _mkhelp(): - text = [] - keys = [ - ("enter/space", "activate option"), - ("C", "clear all options"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - - -class Options(urwid.WidgetWrap): - - def __init__(self, master): - self.master = master - self.lb = select.Select( - [ - select.Heading("Traffic Manipulation"), - select.Option( - "Header Set Patterns", - "H", - lambda: master.setheaders.count(), - self.setheaders - ), - select.Option( - "Ignore Patterns", - "I", - lambda: master.server.config.check_ignore, - self.ignorepatterns - ), - select.Option( - "Replacement Patterns", - "R", - lambda: master.replacehooks.count(), - self.replacepatterns - ), - select.Option( - "Scripts", - "S", - lambda: master.scripts, - self.scripts - ), - - select.Heading("Interface"), - select.Option( - "Default Display Mode", - "M", - self.has_default_displaymode, - self.default_displaymode - ), - select.Option( - "Palette", - "P", - lambda: self.master.palette != palettes.DEFAULT, - self.palette - ), - select.Option( - "Show Host", - "w", - lambda: master.showhost, - self.toggle_showhost - ), - - select.Heading("Network"), - select.Option( - "No Upstream Certs", - "U", - lambda: master.server.config.no_upstream_cert, - self.toggle_upstream_cert - ), - select.Option( - "TCP Proxying", - "T", - lambda: master.server.config.check_tcp, - self.tcp_proxy - ), - - select.Heading("Utility"), - select.Option( - "Anti-Cache", - "a", - lambda: master.anticache, - self.toggle_anticache - ), - select.Option( - "Anti-Compression", - "o", - lambda: master.anticomp, - self.toggle_anticomp - ), - select.Option( - "Kill Extra", - "x", - lambda: master.killextra, - self.toggle_killextra - ), - select.Option( - "No Refresh", - "f", - lambda: not master.refresh_server_playback, - self.toggle_refresh_server_playback - ), - select.Option( - "Sticky Auth", - "A", - lambda: master.stickyauth_txt, - self.sticky_auth - ), - select.Option( - "Sticky Cookies", - "t", - lambda: master.stickycookie_txt, - self.sticky_cookie - ), - ] - ) - title = urwid.Text("Options") - title = urwid.Padding(title, align="left", width=("relative", 100)) - title = urwid.AttrWrap(title, "heading") - self._w = urwid.Frame( - self.lb, - header = title - ) - self.master.loop.widget.footer.update("") - signals.update_settings.connect(self.sig_update_settings) - - def sig_update_settings(self, sender): - self.lb.walker._modified() - - def keypress(self, size, key): - if key == "C": - self.clearall() - return None - return super(self.__class__, self).keypress(size, key) - - def clearall(self): - self.master.anticache = False - self.master.anticomp = False - self.master.killextra = False - self.master.showhost = False - self.master.refresh_server_playback = True - self.master.server.config.no_upstream_cert = False - self.master.setheaders.clear() - self.master.replacehooks.clear() - self.master.set_ignore_filter([]) - self.master.set_tcp_filter([]) - self.master.scripts = [] - self.master.set_stickyauth(None) - self.master.set_stickycookie(None) - self.master.state.default_body_view = contentviews.get("Auto") - - signals.update_settings.send(self) - signals.status_message.send( - message = "All select.Options cleared", - expire = 1 - ) - - def toggle_anticache(self): - self.master.anticache = not self.master.anticache - - def toggle_anticomp(self): - self.master.anticomp = not self.master.anticomp - - def toggle_killextra(self): - self.master.killextra = not self.master.killextra - - def toggle_showhost(self): - self.master.showhost = not self.master.showhost - - def toggle_refresh_server_playback(self): - self.master.refresh_server_playback = not self.master.refresh_server_playback - - def toggle_upstream_cert(self): - self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert - signals.update_settings.send(self) - - def setheaders(self): - def _set(*args, **kwargs): - self.master.setheaders.set(*args, **kwargs) - signals.update_settings.send(self) - self.master.view_grideditor( - grideditor.SetHeadersEditor( - self.master, - self.master.setheaders.get_specs(), - _set - ) - ) - - def ignorepatterns(self): - def _set(ignore): - self.master.set_ignore_filter(ignore) - signals.update_settings.send(self) - self.master.view_grideditor( - grideditor.HostPatternEditor( - self.master, - self.master.get_ignore_filter(), - _set - ) - ) - - def replacepatterns(self): - def _set(*args, **kwargs): - self.master.replacehooks.set(*args, **kwargs) - signals.update_settings.send(self) - self.master.view_grideditor( - grideditor.ReplaceEditor( - self.master, - self.master.replacehooks.get_specs(), - _set - ) - ) - - def scripts(self): - self.master.view_grideditor( - grideditor.ScriptEditor( - self.master, - [[i.command] for i in self.master.scripts], - self.master.edit_scripts - ) - ) - - def default_displaymode(self): - signals.status_prompt_onekey.send( - prompt = "Global default display mode", - keys = contentviews.view_prompts, - callback = self.master.change_default_display_mode - ) - - def has_default_displaymode(self): - return self.master.state.default_body_view.name != "Auto" - - def tcp_proxy(self): - def _set(tcp): - self.master.set_tcp_filter(tcp) - signals.update_settings.send(self) - self.master.view_grideditor( - grideditor.HostPatternEditor( - self.master, - self.master.get_tcp_filter(), - _set - ) - ) - - def sticky_auth(self): - signals.status_prompt.send( - prompt = "Sticky auth filter", - text = self.master.stickyauth_txt, - callback = self.master.set_stickyauth - ) - - def sticky_cookie(self): - signals.status_prompt.send( - prompt = "Sticky cookie filter", - text = self.master.stickycookie_txt, - callback = self.master.set_stickycookie - ) - - def palette(self): - self.master.view_palette_picker() diff --git a/libmproxy/console/palettepicker.py b/libmproxy/console/palettepicker.py deleted file mode 100644 index 51ad0606..00000000 --- a/libmproxy/console/palettepicker.py +++ /dev/null @@ -1,82 +0,0 @@ -import urwid - -from . import select, common, palettes, signals - -footer = [ - ('heading_key', "enter/space"), ":select", -] - - -def _mkhelp(): - text = [] - keys = [ - ("enter/space", "select"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - - -class PalettePicker(urwid.WidgetWrap): - - def __init__(self, master): - self.master = master - low, high = [], [] - for k, v in palettes.palettes.items(): - if v.high: - high.append(k) - else: - low.append(k) - high.sort() - low.sort() - - options = [ - select.Heading("High Colour") - ] - - def mkopt(name): - return select.Option( - i, - None, - lambda: self.master.palette == name, - lambda: self.select(name) - ) - - for i in high: - options.append(mkopt(i)) - options.append(select.Heading("Low Colour")) - for i in low: - options.append(mkopt(i)) - - options.extend( - [ - select.Heading("Options"), - select.Option( - "Transparent", - "T", - lambda: master.palette_transparent, - self.toggle_palette_transparent - ) - ] - ) - - self.lb = select.Select(options) - title = urwid.Text("Palettes") - title = urwid.Padding(title, align="left", width=("relative", 100)) - title = urwid.AttrWrap(title, "heading") - self._w = urwid.Frame( - self.lb, - header = title - ) - signals.update_settings.connect(self.sig_update_settings) - - def sig_update_settings(self, sender): - self.lb.walker._modified() - - def select(self, name): - self.master.set_palette(name) - - def toggle_palette_transparent(self): - self.master.palette_transparent = not self.master.palette_transparent - self.master.set_palette(self.master.palette) - signals.update_settings.send(self) diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py deleted file mode 100644 index bd370181..00000000 --- a/libmproxy/console/palettes.py +++ /dev/null @@ -1,326 +0,0 @@ -# Low-color themes should ONLY use the standard foreground and background -# colours listed here: -# -# http://urwid.org/manual/displayattributes.html -# - - -class Palette: - _fields = [ - 'background', - 'title', - - # Status bar & heading - 'heading', 'heading_key', 'heading_inactive', - - # Help - 'key', 'head', 'text', - - # Options - 'option_selected', 'option_active', 'option_active_selected', - 'option_selected_key', - - # List and Connections - 'method', 'focus', - 'code_200', 'code_300', 'code_400', 'code_500', 'code_other', - 'error', - 'header', 'highlight', 'intercept', 'replay', 'mark', - - # Hex view - 'offset', - - # Grid Editor - 'focusfield', 'focusfield_error', 'field_error', 'editfield', - ] - high = None - - def palette(self, transparent): - l = [] - highback, lowback = None, None - if not transparent: - if self.high and self.high.get("background"): - highback = self.high["background"][1] - lowback = self.low["background"][1] - - for i in self._fields: - if transparent and i == "background": - l.append(["background", "default", "default"]) - else: - v = [i] - low = list(self.low[i]) - if lowback and low[1] == "default": - low[1] = lowback - v.extend(low) - if self.high and i in self.high: - v.append(None) - high = list(self.high[i]) - if highback and high[1] == "default": - high[1] = highback - v.extend(high) - elif highback and self.low[i][1] == "default": - high = [None, low[0], highback] - v.extend(high) - l.append(tuple(v)) - return l - - -class LowDark(Palette): - - """ - Low-color dark background - """ - low = dict( - background = ('white', 'black'), - title = ('white,bold', 'default'), - - # Status bar & heading - heading = ('white', 'dark blue'), - heading_key = ('light cyan', 'dark blue'), - heading_inactive = ('dark gray', 'light gray'), - - # Help - key = ('light cyan', 'default'), - head = ('white,bold', 'default'), - text = ('light gray', 'default'), - - # Options - option_selected = ('black', 'light gray'), - option_selected_key = ('light cyan', 'light gray'), - option_active = ('light red', 'default'), - option_active_selected = ('light red', 'light gray'), - - # List and Connections - method = ('dark cyan', 'default'), - focus = ('yellow', 'default'), - - code_200 = ('dark green', 'default'), - code_300 = ('light blue', 'default'), - code_400 = ('light red', 'default'), - code_500 = ('light red', 'default'), - code_other = ('dark red', 'default'), - - error = ('light red', 'default'), - - header = ('dark cyan', 'default'), - highlight = ('white,bold', 'default'), - intercept = ('brown', 'default'), - replay = ('light green', 'default'), - mark = ('light red', 'default'), - - # Hex view - offset = ('dark cyan', 'default'), - - # Grid Editor - focusfield = ('black', 'light gray'), - focusfield_error = ('dark red', 'light gray'), - field_error = ('dark red', 'default'), - editfield = ('white', 'default'), - ) - - -class Dark(LowDark): - high = dict( - heading_inactive = ('g58', 'g11'), - intercept = ('#f60', 'default'), - - option_selected = ('g85', 'g45'), - option_selected_key = ('light cyan', 'g50'), - option_active_selected = ('light red', 'g50'), - ) - - -class LowLight(Palette): - - """ - Low-color light background - """ - low = dict( - background = ('black', 'white'), - title = ('dark magenta', 'default'), - - # Status bar & heading - heading = ('white', 'black'), - heading_key = ('dark blue', 'black'), - heading_inactive = ('black', 'light gray'), - - # Help - key = ('dark blue', 'default'), - head = ('black', 'default'), - text = ('dark gray', 'default'), - - # Options - option_selected = ('black', 'light gray'), - option_selected_key = ('dark blue', 'light gray'), - option_active = ('light red', 'default'), - option_active_selected = ('light red', 'light gray'), - - # List and Connections - method = ('dark cyan', 'default'), - focus = ('black', 'default'), - - code_200 = ('dark green', 'default'), - code_300 = ('light blue', 'default'), - code_400 = ('dark red', 'default'), - code_500 = ('dark red', 'default'), - code_other = ('light red', 'default'), - - error = ('light red', 'default'), - - header = ('dark blue', 'default'), - highlight = ('black,bold', 'default'), - intercept = ('brown', 'default'), - replay = ('dark green', 'default'), - mark = ('dark red', 'default'), - - # Hex view - offset = ('dark blue', 'default'), - - # Grid Editor - focusfield = ('black', 'light gray'), - focusfield_error = ('dark red', 'light gray'), - field_error = ('dark red', 'black'), - editfield = ('black', 'default'), - ) - - -class Light(LowLight): - high = dict( - background = ('black', 'g100'), - heading = ('g99', '#08f'), - heading_key = ('#0ff,bold', '#08f'), - heading_inactive = ('g35', 'g85'), - replay = ('#0a0,bold', 'default'), - - option_selected = ('black', 'g85'), - option_selected_key = ('dark blue', 'g85'), - option_active_selected = ('light red', 'g85'), - ) - - -# Solarized palette in Urwid-style terminal high-colour offsets -# See: http://ethanschoonover.com/solarized -sol_base03 = "h234" -sol_base02 = "h235" -sol_base01 = "h240" -sol_base00 = "h241" -sol_base0 = "h244" -sol_base1 = "h245" -sol_base2 = "h254" -sol_base3 = "h230" -sol_yellow = "h136" -sol_orange = "h166" -sol_red = "h160" -sol_magenta = "h125" -sol_violet = "h61" -sol_blue = "h33" -sol_cyan = "h37" -sol_green = "h64" - - -class SolarizedLight(LowLight): - high = dict( - background = (sol_base00, sol_base3), - title = (sol_cyan, 'default'), - text = (sol_base00, 'default'), - - # Status bar & heading - heading = (sol_base2, sol_base02), - heading_key = (sol_blue, sol_base03), - heading_inactive = (sol_base03, sol_base1), - - # Help - key = (sol_blue, 'default',), - head = (sol_base00, 'default'), - - # Options - option_selected = (sol_base03, sol_base2), - option_selected_key = (sol_blue, sol_base2), - option_active = (sol_orange, 'default'), - option_active_selected = (sol_orange, sol_base2), - - # List and Connections - method = (sol_cyan, 'default'), - focus = (sol_base01, 'default'), - - code_200 = (sol_green, 'default'), - code_300 = (sol_blue, 'default'), - code_400 = (sol_orange, 'default',), - code_500 = (sol_red, 'default'), - code_other = (sol_magenta, 'default'), - - error = (sol_red, 'default'), - - header = (sol_blue, 'default'), - highlight = (sol_base01, 'default'), - intercept = (sol_red, 'default',), - replay = (sol_green, 'default',), - - # Hex view - offset = (sol_cyan, 'default'), - - # Grid Editor - focusfield = (sol_base00, sol_base2), - focusfield_error = (sol_red, sol_base2), - field_error = (sol_red, 'default'), - editfield = (sol_base01, 'default'), - ) - - -class SolarizedDark(LowDark): - high = dict( - background = (sol_base2, sol_base03), - title = (sol_blue, 'default'), - text = (sol_base1, 'default'), - - # Status bar & heading - heading = (sol_base2, sol_base01), - heading_key = (sol_blue + ",bold", sol_base01), - heading_inactive = (sol_base1, sol_base02), - - # Help - key = (sol_blue, 'default',), - head = (sol_base2, 'default'), - - # Options - option_selected = (sol_base03, sol_base00), - option_selected_key = (sol_blue, sol_base00), - option_active = (sol_orange, 'default'), - option_active_selected = (sol_orange, sol_base00), - - # List and Connections - method = (sol_cyan, 'default'), - focus = (sol_base1, 'default'), - - code_200 = (sol_green, 'default'), - code_300 = (sol_blue, 'default'), - code_400 = (sol_orange, 'default',), - code_500 = (sol_red, 'default'), - code_other = (sol_magenta, 'default'), - - error = (sol_red, 'default'), - - header = (sol_blue, 'default'), - highlight = (sol_base01, 'default'), - intercept = (sol_red, 'default',), - replay = (sol_green, 'default',), - - # Hex view - offset = (sol_cyan, 'default'), - - # Grid Editor - focusfield = (sol_base0, sol_base02), - focusfield_error = (sol_red, sol_base02), - field_error = (sol_red, 'default'), - editfield = (sol_base1, 'default'), - ) - - -DEFAULT = "dark" -palettes = { - "lowlight": LowLight(), - "lowdark": LowDark(), - "light": Light(), - "dark": Dark(), - "solarized_light": SolarizedLight(), - "solarized_dark": SolarizedDark(), -} diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py deleted file mode 100644 index 4447070b..00000000 --- a/libmproxy/console/pathedit.py +++ /dev/null @@ -1,71 +0,0 @@ -import glob -import os.path - -import urwid - - -class _PathCompleter: - - def __init__(self, _testing=False): - """ - _testing: disables reloading of the lookup table to make testing - possible. - """ - self.lookup, self.offset = None, None - self.final = None - self._testing = _testing - - def reset(self): - self.lookup = None - self.offset = -1 - - def complete(self, txt): - """ - Returns the next completion for txt, or None if there is no - completion. - """ - path = os.path.expanduser(txt) - if not self.lookup: - if not self._testing: - # Lookup is a set of (display value, actual value) tuples. - self.lookup = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = txt - else: - files = glob.glob(path + "*") - prefix = os.path.dirname(txt) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.basename(f)) - if os.path.isdir(f): - display += "/" - self.lookup.append((display, f)) - if not self.lookup: - self.final = path - return path - self.lookup.sort() - self.offset = -1 - self.lookup.append((txt, txt)) - self.offset += 1 - if self.offset >= len(self.lookup): - self.offset = 0 - ret = self.lookup[self.offset] - self.final = ret[1] - return ret[0] - - -class PathEdit(urwid.Edit, _PathCompleter): - - def __init__(self, *args, **kwargs): - urwid.Edit.__init__(self, *args, **kwargs) - _PathCompleter.__init__(self) - - def keypress(self, size, key): - if key == "tab": - comp = self.complete(self.get_edit_text()) - self.set_edit_text(comp) - self.set_edit_pos(len(comp)) - else: - self.reset() - return urwid.Edit.keypress(self, size, key) diff --git a/libmproxy/console/searchable.py b/libmproxy/console/searchable.py deleted file mode 100644 index cff1f0a1..00000000 --- a/libmproxy/console/searchable.py +++ /dev/null @@ -1,93 +0,0 @@ -import urwid - -from . import signals - - -class Highlight(urwid.AttrMap): - - def __init__(self, t): - urwid.AttrMap.__init__( - self, - urwid.Text(t.text), - "focusfield", - ) - self.backup = t - - -class Searchable(urwid.ListBox): - - def __init__(self, state, contents): - self.walker = urwid.SimpleFocusListWalker(contents) - urwid.ListBox.__init__(self, self.walker) - self.state = state - self.search_offset = 0 - self.current_highlight = None - self.search_term = None - - def keypress(self, size, key): - if key == "/": - signals.status_prompt.send( - prompt = "Search for", - text = "", - callback = self.set_search - ) - elif key == "n": - self.find_next(False) - elif key == "N": - self.find_next(True) - elif key == "g": - self.set_focus(0) - self.walker._modified() - elif key == "G": - self.set_focus(len(self.walker) - 1) - self.walker._modified() - else: - return super(self.__class__, self).keypress(size, key) - - def set_search(self, text): - self.state.last_search = text - self.search_term = text or None - self.find_next(False) - - def set_highlight(self, offset): - if self.current_highlight is not None: - old = self.body[self.current_highlight] - self.body[self.current_highlight] = old.backup - if offset is None: - self.current_highlight = None - else: - self.body[offset] = Highlight(self.body[offset]) - self.current_highlight = offset - - def get_text(self, w): - if isinstance(w, urwid.Text): - return w.text - elif isinstance(w, Highlight): - return w.backup.text - else: - return None - - def find_next(self, backwards): - if not self.search_term: - if self.state.last_search: - self.search_term = self.state.last_search - else: - self.set_highlight(None) - return - # Start search at focus + 1 - if backwards: - rng = xrange(len(self.body) - 1, -1, -1) - else: - rng = xrange(1, len(self.body) + 1) - for i in rng: - off = (self.focus_position + i) % len(self.body) - w = self.body[off] - txt = self.get_text(w) - if txt and self.search_term in txt: - self.set_highlight(off) - self.set_focus(off, coming_from="above") - self.body._modified() - return - else: - self.set_highlight(None) - signals.status_message.send(message="Search not found.", expire=1) diff --git a/libmproxy/console/select.py b/libmproxy/console/select.py deleted file mode 100644 index 928a7ca5..00000000 --- a/libmproxy/console/select.py +++ /dev/null @@ -1,120 +0,0 @@ -import urwid - -from . import common - - -class _OptionWidget(urwid.WidgetWrap): - - def __init__(self, option, text, shortcut, active, focus): - self.option = option - textattr = "text" - keyattr = "key" - if focus and active: - textattr = "option_active_selected" - keyattr = "option_selected_key" - elif focus: - textattr = "option_selected" - keyattr = "option_selected_key" - elif active: - textattr = "option_active" - if shortcut: - text = common.highlight_key( - text, - shortcut, - textattr = textattr, - keyattr = keyattr - ) - opt = urwid.Text(text, align="left") - opt = urwid.AttrWrap(opt, textattr) - opt = urwid.Padding(opt, align = "center", width = 40) - urwid.WidgetWrap.__init__(self, opt) - - def keypress(self, size, key): - return key - - def selectable(self): - return True - - -class OptionWalker(urwid.ListWalker): - - def __init__(self, options): - urwid.ListWalker.__init__(self) - self.options = options - self.focus = 0 - - def set_focus(self, pos): - self.focus = pos - - def get_focus(self): - return self.options[self.focus].render(True), self.focus - - def get_next(self, pos): - if pos >= len(self.options) - 1: - return None, None - return self.options[pos + 1].render(False), pos + 1 - - def get_prev(self, pos): - if pos <= 0: - return None, None - return self.options[pos - 1].render(False), pos - 1 - - -class Heading: - - def __init__(self, text): - self.text = text - - def render(self, focus): - opt = urwid.Text("\n" + self.text, align="left") - opt = urwid.AttrWrap(opt, "title") - opt = urwid.Padding(opt, align = "center", width = 40) - return opt - - -_neg = lambda: False - - -class Option: - - def __init__(self, text, shortcut, getstate=None, activate=None): - self.text = text - self.shortcut = shortcut - self.getstate = getstate or _neg - self.activate = activate or _neg - - def render(self, focus): - return _OptionWidget( - self, - self.text, - self.shortcut, - self.getstate(), - focus) - - -class Select(urwid.ListBox): - - def __init__(self, options): - self.walker = OptionWalker(options) - urwid.ListBox.__init__( - self, - self.walker - ) - self.options = options - self.keymap = {} - for i in options: - if hasattr(i, "shortcut") and i.shortcut: - if i.shortcut in self.keymap: - raise ValueError("Duplicate shortcut key: %s" % i.shortcut) - self.keymap[i.shortcut] = i - - def keypress(self, size, key): - if key == "enter" or key == " ": - self.get_focus()[0].option.activate() - return None - key = common.shortcuts(key) - if key in self.keymap: - self.keymap[key].activate() - self.set_focus(self.options.index(self.keymap[key])) - return None - return super(self.__class__, self).keypress(size, key) diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py deleted file mode 100644 index 6a439bf3..00000000 --- a/libmproxy/console/signals.py +++ /dev/null @@ -1,43 +0,0 @@ -import blinker - -# Show a status message in the action bar -sig_add_event = blinker.Signal() - - -def add_event(e, level): - sig_add_event.send( - None, - e=e, - level=level - ) - -# Show a status message in the action bar -status_message = blinker.Signal() - -# Prompt for input -status_prompt = blinker.Signal() - -# Prompt for a path -status_prompt_path = blinker.Signal() - -# Prompt for a single keystroke -status_prompt_onekey = blinker.Signal() - -# Call a callback in N seconds -call_in = blinker.Signal() - -# Focus the body, footer or header of the main window -focus = blinker.Signal() - -# Fired when settings change -update_settings = blinker.Signal() - -# Fired when a flow changes -flow_change = blinker.Signal() - -# Fired when the flow list or focus changes -flowlist_change = blinker.Signal() - -# Pop and push view state onto a stack -pop_view_state = blinker.Signal() -push_view_state = blinker.Signal() diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py deleted file mode 100644 index 4cc63a54..00000000 --- a/libmproxy/console/statusbar.py +++ /dev/null @@ -1,258 +0,0 @@ -import os.path - -import urwid - -import netlib.utils -from . import pathedit, signals, common - - -class ActionBar(urwid.WidgetWrap): - - def __init__(self): - urwid.WidgetWrap.__init__(self, None) - self.clear() - signals.status_message.connect(self.sig_message) - signals.status_prompt.connect(self.sig_prompt) - signals.status_prompt_path.connect(self.sig_path_prompt) - signals.status_prompt_onekey.connect(self.sig_prompt_onekey) - - self.last_path = "" - - self.prompting = False - self.onekey = False - self.pathprompt = False - - def sig_message(self, sender, message, expire=None): - w = urwid.Text(message) - self._w = w - self.prompting = False - if expire: - def cb(*args): - if w == self._w: - self.clear() - signals.call_in.send(seconds=expire, callback=cb) - - def prep_prompt(self, p): - return p.strip() + ": " - - def sig_prompt(self, sender, prompt, text, callback, args=()): - signals.focus.send(self, section="footer") - self._w = urwid.Edit(self.prep_prompt(prompt), text or "") - self.prompting = (callback, args) - - def sig_path_prompt(self, sender, prompt, callback, args=()): - signals.focus.send(self, section="footer") - self._w = pathedit.PathEdit( - self.prep_prompt(prompt), - os.path.dirname(self.last_path) - ) - self.pathprompt = True - self.prompting = (callback, args) - - def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): - """ - Keys are a set of (word, key) tuples. The appropriate key in the - word is highlighted. - """ - signals.focus.send(self, section="footer") - prompt = [prompt, " ("] - mkup = [] - for i, e in enumerate(keys): - mkup.extend(common.highlight_key(e[0], e[1])) - if i < len(keys) - 1: - mkup.append(",") - prompt.extend(mkup) - prompt.append(")? ") - self.onekey = set(i[1] for i in keys) - self._w = urwid.Edit(prompt, "") - self.prompting = (callback, args) - - def selectable(self): - return True - - def keypress(self, size, k): - if self.prompting: - if k == "esc": - self.prompt_done() - elif self.onekey: - if k == "enter": - self.prompt_done() - elif k in self.onekey: - self.prompt_execute(k) - elif k == "enter": - self.prompt_execute(self._w.get_edit_text()) - else: - if common.is_keypress(k): - self._w.keypress(size, k) - else: - return k - - def clear(self): - self._w = urwid.Text("") - self.prompting = False - - def prompt_done(self): - self.prompting = False - self.onekey = False - self.pathprompt = False - signals.status_message.send(message="") - signals.focus.send(self, section="body") - - def prompt_execute(self, txt): - if self.pathprompt: - self.last_path = txt - p, args = self.prompting - self.prompt_done() - msg = p(txt, *args) - if msg: - signals.status_message.send(message=msg, expire=1) - - -class StatusBar(urwid.WidgetWrap): - - def __init__(self, master, helptext): - self.master, self.helptext = master, helptext - self.ab = ActionBar() - self.ib = urwid.WidgetWrap(urwid.Text("")) - self._w = urwid.Pile([self.ib, self.ab]) - signals.update_settings.connect(self.sig_update_settings) - signals.flowlist_change.connect(self.sig_update_settings) - self.redraw() - - def sig_update_settings(self, sender): - self.redraw() - - def keypress(self, *args, **kwargs): - return self.ab.keypress(*args, **kwargs) - - def get_status(self): - r = [] - - if self.master.setheaders.count(): - r.append("[") - r.append(("heading_key", "H")) - r.append("eaders]") - if self.master.replacehooks.count(): - r.append("[") - r.append(("heading_key", "R")) - r.append("eplacing]") - if self.master.client_playback: - r.append("[") - r.append(("heading_key", "cplayback")) - r.append(":%s to go]" % self.master.client_playback.count()) - if self.master.server_playback: - r.append("[") - r.append(("heading_key", "splayback")) - if self.master.nopop: - r.append(":%s in file]" % self.master.server_playback.count()) - else: - r.append(":%s to go]" % self.master.server_playback.count()) - if self.master.get_ignore_filter(): - r.append("[") - r.append(("heading_key", "I")) - r.append("gnore:%d]" % len(self.master.get_ignore_filter())) - if self.master.get_tcp_filter(): - r.append("[") - r.append(("heading_key", "T")) - r.append("CP:%d]" % len(self.master.get_tcp_filter())) - if self.master.state.intercept_txt: - r.append("[") - r.append(("heading_key", "i")) - r.append(":%s]" % self.master.state.intercept_txt) - if self.master.state.limit_txt: - r.append("[") - r.append(("heading_key", "l")) - r.append(":%s]" % self.master.state.limit_txt) - if self.master.stickycookie_txt: - r.append("[") - r.append(("heading_key", "t")) - r.append(":%s]" % self.master.stickycookie_txt) - if self.master.stickyauth_txt: - r.append("[") - r.append(("heading_key", "u")) - r.append(":%s]" % self.master.stickyauth_txt) - if self.master.state.default_body_view.name != "Auto": - r.append("[") - r.append(("heading_key", "M")) - r.append(":%s]" % self.master.state.default_body_view.name) - - opts = [] - if self.master.anticache: - opts.append("anticache") - if self.master.anticomp: - opts.append("anticomp") - if self.master.showhost: - opts.append("showhost") - if not self.master.refresh_server_playback: - opts.append("norefresh") - if self.master.killextra: - opts.append("killextra") - if self.master.server.config.no_upstream_cert: - opts.append("no-upstream-cert") - if self.master.state.follow_focus: - opts.append("following") - if self.master.stream_large_bodies: - opts.append( - "stream:%s" % netlib.utils.pretty_size( - self.master.stream_large_bodies.max_size - ) - ) - - if opts: - r.append("[%s]" % (":".join(opts))) - - if self.master.server.config.mode in ["reverse", "upstream"]: - dst = self.master.server.config.upstream_server - r.append("[dest:%s]" % netlib.utils.unparse_url( - dst.scheme, - dst.address.host, - dst.address.port - )) - if self.master.scripts: - r.append("[") - r.append(("heading_key", "s")) - r.append("cripts:%s]" % len(self.master.scripts)) - # r.append("[lt:%0.3f]"%self.master.looptime) - - if self.master.stream: - r.append("[W:%s]" % self.master.stream_path) - - return r - - def redraw(self): - fc = self.master.state.flow_count() - if self.master.state.focus is None: - offset = 0 - else: - offset = min(self.master.state.focus + 1, fc) - t = [ - ('heading', ("[%s/%s]" % (offset, fc)).ljust(9)) - ] - - if self.master.server.bound: - host = self.master.server.address.host - if host == "0.0.0.0": - host = "*" - boundaddr = "[%s:%s]" % (host, self.master.server.address.port) - else: - boundaddr = "" - t.extend(self.get_status()) - status = urwid.AttrWrap(urwid.Columns([ - urwid.Text(t), - urwid.Text( - [ - self.helptext, - boundaddr - ], - align="right" - ), - ]), "heading") - self.ib._w = status - - def update(self, text): - self.helptext = text - self.redraw() - self.master.loop.draw_screen() - - def selectable(self): - return True diff --git a/libmproxy/console/tabs.py b/libmproxy/console/tabs.py deleted file mode 100644 index b5423038..00000000 --- a/libmproxy/console/tabs.py +++ /dev/null @@ -1,70 +0,0 @@ -import urwid - - -class Tab(urwid.WidgetWrap): - - def __init__(self, offset, content, attr, onclick): - """ - onclick is called on click with the tab offset as argument - """ - p = urwid.Text(content, align="center") - p = urwid.Padding(p, align="center", width=("relative", 100)) - p = urwid.AttrWrap(p, attr) - urwid.WidgetWrap.__init__(self, p) - self.offset = offset - self.onclick = onclick - - def mouse_event(self, size, event, button, col, row, focus): - if event == "mouse press" and button == 1: - self.onclick(self.offset) - return True - - -class Tabs(urwid.WidgetWrap): - - def __init__(self, tabs, tab_offset=0): - urwid.WidgetWrap.__init__(self, "") - self.tab_offset = tab_offset - self.tabs = tabs - self.show() - - def change_tab(self, offset): - self.tab_offset = offset - self.show() - - def keypress(self, size, key): - n = len(self.tabs) - if key in ["tab", "l"]: - self.change_tab((self.tab_offset + 1) % n) - elif key == "h": - self.change_tab((self.tab_offset - 1) % n) - return self._w.keypress(size, key) - - def show(self): - headers = [] - for i in range(len(self.tabs)): - txt = self.tabs[i][0]() - if i == self.tab_offset: - headers.append( - Tab( - i, - txt, - "heading", - self.change_tab - ) - ) - else: - headers.append( - Tab( - i, - txt, - "heading_inactive", - self.change_tab - ) - ) - headers = urwid.Columns(headers, dividechars=1) - self._w = urwid.Frame( - body = self.tabs[self.tab_offset][1](), - header = headers - ) - self._w.set_focus("body") diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py deleted file mode 100644 index 47c284e4..00000000 --- a/libmproxy/console/window.py +++ /dev/null @@ -1,90 +0,0 @@ -import urwid -from . import signals - - -class Window(urwid.Frame): - - def __init__(self, master, body, header, footer, helpctx): - urwid.Frame.__init__( - self, - urwid.AttrWrap(body, "background"), - header = urwid.AttrWrap(header, "background") if header else None, - footer = urwid.AttrWrap(footer, "background") if footer else None - ) - self.master = master - self.helpctx = helpctx - signals.focus.connect(self.sig_focus) - - def sig_focus(self, sender, section): - self.focus_position = section - - def mouse_event(self, *args, **kwargs): - # args: (size, event, button, col, row) - k = super(self.__class__, self).mouse_event(*args, **kwargs) - if not k: - if args[1] == "mouse drag": - signals.status_message.send( - message = "Hold down shift, alt or ctrl to select text.", - expire = 1 - ) - elif args[1] == "mouse press" and args[2] == 4: - self.keypress(args[0], "up") - elif args[1] == "mouse press" and args[2] == 5: - self.keypress(args[0], "down") - else: - return False - return True - - def keypress(self, size, k): - k = super(self.__class__, self).keypress(size, k) - if k == "?": - self.master.view_help(self.helpctx) - elif k == "c": - if not self.master.client_playback: - signals.status_prompt_path.send( - self, - prompt = "Client replay", - callback = self.master.client_playback_path - ) - else: - signals.status_prompt_onekey.send( - self, - prompt = "Stop current client replay?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = self.master.stop_client_playback_prompt, - ) - elif k == "i": - signals.status_prompt.send( - self, - prompt = "Intercept filter", - text = self.master.state.intercept_txt, - callback = self.master.set_intercept - ) - elif k == "o": - self.master.view_options() - elif k == "Q": - raise urwid.ExitMainLoop - elif k == "q": - signals.pop_view_state.send(self) - elif k == "S": - if not self.master.server_playback: - signals.status_prompt_path.send( - self, - prompt = "Server replay path", - callback = self.master.server_playback_path - ) - else: - signals.status_prompt_onekey.send( - self, - prompt = "Stop current server replay?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = self.master.stop_server_playback_prompt, - ) - else: - return k |