diff options
-rw-r--r-- | mitmproxy/addons/cut.py | 79 | ||||
-rw-r--r-- | mitmproxy/command.py | 10 | ||||
-rw-r--r-- | mitmproxy/tools/console/defaultkeys.py | 2 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_cut.py | 103 | ||||
-rw-r--r-- | test/mitmproxy/test_command.py | 2 |
5 files changed, 78 insertions, 118 deletions
diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index 5ec4c99e..6b9dc723 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -56,49 +56,37 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]: return str(current or "") -def parse_cutspec(s: str) -> typing.Tuple[str, typing.Sequence[str]]: - """ - Returns (flowspec, [cuts]). - - Raises exceptions.CommandError if input is invalid. - """ - parts = s.split("|", maxsplit=1) - flowspec = "@all" - if len(parts) == 2: - flowspec = parts[1].strip() - cuts = parts[0] - cutparts = [i.strip() for i in cuts.split(",") if i.strip()] - if len(cutparts) == 0: - raise exceptions.CommandError("Invalid cut specification.") - return flowspec, cutparts - - class Cut: @command.command("cut") - def cut(self, cutspec: str) -> command.Cuts: + def cut( + self, + flows: typing.Sequence[flow.Flow], + cuts: typing.Sequence[command.Cut] + ) -> command.Cuts: """ - Resolve a cut specification of the form "cuts|flowspec". The cuts - are a comma-separated list of cut snippets. Cut snippets are - attribute paths from the base of the flow object, with a few - conveniences - "q", "s", "cc" and "sc" are shortcuts for request, - response, client_conn and server_conn, "port" and "host" retrieve - parts of an address tuple, ".header[key]" retrieves a header value. - Return values converted sensibly: SSL certicates are converted to PEM + Cut data from a set of flows. Cut specifications are attribute paths + from the base of the flow object, with a few conveniences - "q", + "s", "cc" and "sc" are shortcuts for request, response, client_conn + and server_conn, "port" and "host" retrieve parts of an address + tuple, ".header[key]" retrieves a header value. Return values + converted to strings or bytes: SSL certicates are converted to PEM format, bools are "true" or "false", "bytes" are preserved, and all - other values are converted to strings. The flowspec is optional, and - if it is not specified, it is assumed to be @all. + other values are converted to strings. """ - flowspec, cuts = parse_cutspec(cutspec) - flows = ctx.master.commands.call_args("view.resolve", [flowspec]) ret = [] for f in flows: ret.append([extract(c, f) for c in cuts]) return ret @command.command("cut.save") - def save(self, cuts: command.Cuts, path: command.Path) -> None: + def save( + self, + flows: typing.Sequence[flow.Flow], + cuts: typing.Sequence[command.Cut], + path: command.Path + ) -> None: """ - Save cuts to file. If there are multiple rows or columns, the format + Save cuts to file. If there are multiple flows or cuts, the format is UTF-8 encoded CSV. If there is exactly one row and one column, the data is written to file as-is, with raw bytes preserved. If the path is prefixed with a "+", values are appended if there is an @@ -108,30 +96,37 @@ class Cut: if path.startswith("+"): append = True path = command.Path(path[1:]) - if len(cuts) == 1 and len(cuts[0]) == 1: + if len(cuts) == 1 and len(flows) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: # We're appending to a file that already exists and has content fp.write(b"\n") - v = cuts[0][0] - if isinstance(v, bytes): - fp.write(v) - else: - fp.write(v.encode("utf8")) + for v in [extract(cuts[0], f) for f in flows]: + if isinstance(v, bytes): + fp.write(v) + else: + fp.write(v.encode("utf8")) ctx.log.alert("Saved single cut.") else: with open(path, "a" if append else "w", newline='', encoding="utf8") as fp: writer = csv.writer(fp) - for r in cuts: + for f in flows: + vals = [extract(c, f) for c in cuts] writer.writerow( - [strutils.always_str(c) or "" for c in r] # type: ignore + [strutils.always_str(x) or "" for x in vals] # type: ignore ) - ctx.log.alert("Saved %s cuts as CSV." % len(cuts)) + ctx.log.alert("Saved %s cuts over %d flows as CSV." % (len(cuts), len(flows))) @command.command("cut.clip") - def clip(self, cuts: command.Cuts) -> None: + def clip( + self, + flows: typing.Sequence[flow.Flow], + cuts: typing.Sequence[command.Cut], + ) -> None: """ - Send cuts to the system clipboard. + Send cuts to the clipboard. If there are multiple flows or cuts, the + format is UTF-8 encoded CSV. If there is exactly one row and one + column, the data is written to file as-is, with raw bytes preserved. """ fp = io.StringIO(newline="") if len(cuts) == 1 and len(cuts[0]) == 1: diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 05caf261..82bad4fa 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -29,6 +29,10 @@ Cuts = typing.Sequence[ ] +class Cut(str): + pass + + class Path(str): pass @@ -52,8 +56,10 @@ def typename(t: type, ret: bool) -> str: return "[flow]" if ret else "flowspec" elif t == typing.Sequence[str]: return "[str]" + elif t == typing.Sequence[Cut]: + return "[cut]" elif t == Cuts: - return "[cuts]" if ret else "cutspec" + return "[cuts]" elif t == flow.Flow: return "flow" elif issubclass(t, (str, int, bool)): @@ -264,7 +270,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: "Command requires one flow, specification matched %s." % len(flows) ) return flows[0] - elif argtype == typing.Sequence[str]: + elif argtype in (typing.Sequence[str], typing.Sequence[Cut]): return [i.strip() for i in spec.split(",")] else: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index c4a44aca..b845a3ae 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -31,7 +31,7 @@ def map(km): km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows") km.add("a", "flow.resume @focus", ["flowlist", "flowview"], "Resume this intercepted flow") km.add( - "b", "console.command cut.save s.content|@focus ''", + "b", "console.command cut.save @focus s.content ", ["flowlist", "flowview"], "Save response body to file" ) diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py index 242c6c2f..bb3e1c2d 100644 --- a/test/mitmproxy/addons/test_cut.py +++ b/test/mitmproxy/addons/test_cut.py @@ -56,42 +56,6 @@ def test_extract(): assert "CERTIFICATE" in cut.extract("sc.cert", tf) -def test_parse_cutspec(): - tests = [ - ("", None, True), - ("req.method", ("@all", ["req.method"]), False), - ( - "req.method,req.host", - ("@all", ["req.method", "req.host"]), - False - ), - ( - "req.method,req.host|~b foo", - ("~b foo", ["req.method", "req.host"]), - False - ), - ( - "req.method,req.host|~b foo | ~b bar", - ("~b foo | ~b bar", ["req.method", "req.host"]), - False - ), - ( - "req.method, req.host | ~b foo | ~b bar", - ("~b foo | ~b bar", ["req.method", "req.host"]), - False - ), - ] - for cutspec, output, err in tests: - try: - assert cut.parse_cutspec(cutspec) == output - except exceptions.CommandError: - if not err: - raise - else: - if err: - raise AssertionError("Expected error.") - - def test_headername(): with pytest.raises(exceptions.CommandError): cut.headername("header[foo.") @@ -110,69 +74,64 @@ def test_cut_clip(): v.add([tflow.tflow(resp=True)]) with mock.patch('pyperclip.copy') as pc: - tctx.command(c.clip, "q.method|@all") + tctx.command(c.clip, "@all", "q.method") assert pc.called with mock.patch('pyperclip.copy') as pc: - tctx.command(c.clip, "q.content|@all") + tctx.command(c.clip, "@all", "q.content") assert pc.called with mock.patch('pyperclip.copy') as pc: - tctx.command(c.clip, "q.method,q.content|@all") + tctx.command(c.clip, "@all", "q.method,q.content") assert pc.called -def test_cut_file(tmpdir): +def test_cut_save(tmpdir): f = str(tmpdir.join("path")) v = view.View() c = cut.Cut() with taddons.context() as tctx: tctx.master.addons.add(v, c) - v.add([tflow.tflow(resp=True)]) - tctx.command(c.save, "q.method|@all", f) + tctx.command(c.save, "@all", "q.method", f) assert qr(f) == b"GET" - tctx.command(c.save, "q.content|@all", f) + tctx.command(c.save, "@all", "q.content", f) assert qr(f) == b"content" - tctx.command(c.save, "q.content|@all", "+" + f) + tctx.command(c.save, "@all", "q.content", "+" + f) assert qr(f) == b"content\ncontent" v.add([tflow.tflow(resp=True)]) - tctx.command(c.save, "q.method|@all", f) + tctx.command(c.save, "@all", "q.method", f) assert qr(f).splitlines() == [b"GET", b"GET"] - tctx.command(c.save, "q.method,q.content|@all", f) + tctx.command(c.save, "@all", "q.method,q.content", f) assert qr(f).splitlines() == [b"GET,content", b"GET,content"] def test_cut(): - v = view.View() c = cut.Cut() - with taddons.context() as tctx: - v.add([tflow.tflow(resp=True)]) - tctx.master.addons.add(v, c) - assert c.cut("q.method|@all") == [["GET"]] - assert c.cut("q.scheme|@all") == [["http"]] - assert c.cut("q.host|@all") == [["address"]] - assert c.cut("q.port|@all") == [["22"]] - assert c.cut("q.path|@all") == [["/path"]] - assert c.cut("q.url|@all") == [["http://address:22/path"]] - assert c.cut("q.content|@all") == [[b"content"]] - assert c.cut("q.header[header]|@all") == [["qvalue"]] - assert c.cut("q.header[unknown]|@all") == [[""]] - - assert c.cut("s.status_code|@all") == [["200"]] - assert c.cut("s.reason|@all") == [["OK"]] - assert c.cut("s.content|@all") == [[b"message"]] - assert c.cut("s.header[header-response]|@all") == [["svalue"]] - assert c.cut("moo") == [[""]] + with taddons.context(): + tflows = [tflow.tflow(resp=True)] + assert c.cut(tflows, ["q.method"]) == [["GET"]] + assert c.cut(tflows, ["q.scheme"]) == [["http"]] + assert c.cut(tflows, ["q.host"]) == [["address"]] + assert c.cut(tflows, ["q.port"]) == [["22"]] + assert c.cut(tflows, ["q.path"]) == [["/path"]] + assert c.cut(tflows, ["q.url"]) == [["http://address:22/path"]] + assert c.cut(tflows, ["q.content"]) == [[b"content"]] + assert c.cut(tflows, ["q.header[header]"]) == [["qvalue"]] + assert c.cut(tflows, ["q.header[unknown]"]) == [[""]] + + assert c.cut(tflows, ["s.status_code"]) == [["200"]] + assert c.cut(tflows, ["s.reason"]) == [["OK"]] + assert c.cut(tflows, ["s.content"]) == [[b"message"]] + assert c.cut(tflows, ["s.header[header-response]"]) == [["svalue"]] + assert c.cut(tflows, ["moo"]) == [[""]] with pytest.raises(exceptions.CommandError): - assert c.cut("__dict__") == [[""]] + assert c.cut(tflows, ["__dict__"]) == [[""]] - v = view.View() c = cut.Cut() - with taddons.context() as tctx: - tctx.master.addons.add(v, c) - v.add([tflow.ttcpflow()]) - assert c.cut("q.method|@all") == [[""]] - assert c.cut("s.status|@all") == [[""]] + with taddons.context(): + tflows = [tflow.ttcpflow()] + assert c.cut(tflows, ["q.method"]) == [[""]] + assert c.cut(tflows, ["s.status"]) == [[""]] diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 298b34fb..066cbf15 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -155,8 +155,8 @@ def test_typename(): assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]" assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec" - assert command.typename(command.Cuts, False) == "cutspec" assert command.typename(command.Cuts, True) == "[cuts]" + assert command.typename(typing.Sequence[command.Cut], False) == "[cut]" assert command.typename(flow.Flow, False) == "flow" assert command.typename(typing.Sequence[str], False) == "[str]" |