From cdd5a53767e51a6d992bf8d08df2733e7af916b8 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Tue, 7 Feb 2012 16:39:37 +1300 Subject: Refactor console. Split the console implementation out into logical components. --- libmproxy/console/connview.py | 490 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 490 insertions(+) create mode 100644 libmproxy/console/connview.py (limited to 'libmproxy/console/connview.py') diff --git a/libmproxy/console/connview.py b/libmproxy/console/connview.py new file mode 100644 index 00000000..1e9d0c60 --- /dev/null +++ b/libmproxy/console/connview.py @@ -0,0 +1,490 @@ +import urwid +import common +from .. import utils, encoding + +VIEW_CUTOFF = 1024*100 + +class ConnectionViewHeader(common.WWrap): + def __init__(self, master, f): + self.master, self.flow = master, f + self.w = urwid.Text(common.format_flow(f, False, extended=True, padding=0)) + + def refresh_connection(self, f): + if f == self.flow: + self.w = urwid.Text(common.format_flow(f, False, extended=True, padding=0)) + + +class CallbackCache: + @utils.LRUCache(20) + def callback(self, obj, method, *args, **kwargs): + return getattr(obj, method)(*args, **kwargs) +cache = CallbackCache() + + +class ConnectionView(common.WWrap): + REQ = 0 + RESP = 1 + methods = [ + ("get", "g"), + ("post", "p"), + ("put", "u"), + ("head", "h"), + ("trace", "t"), + ("delete", "d"), + ("options", "o"), + ] + def __init__(self, master, state, flow): + self.master, self.state, self.flow = master, state, flow + if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and flow.response: + self.view_response() + else: + self.view_request() + + def _trailer(self, clen, txt): + rem = clen - VIEW_CUTOFF + if rem > 0: + txt.append(urwid.Text("")) + txt.append( + urwid.Text( + [ + ("highlight", "... %s of data not shown"%utils.pretty_size(rem)) + ] + ) + ) + + def _view_conn_raw(self, content): + txt = [] + for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): + txt.append( + urwid.Text(("text", i)) + ) + self._trailer(len(content), txt) + return txt + + def _view_conn_binary(self, content): + txt = [] + for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): + txt.append(urwid.Text([ + ("offset", offset), + " ", + ("text", hexa), + " ", + ("text", s), + ])) + self._trailer(len(content), txt) + return txt + + def _view_conn_xmlish(self, content): + txt = [] + for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): + txt.append( + urwid.Text(("text", i)), + ) + self._trailer(len(content), txt) + return txt + + def _view_conn_json(self, lines): + txt = [] + sofar = 0 + for i in lines: + sofar += len(i) + txt.append( + urwid.Text(("text", i)), + ) + if sofar > VIEW_CUTOFF: + break + self._trailer(sum(len(i) for i in lines), txt) + return txt + + def _view_conn_formdata(self, content, boundary): + rx = re.compile(r'\bname="([^"]+)"') + keys = [] + vals = [] + + for i in content.split("--" + boundary): + parts = i.splitlines() + if len(parts) > 1 and parts[0][0:2] != "--": + match = rx.search(parts[1]) + if match: + keys.append(match.group(1) + ":") + vals.append(utils.cleanBin( + "\n".join(parts[3+parts[2:].index(""):]) + )) + kv = common.format_keyvals( + zip(keys, vals), + key = "header", + val = "text" + ) + return [ + urwid.Text(("highlight", "Form data:\n")), + urwid.Text(kv) + ] + + def _view_conn_urlencoded(self, lines): + kv = common.format_keyvals( + [(k+":", v) for (k, v) in lines], + key = "header", + val = "text" + ) + return [ + urwid.Text(("highlight", "URLencoded data:\n")), + urwid.Text(kv) + ] + + def _find_pretty_view(self, content, hdrItems): + ctype = None + for i in hdrItems: + if i[0].lower() == "content-type": + ctype = i[1] + break + if ctype and "x-www-form-urlencoded" in ctype: + data = utils.urldecode(content) + if data: + return self._view_conn_urlencoded(data) + if utils.isXML(content): + return self._view_conn_xmlish(content) + elif ctype and "application/json" in ctype: + lines = utils.pretty_json(content) + if lines: + return self._view_conn_json(lines) + elif ctype and "multipart/form-data" in ctype: + boundary = ctype.split('boundary=') + if len(boundary) > 1: + return self._view_conn_formdata(content, boundary[1].split(';')[0]) + return self._view_conn_raw(content) + + def _cached_conn_text(self, e, content, hdrItems, viewmode): + hdr = [] + hdr.extend( + common.format_keyvals( + [(h+":", v) for (h, v) in hdrItems], + key = "header", + val = "text" + ) + ) + hdr.append("\n") + + txt = [urwid.Text(hdr)] + if content: + if viewmode == common.VIEW_BODY_HEX: + txt.extend(self._view_conn_binary(content)) + elif viewmode == common.VIEW_BODY_PRETTY: + if e: + decoded = encoding.decode(e, content) + if decoded: + content = decoded + if e and e != "identity": + txt.append( + urwid.Text(("highlight", "Decoded %s data:\n"%e)) + ) + txt.extend(self._find_pretty_view(content, hdrItems)) + else: + txt.extend(self._view_conn_raw(content)) + return urwid.ListBox(txt) + + + + + def _tab(self, content, active): + if active: + attr = "heading" + else: + attr = "inactive" + p = urwid.Text(content) + p = urwid.Padding(p, align="left", width=("relative", 100)) + p = urwid.AttrWrap(p, attr) + return p + + def wrap_body(self, active, body): + parts = [] + + if self.flow.intercepting and not self.flow.request.acked: + qt = "Request (intercepted)" + else: + qt = "Request" + if active == common.VIEW_FLOW_REQUEST: + parts.append(self._tab(qt, True)) + else: + parts.append(self._tab(qt, False)) + + if self.flow.intercepting and not self.flow.response.acked: + st = "Response (intercepted)" + else: + st = "Response" + if active == common.VIEW_FLOW_RESPONSE: + parts.append(self._tab(st, True)) + else: + parts.append(self._tab(st, False)) + + h = urwid.Columns(parts, dividechars=1) + f = urwid.Frame( + body, + header=h + ) + return f + + def _conn_text(self, conn, viewmode): + e = conn.headers["content-encoding"] + e = e[0] if e else None + return cache.callback( + self, "_cached_conn_text", + e, + conn.content, + tuple(tuple(i) for i in conn.headers.lst), + viewmode + ) + + def view_request(self): + self.state.view_flow_mode = common.VIEW_FLOW_REQUEST + self.master.statusbar.update("Calculating view...") + body = self._conn_text( + self.flow.request, + self.state.view_body_mode + ) + self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) + self.master.statusbar.update("") + + def view_response(self): + self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE + self.master.statusbar.update("Calculating view...") + if self.flow.response: + body = self._conn_text( + self.flow.response, + self.state.view_body_mode + ) + else: + body = urwid.ListBox( + [ + urwid.Text(""), + urwid.Text( + [ + ("highlight", "No response. Press "), + ("key", "e"), + ("highlight", " and edit any aspect to add one."), + ] + ) + ] + ) + self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) + self.master.statusbar.update("") + + def refresh_connection(self, c=None): + if c == self.flow: + if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: + self.view_response() + else: + self.view_request() + + def _spawn_editor(self, data): + fd, name = tempfile.mkstemp('', "mproxy") + os.write(fd, data) + os.close(fd) + c = os.environ.get("EDITOR") + #If no EDITOR is set, assume 'vi' + if not c: + c = "vi" + cmd = [c, name] + self.master.ui.stop() + try: + subprocess.call(cmd) + except: + self.master.statusbar.message("Can't start editor: %s" % c) + self.master.ui.start() + os.unlink(name) + return data + self.master.ui.start() + data = open(name).read() + os.unlink(name) + return data + + def edit_method(self, m): + for i in self.methods: + if i[1] == m: + self.flow.request.method = i[0].upper() + self.master.refresh_connection(self.flow) + + def save_body(self, path): + if not path: + return + self.state.last_saveload = path + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + c = self.flow.request + else: + c = self.flow.response + path = os.path.expanduser(path) + try: + f = file(path, "wb") + f.write(str(c.content)) + f.close() + except IOError, v: + self.master.statusbar.message(v.strerror) + + def set_url(self, url): + request = self.flow.request + if not request.set_url(str(url)): + return "Invalid URL." + self.master.refresh_connection(self.flow) + + def set_resp_code(self, code): + response = self.flow.response + try: + response.code = int(code) + except ValueError: + return None + import BaseHTTPServer + if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): + response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] + self.master.refresh_connection(self.flow) + + def set_resp_msg(self, msg): + response = self.flow.response + response.msg = msg + self.master.refresh_connection(self.flow) + + def edit(self, part): + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + conn = self.flow.request + else: + if not self.flow.response: + self.flow.response = flow.Response(self.flow.request, 200, "OK", flow.Headers(), "") + conn = self.flow.response + + self.flow.backup() + if part == "b": + c = self._spawn_editor(conn.content or "") + conn.content = c.rstrip("\n") + elif part == "h": + self.master.view_kveditor("Editing headers", conn.headers.lst, None) + #headertext = self._spawn_editor(repr(conn.headers)) + #headers = flow.Headers() + #fp = cStringIO.StringIO(headertext) + #headers.read(fp) + #conn.headers = headers + elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_edit("URL", conn.get_url(), self.set_url) + elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_onekey("Method", self.methods, self.edit_method) + elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: + self.master.prompt_edit("Code", str(conn.code), self.set_resp_code) + elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: + self.master.prompt_edit("Message", conn.msg, self.set_resp_msg) + self.master.refresh_connection(self.flow) + + def keypress(self, size, key): + if key == " ": + self.master.view_next_flow(self.flow) + return key + + key = common.shortcuts(key) + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + conn = self.flow.request + else: + conn = self.flow.response + + if key == "q": + self.master.view_connlist() + key = None + elif key == "tab": + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.view_response() + else: + self.view_request() + elif key in ("up", "down", "page up", "page down"): + # Why doesn't this just work?? + self.w.body.keypress(size, key) + elif key == "a": + self.flow.accept_intercept() + self.master.view_flow(self.flow) + elif key == "A": + self.master.accept_all() + self.master.view_flow(self.flow) + elif key == "e": + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_onekey( + "Edit request", + ( + ("header", "h"), + ("body", "b"), + ("url", "u"), + ("method", "m"), + ), + self.edit + ) + else: + self.master.prompt_onekey( + "Edit response", + ( + ("code", "c"), + ("message", "m"), + ("header", "h"), + ("body", "b"), + ), + self.edit + ) + key = None + elif key == "p": + self.master.view_prev_flow(self.flow) + elif key == "r": + r = self.master.replay_request(self.flow) + if r: + self.master.statusbar.message(r) + self.master.refresh_connection(self.flow) + elif key == "R": + self.state.revert(self.flow) + self.master.refresh_connection(self.flow) + elif key == "W": + self.master.path_prompt( + "Save this flow: ", + self.state.last_saveload, + self.master.save_one_flow, + self.flow + ) + elif key == "v": + if conn and conn.content: + t = conn.headers["content-type"] or [None] + t = t[0] + self.master.spawn_external_viewer(conn.content, t) + elif key == "b": + if conn: + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.path_prompt( + "Save request body: ", + self.state.last_saveload, + self.save_body + ) + else: + self.master.path_prompt( + "Save response body: ", + self.state.last_saveload, + self.save_body + ) + elif key == "|": + self.master.path_prompt( + "Send flow to script: ", self.state.last_script, + self.master.run_script_once, self.flow + ) + elif key == "z": + if conn: + e = conn.headers["content-encoding"] or ["identity"] + if e[0] != "identity": + conn.decode() + else: + self.master.prompt_onekey( + "Select encoding: ", + ( + ("gzip", "z"), + ("deflate", "d"), + ), + self.encode_callback, + conn + ) + self.master.refresh_connection(self.flow) + return key + + def encode_callback(self, key, conn): + encoding_map = { + "z": "gzip", + "d": "deflate", + } + conn.encode(encoding_map[key]) + self.master.refresh_connection(self.flow) -- cgit v1.2.3