diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2012-03-24 14:02:41 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2012-03-24 14:20:24 +1300 |
commit | 62e51018d0b7041d49e42b8e7d9b602ece356456 (patch) | |
tree | 59d798308ed40a20ac9489c35514a4298d61406f /libmproxy | |
parent | 0d05068f911adf619522b67c49c7a1fe24ecf70c (diff) | |
download | mitmproxy-62e51018d0b7041d49e42b8e7d9b602ece356456.tar.gz mitmproxy-62e51018d0b7041d49e42b8e7d9b602ece356456.tar.bz2 mitmproxy-62e51018d0b7041d49e42b8e7d9b602ece356456.zip |
Refactor pretty view mechanism.
Also start adding unit tests for this subsystem.
Diffstat (limited to 'libmproxy')
-rw-r--r-- | libmproxy/console/contentview.py | 166 | ||||
-rw-r--r-- | libmproxy/console/flowview.py | 8 | ||||
-rw-r--r-- | libmproxy/flow.py | 6 |
3 files changed, 95 insertions, 85 deletions
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py index 446eb350..c6e43a27 100644 --- a/libmproxy/console/contentview.py +++ b/libmproxy/console/contentview.py @@ -1,6 +1,6 @@ import urwid import common -from .. import utils, encoding +from .. import utils, encoding, flow VIEW_CUTOFF = 1024*100 @@ -20,19 +20,18 @@ VIEW_CONTENT_PRETTY_TYPE_XML = 2 VIEW_CONTENT_PRETTY_TYPE_URLENCODED = 3 CONTENT_PRETTY_NAMES = { - VIEW_CONTENT_PRETTY_TYPE_JSON: "json", - VIEW_CONTENT_PRETTY_TYPE_XML: "xmlish", - VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "urlencoded" + VIEW_CONTENT_PRETTY_TYPE_JSON: "JSON", + VIEW_CONTENT_PRETTY_TYPE_XML: "XML", + VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "URL-encoded" } -CONTENT_PRETTY_TYPES = { +CONTENT_TYPES_MAP = { "text/html": VIEW_CONTENT_PRETTY_TYPE_XML, "application/json": VIEW_CONTENT_PRETTY_TYPE_JSON, "text/xml": VIEW_CONTENT_PRETTY_TYPE_XML, "multipart/form-data": VIEW_CONTENT_PRETTY_TYPE_URLENCODED } - def trailer(clen, txt): rem = clen - VIEW_CUTOFF if rem > 0: @@ -45,16 +44,18 @@ def trailer(clen, txt): ) ) -def view_flow_raw(content): + +def view_raw(hdrs, content): txt = [] for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): txt.append( urwid.Text(("text", i)) ) trailer(len(content), txt) - return txt + return "Raw", txt + -def view_flow_binary(content): +def view_hex(hdrs, content): txt = [] for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): txt.append(urwid.Text([ @@ -65,31 +66,37 @@ def view_flow_binary(content): ("text", s), ])) trailer(len(content), txt) - return txt + return "HEX", txt -def view_flow_xmlish(content): + +def view_xmlish(hdrs, content): txt = [] for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): txt.append( urwid.Text(("text", i)), ) trailer(len(content), txt) - return txt + return "XML-like data", txt + + +def view_json(hdrs, content): + lines = utils.pretty_json(content) + if lines: + txt = [] + sofar = 0 + for i in lines: + sofar += len(i) + txt.append( + urwid.Text(("text", i)), + ) + if sofar > VIEW_CUTOFF: + break + trailer(sum(len(i) for i in lines), txt) + return "JSON", txt -def view_flow_json(lines): - txt = [] - sofar = 0 - for i in lines: - sofar += len(i) - txt.append( - urwid.Text(("text", i)), - ) - if sofar > VIEW_CUTOFF: - break - trailer(sum(len(i) for i in lines), txt) - return txt -def view_flow_formdata(content, boundary): +# FIXME +def view_formdata(content, boundary): rx = re.compile(r'\bname="([^"]+)"') keys = [] vals = [] @@ -113,65 +120,66 @@ def view_flow_formdata(content, boundary): )) return r -def view_flow_urlencoded(lines): - return common.format_keyvals( - [(k+":", v) for (k, v) in lines], - key = "header", - val = "text" - ) - -def find_pretty_view(content, hdrItems, pretty_type=VIEW_CONTENT_PRETTY_TYPE_AUTO): - ctype = None - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO: - pretty_type == None - for i in hdrItems: - if i[0].lower() == "content-type": - ctype = i[1] - break - ct = utils.parse_content_type(ctype) if ctype else None - if ct: - pretty_type = CONTENT_PRETTY_TYPES.get("%s/%s"%(ct[0], ct[1])) - if not pretty_type and utils.isXML(content): - pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML - - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_URLENCODED: - data = utils.urldecode(content) - if data: - return "URLEncoded form", view_flow_urlencoded(data) - - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_XML: - return "Indented XML-ish", view_flow_xmlish(content) - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_JSON: - lines = utils.pretty_json(content) - if lines: - return "JSON", view_flow_json(lines) +def view_urlencoded(hdrs, content): + lines = utils.urldecode(content) + if lines: + body = common.format_keyvals( + [(k+":", v) for (k, v) in lines], + key = "header", + val = "text" + ) + return "URLEncoded form", body - return "Falling back to raw.", view_flow_raw(content) +PRETTY_FUNCTION_MAP = { + VIEW_CONTENT_PRETTY_TYPE_XML: view_xmlish, + VIEW_CONTENT_PRETTY_TYPE_JSON: view_json, + VIEW_CONTENT_PRETTY_TYPE_URLENCODED: view_urlencoded +} -def get_content_view(viewmode, pretty_type, enc, content, hdrItems): +def get_view_func(viewmode, pretty_type, hdrs, content): """ - Returns a (msg, body) tuple. + Returns a function object. """ - msg = "" if viewmode == VIEW_CONTENT_HEX: - body = view_flow_binary(content) - elif viewmode == VIEW_CONTENT_PRETTY: - emsg = "" - if enc: - decoded = encoding.decode(enc, content) - if decoded: - content = decoded - if enc and enc != "identity": - emsg = "[decoded %s]"%enc - msg, body = find_pretty_view(content, hdrItems, pretty_type) - if pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO: - emsg += " (forced to %s)"%(CONTENT_PRETTY_NAMES[pretty_type]) - if emsg: - msg = emsg + " " + msg + return view_hex + elif viewmode == VIEW_CONTENT_RAW: + return view_raw else: - body = view_flow_raw(content) - return msg, body - - + if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO: + ctype = hdrs.get("content-type") + if ctype: + ctype = ctype[0] + ct = utils.parse_content_type(ctype) if ctype else None + if ct: + pretty_type = CONTENT_TYPES_MAP.get("%s/%s"%(ct[0], ct[1])) + if not pretty_type and utils.isXML(content): + pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML + return PRETTY_FUNCTION_MAP.get(pretty_type, view_raw) + + +def get_content_view(viewmode, pretty_type, hdrItems, content): + """ + Returns a (msg, body) tuple. + """ + msg = [] + + hdrs = flow.ODictCaseless([list(i) for i in hdrItems]) + + enc = hdrs.get("content-encoding") + if enc and enc[0] != "identity": + decoded = encoding.decode(enc[0], content) + if decoded: + content = decoded + msg.append("[decoded %s]"%enc[0]) + + if viewmode == VIEW_CONTENT_PRETTY and pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO: + msg.append("[forced to %s]"%(CONTENT_PRETTY_NAMES[pretty_type])) + func = get_view_func(viewmode, pretty_type, hdrs, content) + + ret = func(hdrs, content) + if not ret: + ret = view_raw(hdrs, content) + msg.append(ret[0]) + return " ".join(msg), ret[1] diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 95fc6d67..3a6f02d5 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -116,15 +116,14 @@ class FlowView(common.WWrap): else: self.view_request() - def _cached_conn_text(self, e, content, hdrItems, viewmode, pretty_type): + def _cached_conn_text(self, content, hdrItems, viewmode, pretty_type): txt = common.format_keyvals( [(h+":", v) for (h, v) in hdrItems], key = "header", val = "text" ) - if content: - msg, body = contentview.get_content_view(viewmode, pretty_type, e, content, hdrItems) + msg, body = contentview.get_content_view(viewmode, pretty_type, hdrItems, content) title = urwid.AttrWrap(urwid.Columns([ urwid.Text( [ @@ -180,11 +179,8 @@ class FlowView(common.WWrap): return f def _conn_text(self, conn, viewmode, pretty_type): - e = conn.headers["content-encoding"] - e = e[0] if e else None return cache.callback( self, "_cached_conn_text", - e, conn.content, tuple(tuple(i) for i in conn.headers.lst), viewmode, diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 5ff12b5b..bb079a8d 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -181,6 +181,12 @@ class ODict: def add(self, key, value): self.lst.append([key, str(value)]) + def get(self, k, d=None): + if k in self: + return self[k] + else: + return d + def _get_state(self): return [tuple(i) for i in self.lst] |