diff options
Diffstat (limited to 'libmproxy/contentviews.py')
-rw-r--r-- | libmproxy/contentviews.py | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/libmproxy/contentviews.py b/libmproxy/contentviews.py new file mode 100644 index 00000000..9af08033 --- /dev/null +++ b/libmproxy/contentviews.py @@ -0,0 +1,577 @@ +""" +Mitmproxy Content Views +======================= + +mitmproxy includes a set of content views which can be used to format/decode/highlight data. +While they are currently used for HTTP message bodies only, the may be used in other contexts +in the future, e.g. to decode protobuf messages sent as WebSocket frames. + +Thus, the View API is very minimalistic. The only arguments are `data` and `**metadata`, +where `data` is the actual content (as bytes). The contents on metadata depend on the protocol in +use. For HTTP, the message headers are passed as the ``headers`` keyword argument. + +""" +from __future__ import (absolute_import, print_function, division) +import cStringIO +import json +import logging +import subprocess +import sys + +import lxml.html +import lxml.etree +from PIL import Image +from PIL.ExifTags import TAGS +import html2text +import six + +from netlib.odict import ODict +from netlib import encoding +from netlib.utils import clean_bin, hexdump, urldecode, multipartdecode, parse_content_type + +from . import utils +from .exceptions import ContentViewException +from .contrib import jsbeautifier +from .contrib.wbxml.ASCommandResponse import ASCommandResponse + +try: + import pyamf + from pyamf import remoting, flex +except ImportError: # pragma nocover + pyamf = None + +try: + import cssutils +except ImportError: # pragma nocover + cssutils = None +else: + cssutils.log.setLevel(logging.CRITICAL) + + cssutils.ser.prefs.keepComments = True + cssutils.ser.prefs.omitLastSemicolon = False + cssutils.ser.prefs.indentClosingBrace = False + cssutils.ser.prefs.validOnly = False + +# Default view cutoff *in lines* +VIEW_CUTOFF = 512 + +KEY_MAX = 30 + + +def format_dict(d): + """ + Helper function that transforms the given dictionary into a list of + ("key", key ) + ("value", value) + tuples, where key is padded to a uniform width. + """ + max_key_len = max(len(k) for k in d.keys()) + max_key_len = min(max_key_len, KEY_MAX) + for key, value in d.items(): + key += ":" + key = key.ljust(max_key_len + 2) + yield [ + ("header", key), + ("text", value) + ] + + +def format_text(text): + """ + Helper function that transforms bytes into the view output format. + """ + for line in text.splitlines(): + yield [("text", line)] + + +class View(object): + name = None + prompt = () + content_types = [] + + def __call__(self, data, **metadata): + """ + Transform raw data into human-readable output. + + Args: + data: the data to decode/format as bytes. + metadata: optional keyword-only arguments for metadata. Implementations must not + rely on a given argument being present. + + Returns: + A (description, content generator) tuple. + + The content generator yields lists of (style, text) tuples, where each list represents + a single line. ``text`` is a unfiltered byte string which may need to be escaped, + depending on the used output. + + Caveats: + The content generator must not yield tuples of tuples, + because urwid cannot process that. You have to yield a *list* of tuples per line. + """ + raise NotImplementedError() + + +class ViewAuto(View): + name = "Auto" + prompt = ("auto", "a") + content_types = [] + + def __call__(self, data, **metadata): + headers = metadata.get("headers", {}) + ctype = headers.get("content-type") + if ctype: + ct = parse_content_type(ctype) if ctype else None + ct = "%s/%s" % (ct[0], ct[1]) + if ct in content_types_map: + return content_types_map[ct][0](data, **metadata) + elif utils.isXML(data): + return get("XML")(data, **metadata) + if utils.isMostlyBin(data): + return get("Hex")(data) + return get("Raw")(data) + + +class ViewRaw(View): + name = "Raw" + prompt = ("raw", "r") + content_types = [] + + def __call__(self, data, **metadata): + return "Raw", format_text(data) + + +class ViewHex(View): + name = "Hex" + prompt = ("hex", "e") + content_types = [] + + @staticmethod + def _format(data): + for offset, hexa, s in hexdump(data): + yield [ + ("offset", offset + " "), + ("text", hexa + " "), + ("text", s) + ] + + def __call__(self, data, **metadata): + return "Hex", self._format(data) + + +class ViewXML(View): + name = "XML" + prompt = ("xml", "x") + content_types = ["text/xml"] + + def __call__(self, data, **metadata): + parser = lxml.etree.XMLParser( + remove_blank_text=True, + resolve_entities=False, + strip_cdata=False, + recover=False + ) + try: + document = lxml.etree.fromstring(data, parser) + except lxml.etree.XMLSyntaxError: + return None + docinfo = document.getroottree().docinfo + + prev = [] + p = document.getroottree().getroot().getprevious() + while p is not None: + prev.insert( + 0, + lxml.etree.tostring(p) + ) + p = p.getprevious() + doctype = docinfo.doctype + if prev: + doctype += "\n".join(prev).strip() + doctype = doctype.strip() + + s = lxml.etree.tostring( + document, + pretty_print=True, + xml_declaration=True, + doctype=doctype or None, + encoding=docinfo.encoding + ) + + return "XML-like data", format_text(s) + + +class ViewJSON(View): + name = "JSON" + prompt = ("json", "s") + content_types = ["application/json"] + + def __call__(self, data, **metadata): + pretty_json = utils.pretty_json(data) + if pretty_json: + return "JSON", format_text(pretty_json) + + +class ViewHTML(View): + name = "HTML" + prompt = ("html", "h") + content_types = ["text/html"] + + def __call__(self, data, **metadata): + if utils.isXML(data): + parser = lxml.etree.HTMLParser( + strip_cdata=True, + remove_blank_text=True + ) + d = lxml.html.fromstring(data, parser=parser) + docinfo = d.getroottree().docinfo + s = lxml.etree.tostring( + d, + pretty_print=True, + doctype=docinfo.doctype + ) + return "HTML", format_text(s) + + +class ViewHTMLOutline(View): + name = "HTML Outline" + prompt = ("html outline", "o") + content_types = ["text/html"] + + def __call__(self, data, **metadata): + data = data.decode("utf-8") + h = html2text.HTML2Text(baseurl="") + h.ignore_images = True + h.body_width = 0 + outline = h.handle(data) + return "HTML Outline", format_text(outline) + + +class ViewURLEncoded(View): + name = "URL-encoded" + prompt = ("urlencoded", "u") + content_types = ["application/x-www-form-urlencoded"] + + def __call__(self, data, **metadata): + d = urldecode(data) + return "URLEncoded form", format_dict(ODict(d)) + + +class ViewMultipart(View): + name = "Multipart Form" + prompt = ("multipart", "m") + content_types = ["multipart/form-data"] + + @staticmethod + def _format(v): + yield [("highlight", "Form data:\n")] + for message in format_dict(ODict(v)): + yield message + + def __call__(self, data, **metadata): + headers = metadata.get("headers", {}) + v = multipartdecode(headers, data) + if v: + return "Multipart form", self._format(v) + + +if pyamf: + class DummyObject(dict): + def __init__(self, alias): + dict.__init__(self) + + def __readamf__(self, input): + data = input.readObject() + self["data"] = data + + + def pyamf_class_loader(s): + for i in pyamf.CLASS_LOADERS: + if i != pyamf_class_loader: + v = i(s) + if v: + return v + return DummyObject + + + pyamf.register_class_loader(pyamf_class_loader) + + + class ViewAMF(View): + name = "AMF" + prompt = ("amf", "f") + content_types = ["application/x-amf"] + + def unpack(self, b, seen=set([])): + if hasattr(b, "body"): + return self.unpack(b.body, seen) + if isinstance(b, DummyObject): + if id(b) in seen: + return "<recursion>" + else: + seen.add(id(b)) + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, dict): + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, list): + return [self.unpack(i) for i in b] + elif isinstance(b, flex.ArrayCollection): + return [self.unpack(i, seen) for i in b] + else: + return b + + def _format(self, envelope): + for target, message in iter(envelope): + if isinstance(message, pyamf.remoting.Request): + yield [ + ("header", "Request: "), + ("text", str(target)), + ] + else: + yield [ + ("header", "Response: "), + ("text", "%s, code %s" % (target, message.status)), + ] + + s = json.dumps(self.unpack(message), indent=4) + for msg in format_text(s): + yield msg + + def __call__(self, data, **metadata): + envelope = remoting.decode(data, strict=False) + if envelope: + return "AMF v%s" % envelope.amfVersion, self._format(envelope) + + +class ViewJavaScript(View): + name = "JavaScript" + prompt = ("javascript", "j") + content_types = [ + "application/x-javascript", + "application/javascript", + "text/javascript" + ] + + def __call__(self, data, **metadata): + opts = jsbeautifier.default_options() + opts.indent_size = 2 + res = jsbeautifier.beautify(data, opts) + return "JavaScript", format_text(res) + + +class ViewCSS(View): + name = "CSS" + prompt = ("css", "c") + content_types = [ + "text/css" + ] + + def __call__(self, data, **metadata): + if cssutils: + sheet = cssutils.parseString(data) + beautified = sheet.cssText + else: + beautified = data + + return "CSS", format_text(beautified) + + +class ViewImage(View): + name = "Image" + prompt = ("image", "i") + content_types = [ + "image/png", + "image/jpeg", + "image/gif", + "image/vnd.microsoft.icon", + "image/x-icon", + ] + + def __call__(self, data, **metadata): + try: + img = Image.open(cStringIO.StringIO(data)) + except IOError: + return None + parts = [ + ("Format", str(img.format_description)), + ("Size", "%s x %s px" % img.size), + ("Mode", str(img.mode)), + ] + for i in sorted(img.info.keys()): + if i != "exif": + parts.append( + (str(i), str(img.info[i])) + ) + if hasattr(img, "_getexif"): + ex = img._getexif() + if ex: + for i in sorted(ex.keys()): + tag = TAGS.get(i, i) + parts.append( + (str(tag), str(ex[i])) + ) + fmt = format_dict(ODict(parts)) + return "%s image" % img.format, fmt + + +class ViewProtobuf(View): + """Human friendly view of protocol buffers + The view uses the protoc compiler to decode the binary + """ + + name = "Protocol Buffer" + prompt = ("protobuf", "p") + content_types = [ + "application/x-protobuf", + "application/x-protobuffer", + ] + + @staticmethod + def is_available(): + try: + p = subprocess.Popen( + ["protoc", "--version"], + stdout=subprocess.PIPE + ) + out, _ = p.communicate() + return out.startswith("libprotoc") + except: + return False + + def decode_protobuf(self, content): + # if Popen raises OSError, it will be caught in + # get_content_view and fall back to Raw + p = subprocess.Popen(['protoc', '--decode_raw'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate(input=content) + if out: + return out + else: + return err + + def __call__(self, data, **metadata): + decoded = self.decode_protobuf(data) + return "Protobuf", format_text(decoded) + + +class ViewWBXML(View): + name = "WBXML" + prompt = ("wbxml", "w") + content_types = [ + "application/vnd.wap.wbxml", + "application/vnd.ms-sync.wbxml" + ] + + def __call__(self, data, **metadata): + + try: + parser = ASCommandResponse(data) + parsedContent = parser.xmlString + if parsedContent: + return "WBXML", format_text(parsedContent) + except: + return None + + +views = [ + ViewAuto(), + ViewRaw(), + ViewHex(), + ViewJSON(), + ViewXML(), + ViewWBXML(), + ViewHTML(), + ViewHTMLOutline(), + ViewJavaScript(), + ViewCSS(), + ViewURLEncoded(), + ViewMultipart(), + ViewImage(), +] +if pyamf: + views.append(ViewAMF()) + +if ViewProtobuf.is_available(): + views.append(ViewProtobuf()) + +content_types_map = {} +for i in views: + for ct in i.content_types: + l = content_types_map.setdefault(ct, []) + l.append(i) + +view_prompts = [i.prompt for i in views] + + +def get_by_shortcut(c): + for i in views: + if i.prompt[1] == c: + return i + + +def get(name): + for i in views: + if i.name == name: + return i + + +def safe_to_print(lines, encoding="utf8"): + """ + Wraps a content generator so that each text portion is a *safe to print* unicode string. + """ + for line in lines: + clean_line = [] + for (style, text) in line: + try: + text = clean_bin(text.decode(encoding, "strict")) + except UnicodeDecodeError: + text = clean_bin(text).decode(encoding, "strict") + clean_line.append((style, text)) + yield clean_line + + +def get_content_view(viewmode, data, **metadata): + """ + Args: + viewmode: the view to use. + data, **metadata: arguments passed to View instance. + + Returns: + A (description, content generator) tuple. + In contrast to calling the views directly, text is always safe-to-print unicode. + + Raises: + ContentViewException, if the content view threw an error. + """ + if not data: + return "No content", [] + msg = [] + + headers = metadata.get("headers", {}) + enc = headers.get("content-encoding") + if enc and enc != "identity": + decoded = encoding.decode(enc, data) + if decoded: + data = decoded + msg.append("[decoded %s]" % enc) + try: + ret = viewmode(data, **metadata) + # Third-party viewers can fail in unexpected ways... + except Exception as e: + six.reraise( + ContentViewException, + ContentViewException(str(e)), + sys.exc_info()[2] + ) + if not ret: + ret = get("Raw")(data, **metadata) + msg.append("Couldn't parse: falling back to Raw") + else: + msg.append(ret[0]) + return " ".join(msg), safe_to_print(ret[1]) |