aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/scripting/overview.rst6
-rw-r--r--examples/simple/add_header.py5
-rw-r--r--examples/simple/add_header_class.py5
-rw-r--r--examples/simple/custom_contentview.py3
-rw-r--r--examples/simple/filter_flows.py6
-rw-r--r--examples/simple/io_read_dumpfile.py1
-rw-r--r--examples/simple/io_write_dumpfile.py8
-rw-r--r--examples/simple/modify_body_inject_iframe.py4
-rw-r--r--examples/simple/modify_form.py5
-rw-r--r--examples/simple/modify_querystring.py5
-rw-r--r--examples/simple/redirect_requests.py3
-rw-r--r--examples/simple/send_reply_from_proxy.py2
-rw-r--r--examples/simple/upsidedownternet.py4
-rw-r--r--examples/simple/wsgi_flask_app.py2
-rw-r--r--mitmproxy/addons/__init__.py4
-rw-r--r--mitmproxy/addons/clientplayback.py47
-rw-r--r--mitmproxy/addons/core.py63
-rw-r--r--mitmproxy/addons/cut.py151
-rw-r--r--mitmproxy/addons/export.py75
-rw-r--r--mitmproxy/addons/script.py33
-rw-r--r--mitmproxy/addons/serverplayback.py46
-rw-r--r--mitmproxy/addons/view.py211
-rw-r--r--mitmproxy/command.py81
-rw-r--r--mitmproxy/eventsequence.py1
-rw-r--r--mitmproxy/export.py183
-rw-r--r--mitmproxy/master.py20
-rw-r--r--mitmproxy/test/tflow.py4
-rw-r--r--mitmproxy/tools/console/commandeditor.py15
-rw-r--r--mitmproxy/tools/console/common.py28
-rw-r--r--mitmproxy/tools/console/flowlist.py176
-rw-r--r--mitmproxy/tools/console/flowview.py47
-rw-r--r--mitmproxy/tools/console/master.py127
-rw-r--r--mitmproxy/tools/main.py6
-rw-r--r--mitmproxy/tools/web/app.py20
-rw-r--r--mitmproxy/types/multidict.py16
-rw-r--r--mitmproxy/utils/strutils.py7
-rw-r--r--mitmproxy/utils/typecheck.py14
-rw-r--r--release/README.md8
-rw-r--r--setup.cfg3
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py19
-rw-r--r--test/mitmproxy/addons/test_core.py46
-rw-r--r--test/mitmproxy/addons/test_cut.py178
-rw-r--r--test/mitmproxy/addons/test_export.py109
-rw-r--r--test/mitmproxy/addons/test_script.py70
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py12
-rw-r--r--test/mitmproxy/addons/test_view.py150
-rw-r--r--test/mitmproxy/console/test_flowlist.py8
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_get.py35
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_patch.py37
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_post.py26
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_get.py20
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_patch.py22
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_post.py11
-rw-r--r--test/mitmproxy/data/test_flow_export/python_get.py9
-rw-r--r--test/mitmproxy/data/test_flow_export/python_patch.py10
-rw-r--r--test/mitmproxy/data/test_flow_export/python_post.py17
-rw-r--r--test/mitmproxy/data/test_flow_export/python_post_json.py9
-rw-r--r--test/mitmproxy/test_command.py79
-rw-r--r--test/mitmproxy/test_connections.py2
-rw-r--r--test/mitmproxy/test_export.py133
-rw-r--r--test/mitmproxy/test_flow.py4
-rw-r--r--test/mitmproxy/tools/web/test_app.py10
-rw-r--r--test/mitmproxy/utils/test_typecheck.py26
63 files changed, 1395 insertions, 1092 deletions
diff --git a/docs/scripting/overview.rst b/docs/scripting/overview.rst
index c333a98b..5ceb5da3 100644
--- a/docs/scripting/overview.rst
+++ b/docs/scripting/overview.rst
@@ -29,6 +29,12 @@ will be added to all responses passing through the proxy:
>>> mitmdump -s add_header.py
+Examples
+--------
+
+A collection of addons that demonstrate popular features can be found at :src:`examples/simple`.
+
+
Using classes
-------------
diff --git a/examples/simple/add_header.py b/examples/simple/add_header.py
index 3e0b5f1e..64fc6267 100644
--- a/examples/simple/add_header.py
+++ b/examples/simple/add_header.py
@@ -1,2 +1,5 @@
-def response(flow):
+from mitmproxy import http
+
+
+def response(flow: http.HTTPFlow) -> None:
flow.response.headers["newheader"] = "foo"
diff --git a/examples/simple/add_header_class.py b/examples/simple/add_header_class.py
index 5d5c7902..419c99ac 100644
--- a/examples/simple/add_header_class.py
+++ b/examples/simple/add_header_class.py
@@ -1,5 +1,8 @@
+from mitmproxy import http
+
+
class AddHeader:
- def response(self, flow):
+ def response(self, flow: http.HTTPFlow) -> None:
flow.response.headers["newheader"] = "foo"
diff --git a/examples/simple/custom_contentview.py b/examples/simple/custom_contentview.py
index 34fa5541..289e1efe 100644
--- a/examples/simple/custom_contentview.py
+++ b/examples/simple/custom_contentview.py
@@ -3,6 +3,7 @@ This example shows how one can add a custom contentview to mitmproxy.
The content view API is explained in the mitmproxy.contentviews module.
"""
from mitmproxy import contentviews
+from typing import Tuple, Iterable, AnyStr, List
class ViewSwapCase(contentviews.View):
@@ -13,7 +14,7 @@ class ViewSwapCase(contentviews.View):
prompt = ("swap case text", "z")
content_types = ["text/plain"]
- def __call__(self, data: bytes, **metadata):
+ def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterable[List[Tuple[str, AnyStr]]]]:
return "case-swapped text", contentviews.format_text(data.swapcase())
diff --git a/examples/simple/filter_flows.py b/examples/simple/filter_flows.py
index fd49425a..70979591 100644
--- a/examples/simple/filter_flows.py
+++ b/examples/simple/filter_flows.py
@@ -2,12 +2,12 @@
This scripts demonstrates how to use mitmproxy's filter pattern in scripts.
"""
from mitmproxy import flowfilter
-from mitmproxy import ctx
+from mitmproxy import ctx, http
class Filter:
def __init__(self):
- self.filter = None
+ self.filter = None # type: flowfilter.TFilter
def configure(self, updated):
self.filter = flowfilter.parse(ctx.options.flowfilter)
@@ -17,7 +17,7 @@ class Filter:
"flowfilter", str, "", "Check that flow matches filter."
)
- def response(self, flow):
+ def response(self, flow: http.HTTPFlow) -> None:
if flowfilter.match(self.filter, flow):
print("Flow matches filter:")
print(flow)
diff --git a/examples/simple/io_read_dumpfile.py b/examples/simple/io_read_dumpfile.py
index edbbe2dd..87d37c0f 100644
--- a/examples/simple/io_read_dumpfile.py
+++ b/examples/simple/io_read_dumpfile.py
@@ -8,6 +8,7 @@ from mitmproxy.exceptions import FlowReadException
import pprint
import sys
+
with open(sys.argv[1], "rb") as logfile:
freader = io.FlowReader(logfile)
pp = pprint.PrettyPrinter(indent=4)
diff --git a/examples/simple/io_write_dumpfile.py b/examples/simple/io_write_dumpfile.py
index a0956e33..7c4c6a7a 100644
--- a/examples/simple/io_write_dumpfile.py
+++ b/examples/simple/io_write_dumpfile.py
@@ -7,18 +7,18 @@ to multiple files in parallel.
"""
import random
import sys
-from mitmproxy import io
+from mitmproxy import io, http
class Writer:
- def __init__(self, path):
+ def __init__(self, path: str) -> None:
if path == "-":
- f = sys.stdout
+ f = sys.stdout # type: io.TextIO
else:
f = open(path, "wb")
self.w = io.FlowWriter(f)
- def response(self, flow):
+ def response(self, flow: http.HTTPFlow) -> None:
if random.choice([True, False]):
self.w.add(flow)
diff --git a/examples/simple/modify_body_inject_iframe.py b/examples/simple/modify_body_inject_iframe.py
index dff72afa..595bd9f2 100644
--- a/examples/simple/modify_body_inject_iframe.py
+++ b/examples/simple/modify_body_inject_iframe.py
@@ -1,6 +1,6 @@
# (this script works best with --anticache)
from bs4 import BeautifulSoup
-from mitmproxy import ctx
+from mitmproxy import ctx, http
class Injector:
@@ -9,7 +9,7 @@ class Injector:
"iframe", str, "", "IFrame to inject"
)
- def response(self, flow):
+ def response(self, flow: http.HTTPFlow) -> None:
if ctx.options.iframe:
html = BeautifulSoup(flow.response.content, "html.parser")
if html.body:
diff --git a/examples/simple/modify_form.py b/examples/simple/modify_form.py
index b425efb0..8742a976 100644
--- a/examples/simple/modify_form.py
+++ b/examples/simple/modify_form.py
@@ -1,4 +1,7 @@
-def request(flow):
+from mitmproxy import http
+
+
+def request(flow: http.HTTPFlow) -> None:
if flow.request.urlencoded_form:
# If there's already a form, one can just add items to the dict:
flow.request.urlencoded_form["mitmproxy"] = "rocks"
diff --git a/examples/simple/modify_querystring.py b/examples/simple/modify_querystring.py
index ee8a89ad..12b16fda 100644
--- a/examples/simple/modify_querystring.py
+++ b/examples/simple/modify_querystring.py
@@ -1,2 +1,5 @@
-def request(flow):
+from mitmproxy import http
+
+
+def request(flow: http.HTTPFlow) -> None:
flow.request.query["mitmproxy"] = "rocks"
diff --git a/examples/simple/redirect_requests.py b/examples/simple/redirect_requests.py
index 51876df7..ddb89961 100644
--- a/examples/simple/redirect_requests.py
+++ b/examples/simple/redirect_requests.py
@@ -1,9 +1,10 @@
"""
This example shows two ways to redirect flows to another server.
"""
+from mitmproxy import http
-def request(flow):
+def request(flow: http.HTTPFlow) -> None:
# pretty_host takes the "Host" header of the request into account,
# which is useful in transparent mode where we usually only have the IP
# otherwise.
diff --git a/examples/simple/send_reply_from_proxy.py b/examples/simple/send_reply_from_proxy.py
index bef2e7e7..5011fd2e 100644
--- a/examples/simple/send_reply_from_proxy.py
+++ b/examples/simple/send_reply_from_proxy.py
@@ -5,7 +5,7 @@ without sending any data to the remote server.
from mitmproxy import http
-def request(flow):
+def request(flow: http.HTTPFlow) -> None:
# pretty_url takes the "Host" header of the request into account, which
# is useful in transparent mode where we usually only have the IP otherwise.
diff --git a/examples/simple/upsidedownternet.py b/examples/simple/upsidedownternet.py
index 8ba450ab..f150a5c3 100644
--- a/examples/simple/upsidedownternet.py
+++ b/examples/simple/upsidedownternet.py
@@ -2,11 +2,11 @@
This script rotates all images passing through the proxy by 180 degrees.
"""
import io
-
from PIL import Image
+from mitmproxy import http
-def response(flow):
+def response(flow: http.HTTPFlow) -> None:
if flow.response.headers.get("content-type", "").startswith("image"):
s = io.BytesIO(flow.response.content)
img = Image.open(s).rotate(180)
diff --git a/examples/simple/wsgi_flask_app.py b/examples/simple/wsgi_flask_app.py
index a03ad4c5..4be38000 100644
--- a/examples/simple/wsgi_flask_app.py
+++ b/examples/simple/wsgi_flask_app.py
@@ -10,7 +10,7 @@ app = Flask("proxapp")
@app.route('/')
-def hello_world():
+def hello_world() -> str:
return 'Hello World!'
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py
index fa194fc1..783a2c94 100644
--- a/mitmproxy/addons/__init__.py
+++ b/mitmproxy/addons/__init__.py
@@ -5,7 +5,9 @@ from mitmproxy.addons import check_ca
from mitmproxy.addons import clientplayback
from mitmproxy.addons import core_option_validation
from mitmproxy.addons import core
+from mitmproxy.addons import cut
from mitmproxy.addons import disable_h2c
+from mitmproxy.addons import export
from mitmproxy.addons import onboarding
from mitmproxy.addons import proxyauth
from mitmproxy.addons import replace
@@ -28,7 +30,9 @@ def default_addons():
check_alpn.CheckALPN(),
check_ca.CheckCA(),
clientplayback.ClientPlayback(),
+ cut.Cut(),
disable_h2c.DisableH2C(),
+ export.Export(),
onboarding.Onboarding(),
proxyauth.ProxyAuth(),
replace.Replace(),
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py
index acb77bb2..0db6d336 100644
--- a/mitmproxy/addons/clientplayback.py
+++ b/mitmproxy/addons/clientplayback.py
@@ -2,6 +2,7 @@ from mitmproxy import exceptions
from mitmproxy import ctx
from mitmproxy import io
from mitmproxy import flow
+from mitmproxy import command
import typing
@@ -11,32 +12,54 @@ class ClientPlayback:
self.flows = None
self.current_thread = None
self.has_replayed = False
+ self.configured = False
def count(self) -> int:
if self.flows:
return len(self.flows)
return 0
- def load(self, flows: typing.Sequence[flow.Flow]):
+ @command.command("replay.client.stop")
+ def stop_replay(self) -> None:
+ """
+ Stop client replay.
+ """
+ self.flows = []
+ ctx.master.addons.trigger("update", [])
+
+ @command.command("replay.client")
+ def start_replay(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Replay requests from flows.
+ """
+ self.flows = flows
+ ctx.master.addons.trigger("update", [])
+
+ @command.command("replay.client.file")
+ def load_file(self, path: str) -> None:
+ try:
+ flows = io.read_flows_from_paths([path])
+ except exceptions.FlowReadException as e:
+ raise exceptions.CommandError(str(e))
self.flows = flows
def configure(self, updated):
- if "client_replay" in updated:
- if ctx.options.client_replay:
- ctx.log.info("Client Replay: {}".format(ctx.options.client_replay))
- try:
- flows = io.read_flows_from_paths(ctx.options.client_replay)
- except exceptions.FlowReadException as e:
- raise exceptions.OptionsError(str(e))
- self.load(flows)
- else:
- self.flows = None
+ if not self.configured and ctx.options.client_replay:
+ self.configured = True
+ ctx.log.info("Client Replay: {}".format(ctx.options.client_replay))
+ try:
+ flows = io.read_flows_from_paths(ctx.options.client_replay)
+ except exceptions.FlowReadException as e:
+ raise exceptions.OptionsError(str(e))
+ self.start_replay(flows)
def tick(self):
if self.current_thread and not self.current_thread.is_alive():
self.current_thread = None
if self.flows and not self.current_thread:
- self.current_thread = ctx.master.replay_request(self.flows.pop(0))
+ f = self.flows.pop(0)
+ self.current_thread = ctx.master.replay_request(f)
+ ctx.master.addons.trigger("update", [f])
self.has_replayed = True
if self.has_replayed:
if not self.flows and not self.current_thread:
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 3f9cb15e..b482edbb 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -1,6 +1,9 @@
+import typing
+
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import command
+from mitmproxy import flow
class Core:
@@ -16,3 +19,63 @@ class Core:
ctx.options.set(spec)
except exceptions.OptionsError as e:
raise exceptions.CommandError(e) from e
+
+ @command.command("flow.resume")
+ def resume(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Resume flows if they are intercepted.
+ """
+ intercepted = [i for i in flows if i.intercepted]
+ for f in intercepted:
+ f.resume()
+ ctx.master.addons.trigger("update", intercepted)
+
+ # FIXME: this will become view.mark later
+ @command.command("flow.mark")
+ def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None:
+ """
+ Mark flows.
+ """
+ updated = []
+ for i in flows:
+ if i.marked != val:
+ i.marked = val
+ updated.append(i)
+ ctx.master.addons.trigger("update", updated)
+
+ # FIXME: this will become view.mark.toggle later
+ @command.command("flow.mark.toggle")
+ def mark_toggle(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Toggle mark for flows.
+ """
+ for i in flows:
+ i.marked = not i.marked
+ ctx.master.addons.trigger("update", flows)
+
+ @command.command("flow.kill")
+ def kill(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Kill running flows.
+ """
+ updated = []
+ for f in flows:
+ if f.killable:
+ f.kill()
+ updated.append(f)
+ ctx.log.alert("Killed %s flows." % len(updated))
+ ctx.master.addons.trigger("update", updated)
+
+ # FIXME: this will become view.revert later
+ @command.command("flow.revert")
+ def revert(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Revert flow changes.
+ """
+ updated = []
+ for f in flows:
+ if f.modified():
+ f.revert()
+ updated.append(f)
+ ctx.log.alert("Reverted %s flows." % len(updated))
+ ctx.master.addons.trigger("update", updated)
diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py
new file mode 100644
index 00000000..a4a2107b
--- /dev/null
+++ b/mitmproxy/addons/cut.py
@@ -0,0 +1,151 @@
+import io
+import csv
+import typing
+from mitmproxy import command
+from mitmproxy import exceptions
+from mitmproxy import flow
+from mitmproxy import ctx
+from mitmproxy import certs
+from mitmproxy.utils import strutils
+
+import pyperclip
+
+
+def headername(spec: str):
+ if not (spec.startswith("header[") and spec.endswith("]")):
+ raise exceptions.CommandError("Invalid header spec: %s" % spec)
+ return spec[len("header["):-1].strip()
+
+
+flow_shortcuts = {
+ "q": "request",
+ "s": "response",
+ "cc": "client_conn",
+ "sc": "server_conn",
+}
+
+
+def is_addr(v):
+ return isinstance(v, tuple) and len(v) > 1
+
+
+def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
+ path = cut.split(".")
+ current = f # type: typing.Any
+ for i, spec in enumerate(path):
+ if spec.startswith("_"):
+ raise exceptions.CommandError("Can't access internal attribute %s" % spec)
+ if isinstance(current, flow.Flow):
+ spec = flow_shortcuts.get(spec, spec)
+
+ part = getattr(current, spec, None)
+ if i == len(path) - 1:
+ if spec == "port" and is_addr(current):
+ return str(current[1])
+ if spec == "host" and is_addr(current):
+ return str(current[0])
+ elif spec.startswith("header["):
+ return current.headers.get(headername(spec), "")
+ elif isinstance(part, bytes):
+ return part
+ elif isinstance(part, bool):
+ return "true" if part else "false"
+ elif isinstance(part, certs.SSLCert):
+ return part.to_pem().decode("ascii")
+ current = part
+ return str(current or "")
+
+
+def parse_cutspec(s: str) -> typing.Tuple[str, typing.Sequence[str]]:
+ """
+ Returns (flowspec, [cuts]).
+
+ Raises exceptions.CommandError if input is invalid.
+ """
+ parts = s.split("|", maxsplit=1)
+ flowspec = "@all"
+ if len(parts) == 2:
+ flowspec = parts[1].strip()
+ cuts = parts[0]
+ cutparts = [i.strip() for i in cuts.split(",") if i.strip()]
+ if len(cutparts) == 0:
+ raise exceptions.CommandError("Invalid cut specification.")
+ return flowspec, cutparts
+
+
+class Cut:
+ @command.command("cut")
+ def cut(self, cutspec: str) -> command.Cuts:
+ """
+ Resolve a cut specification of the form "cuts|flowspec". The cuts
+ are a comma-separated list of cut snippets. Cut snippets are
+ attribute paths from the base of the flow object, with a few
+ conveniences - "q", "s", "cc" and "sc" are shortcuts for request,
+ response, client_conn and server_conn, "port" and "host" retrieve
+ parts of an address tuple, ".header[key]" retrieves a header value.
+ Return values converted sensibly: SSL certicates are converted to PEM
+ format, bools are "true" or "false", "bytes" are preserved, and all
+ other values are converted to strings. The flowspec is optional, and
+ if it is not specified, it is assumed to be @all.
+ """
+ flowspec, cuts = parse_cutspec(cutspec)
+ flows = ctx.master.commands.call_args("view.resolve", [flowspec])
+ ret = []
+ for f in flows:
+ ret.append([extract(c, f) for c in cuts])
+ return ret
+
+ @command.command("cut.save")
+ def save(self, cuts: command.Cuts, path: str) -> None:
+ """
+ Save cuts to file. If there are multiple rows or columns, the format
+ is UTF-8 encoded CSV. If there is exactly one row and one column,
+ the data is written to file as-is, with raw bytes preserved. If the
+ path is prefixed with a "+", values are appended if there is an
+ existing file.
+ """
+ append = False
+ if path.startswith("+"):
+ append = True
+ path = path[1:]
+ if len(cuts) == 1 and len(cuts[0]) == 1:
+ with open(path, "ab" if append else "wb") as fp:
+ if fp.tell() > 0:
+ # We're appending to a file that already exists and has content
+ fp.write(b"\n")
+ v = cuts[0][0]
+ if isinstance(v, bytes):
+ fp.write(v)
+ else:
+ fp.write(v.encode("utf8"))
+ ctx.log.alert("Saved single cut.")
+ else:
+ with open(path, "a" if append else "w", newline='', encoding="utf8") as fp:
+ writer = csv.writer(fp)
+ for r in cuts:
+ writer.writerow(
+ [strutils.always_str(c) or "" for c in r] # type: ignore
+ )
+ ctx.log.alert("Saved %s cuts as CSV." % len(cuts))
+
+ @command.command("cut.clip")
+ def clip(self, cuts: command.Cuts) -> None:
+ """
+ Send cuts to the system clipboard.
+ """
+ fp = io.StringIO(newline="")
+ if len(cuts) == 1 and len(cuts[0]) == 1:
+ v = cuts[0][0]
+ if isinstance(v, bytes):
+ fp.write(strutils.always_str(v))
+ else:
+ fp.write("utf8")
+ ctx.log.alert("Clipped single cut.")
+ else:
+ writer = csv.writer(fp)
+ for r in cuts:
+ writer.writerow(
+ [strutils.always_str(c) or "" for c in r] # type: ignore
+ )
+ ctx.log.alert("Clipped %s cuts as CSV." % len(cuts))
+ pyperclip.copy(fp.getvalue())
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
new file mode 100644
index 00000000..fd0c830e
--- /dev/null
+++ b/mitmproxy/addons/export.py
@@ -0,0 +1,75 @@
+import typing
+
+from mitmproxy import command
+from mitmproxy import flow
+from mitmproxy import exceptions
+from mitmproxy.utils import strutils
+from mitmproxy.net.http.http1 import assemble
+
+import pyperclip
+
+
+def curl_command(f: flow.Flow) -> str:
+ if not hasattr(f, "request"):
+ raise exceptions.CommandError("Can't export flow with no request.")
+ data = "curl "
+ request = f.request.copy() # type: ignore
+ request.decode(strict=False)
+ for k, v in request.headers.items(multi=True):
+ data += "-H '%s:%s' " % (k, v)
+ if request.method != "GET":
+ data += "-X %s " % request.method
+ data += "'%s'" % request.url
+ if request.content:
+ data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
+ request.content,
+ escape_single_quotes=True
+ )
+ return data
+
+
+def raw(f: flow.Flow) -> bytes:
+ if not hasattr(f, "request"):
+ raise exceptions.CommandError("Can't export flow with no request.")
+ return assemble.assemble_request(f.request) # type: ignore
+
+
+formats = dict(
+ curl = curl_command,
+ raw = raw,
+)
+
+
+class Export():
+ @command.command("export.formats")
+ def formats(self) -> typing.Sequence[str]:
+ """
+ Return a list of the supported export formats.
+ """
+ return list(sorted(formats.keys()))
+
+ @command.command("export.file")
+ def file(self, fmt: str, f: flow.Flow, path: str) -> None:
+ """
+ Export a flow to path.
+ """
+ if fmt not in formats:
+ raise exceptions.CommandError("No such export format: %s" % fmt)
+ func = formats[fmt] # type: typing.Any
+ v = func(f)
+ with open(path, "wb") as fp:
+ if isinstance(v, bytes):
+ fp.write(v)
+ else:
+ fp.write(v.encode("utf-8"))
+
+ @command.command("export.clip")
+ def clip(self, fmt: str, f: flow.Flow) -> None:
+ """
+ Export a flow to the system clipboard.
+ """
+ if fmt not in formats:
+ raise exceptions.CommandError("No such export format: %s" % fmt)
+ func = formats[fmt] # type: typing.Any
+ v = strutils.always_str(func(f))
+ pyperclip.copy(v)
diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py
index 99a8f6a4..e90dd885 100644
--- a/mitmproxy/addons/script.py
+++ b/mitmproxy/addons/script.py
@@ -2,9 +2,13 @@ import os
import importlib
import time
import sys
+import typing
from mitmproxy import addonmanager
from mitmproxy import exceptions
+from mitmproxy import flow
+from mitmproxy import command
+from mitmproxy import eventsequence
from mitmproxy import ctx
@@ -34,10 +38,13 @@ class Script:
def __init__(self, path):
self.name = "scriptmanager:" + path
self.path = path
+ self.fullpath = os.path.expanduser(path)
self.ns = None
self.last_load = 0
self.last_mtime = 0
+ if not os.path.isfile(self.fullpath):
+ raise exceptions.OptionsError("No such script: %s" % path)
@property
def addons(self):
@@ -45,12 +52,12 @@ class Script:
def tick(self):
if time.time() - self.last_load > self.ReloadInterval:
- mtime = os.stat(self.path).st_mtime
+ mtime = os.stat(self.fullpath).st_mtime
if mtime > self.last_mtime:
ctx.log.info("Loading script: %s" % self.path)
if self.ns:
ctx.master.addons.remove(self.ns)
- self.ns = load_script(ctx, self.path)
+ self.ns = load_script(ctx, self.fullpath)
if self.ns:
# We're already running, so we have to explicitly register and
# configure the addon
@@ -76,9 +83,25 @@ class ScriptLoader:
def running(self):
self.is_running = True
- def run_once(self, command, flows):
- # Returning once we have proper commands
- raise NotImplementedError
+ @command.command("script.run")
+ def script_run(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
+ """
+ Run a script on the specified flows. The script is loaded with
+ default options, and all lifecycle events for each flow are
+ simulated.
+ """
+ try:
+ s = Script(path)
+ l = addonmanager.Loader(ctx.master)
+ ctx.master.addons.invoke_addon(s, "load", l)
+ ctx.master.addons.invoke_addon(s, "configure", ctx.options.keys())
+ # Script is loaded on the first tick
+ ctx.master.addons.invoke_addon(s, "tick")
+ for f in flows:
+ for evt, arg in eventsequence.iterate(f):
+ ctx.master.addons.invoke_addon(s, evt, arg)
+ except exceptions.OptionsError as e:
+ raise exceptions.CommandError("Error running script: %s" % e) from e
def configure(self, updated):
if "scripts" in updated:
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 2255aaf2..927f6e15 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -1,11 +1,14 @@
import hashlib
import urllib
+import typing
from typing import Any # noqa
from typing import List # noqa
from mitmproxy import ctx
+from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy import io
+from mitmproxy import command
class ServerPlayback:
@@ -13,15 +16,35 @@ class ServerPlayback:
self.flowmap = {}
self.stop = False
self.final_flow = None
+ self.configured = False
- def load_flows(self, flows):
+ @command.command("replay.server")
+ def load_flows(self, flows: typing.Sequence[flow.Flow]) -> None:
+ """
+ Replay server responses from flows.
+ """
+ self.flowmap = {}
for i in flows:
- if i.response:
+ if i.response: # type: ignore
l = self.flowmap.setdefault(self._hash(i), [])
l.append(i)
-
- def clear(self):
+ ctx.master.addons.trigger("update", [])
+
+ @command.command("replay.server.file")
+ def load_file(self, path: str) -> None:
+ try:
+ flows = io.read_flows_from_paths([path])
+ except exceptions.FlowReadException as e:
+ raise exceptions.CommandError(str(e))
+ self.load_flows(flows)
+
+ @command.command("replay.server.stop")
+ def clear(self) -> None:
+ """
+ Stop server replay.
+ """
self.flowmap = {}
+ ctx.master.addons.trigger("update", [])
def count(self):
return sum([len(i) for i in self.flowmap.values()])
@@ -90,14 +113,13 @@ class ServerPlayback:
return ret
def configure(self, updated):
- if "server_replay" in updated:
- self.clear()
- if ctx.options.server_replay:
- try:
- flows = io.read_flows_from_paths(ctx.options.server_replay)
- except exceptions.FlowReadException as e:
- raise exceptions.OptionsError(str(e))
- self.load_flows(flows)
+ if not self.configured and ctx.options.server_replay:
+ self.configured = True
+ try:
+ flows = io.read_flows_from_paths(ctx.options.server_replay)
+ except exceptions.FlowReadException as e:
+ raise exceptions.OptionsError(str(e))
+ self.load_flows(flows)
def tick(self):
if self.stop and not self.final_flow.live:
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index f4082abe..c9c9cbed 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -18,7 +18,10 @@ import sortedcontainers
import mitmproxy.flow
from mitmproxy import flowfilter
from mitmproxy import exceptions
+from mitmproxy import command
+from mitmproxy import connections
from mitmproxy import ctx
+from mitmproxy import io
from mitmproxy import http # noqa
# The underlying sorted list implementation expects the sort key to be stable
@@ -199,7 +202,18 @@ class View(collections.Sequence):
self.sig_view_refresh.send(self)
# API
- def toggle_marked(self):
+ @command.command("view.order.options")
+ def order_options(self) -> typing.Sequence[str]:
+ """
+ A list of all the orders we support.
+ """
+ return list(sorted(self.orders.keys()))
+
+ @command.command("view.marked.toggle")
+ def toggle_marked(self) -> None:
+ """
+ Toggle whether to show marked views only.
+ """
self.show_marked = not self.show_marked
self._refilter()
@@ -223,7 +237,7 @@ class View(collections.Sequence):
self.filter = flt or matchall
self._refilter()
- def clear(self):
+ def clear(self) -> None:
"""
Clears both the store and view.
"""
@@ -243,55 +257,19 @@ class View(collections.Sequence):
self._refilter()
self.sig_store_refresh.send(self)
- def add(self, f: mitmproxy.flow.Flow) -> None:
+ def add(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
"""
Adds a flow to the state. If the flow already exists, it is
ignored.
"""
- if f.id not in self._store:
- self._store[f.id] = f
- if self.filter(f):
- self._base_add(f)
- if self.focus_follow:
- self.focus.flow = f
- self.sig_view_add.send(self, flow=f)
-
- def remove(self, f: mitmproxy.flow.Flow):
- """
- Removes the flow from the underlying store and the view.
- """
- if f.id in self._store:
- if f in self._view:
- self._view.remove(f)
- self.sig_view_remove.send(self, flow=f)
- del self._store[f.id]
- self.sig_store_remove.send(self, flow=f)
-
- def update(self, f: mitmproxy.flow.Flow):
- """
- Updates a flow. If the flow is not in the state, it's ignored.
- """
- if f.id in self._store:
- if self.filter(f):
- if f not in self._view:
+ for f in flows:
+ if f.id not in self._store:
+ self._store[f.id] = f
+ if self.filter(f):
self._base_add(f)
if self.focus_follow:
self.focus.flow = f
self.sig_view_add.send(self, flow=f)
- else:
- # This is a tad complicated. The sortedcontainers
- # implementation assumes that the order key is stable. If
- # it changes mid-way Very Bad Things happen. We detect when
- # this happens, and re-fresh the item.
- self.order_key.refresh(f)
- self.sig_view_update.send(self, flow=f)
- else:
- try:
- self._view.remove(f)
- self.sig_view_remove.send(self, flow=f)
- except ValueError:
- # The value was not in the view
- pass
def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]:
"""
@@ -300,32 +278,65 @@ class View(collections.Sequence):
"""
return self._store.get(flow_id)
- # Event handlers
- def configure(self, updated):
- if "view_filter" in updated:
- filt = None
- if ctx.options.view_filter:
- filt = flowfilter.parse(ctx.options.view_filter)
- if not filt:
- raise exceptions.OptionsError(
- "Invalid interception filter: %s" % ctx.options.view_filter
- )
- self.set_filter(filt)
- if "console_order" in updated:
- if ctx.options.console_order not in self.orders:
- raise exceptions.OptionsError(
- "Unknown flow order: %s" % ctx.options.console_order
- )
- self.set_order(self.orders[ctx.options.console_order])
- if "console_order_reversed" in updated:
- self.set_reversed(ctx.options.console_order_reversed)
- if "console_focus_follow" in updated:
- self.focus_follow = ctx.options.console_focus_follow
+ @command.command("view.load")
+ def load_file(self, path: str) -> None:
+ """
+ Load flows into the view, without processing them with addons.
+ """
+ for i in io.FlowReader(open(path, "rb")).stream():
+ # Do this to get a new ID, so we can load the same file N times and
+ # get new flows each time. It would be more efficient to just have a
+ # .newid() method or something.
+ self.add([i.copy()])
+
+ @command.command("view.go")
+ def go(self, dst: int) -> None:
+ """
+ Go to a specified offset. Positive offests are from the beginning of
+ the view, negative from the end of the view, so that 0 is the first
+ flow, -1 is the last flow.
+ """
+ if dst < 0:
+ dst = len(self) + dst
+ if dst < 0:
+ dst = 0
+ if dst > len(self) - 1:
+ dst = len(self) - 1
+ self.focus.flow = self[dst]
+
+ @command.command("view.duplicate")
+ def duplicate(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
+ """
+ Duplicates the specified flows, and sets the focus to the first
+ duplicate.
+ """
+ dups = [f.copy() for f in flows]
+ if dups:
+ self.add(dups)
+ self.focus.flow = dups[0]
+ @command.command("view.remove")
+ def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
+ """
+ Removes the flow from the underlying store and the view.
+ """
+ for f in flows:
+ if f.id in self._store:
+ if f.killable:
+ f.kill()
+ if f in self._view:
+ self._view.remove(f)
+ self.sig_view_remove.send(self, flow=f)
+ del self._store[f.id]
+ self.sig_store_remove.send(self, flow=f)
+
+ @command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
+ if spec == "@all":
+ return [i for i in self._store.values()]
if spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
elif spec == "@shown":
@@ -342,26 +353,82 @@ class View(collections.Sequence):
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
return [i for i in self._store.values() if filt(i)]
- def load(self, l):
- l.add_command("console.resolve", self.resolve)
+ @command.command("view.create")
+ def create(self, method: str, url: str) -> None:
+ req = http.HTTPRequest.make(method.upper(), url)
+ c = connections.ClientConnection.make_dummy(("", 0))
+ s = connections.ServerConnection.make_dummy((req.host, req.port))
+ f = http.HTTPFlow(c, s)
+ f.request = req
+ f.request.headers["Host"] = req.host
+ self.add([f])
+
+ # Event handlers
+ def configure(self, updated):
+ if "view_filter" in updated:
+ filt = None
+ if ctx.options.view_filter:
+ filt = flowfilter.parse(ctx.options.view_filter)
+ if not filt:
+ raise exceptions.OptionsError(
+ "Invalid interception filter: %s" % ctx.options.view_filter
+ )
+ self.set_filter(filt)
+ if "console_order" in updated:
+ if ctx.options.console_order not in self.orders:
+ raise exceptions.OptionsError(
+ "Unknown flow order: %s" % ctx.options.console_order
+ )
+ self.set_order(self.orders[ctx.options.console_order])
+ if "console_order_reversed" in updated:
+ self.set_reversed(ctx.options.console_order_reversed)
+ if "console_focus_follow" in updated:
+ self.focus_follow = ctx.options.console_focus_follow
def request(self, f):
- self.add(f)
+ self.add([f])
def error(self, f):
- self.update(f)
+ self.update([f])
def response(self, f):
- self.update(f)
+ self.update([f])
def intercept(self, f):
- self.update(f)
+ self.update([f])
def resume(self, f):
- self.update(f)
+ self.update([f])
def kill(self, f):
- self.update(f)
+ self.update([f])
+
+ def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
+ """
+ Updates a list of flows. If flow is not in the state, it's ignored.
+ """
+ for f in flows:
+ if f.id in self._store:
+ if self.filter(f):
+ if f not in self._view:
+ self._base_add(f)
+ if self.focus_follow:
+ self.focus.flow = f
+ self.sig_view_add.send(self, flow=f)
+ else:
+ # This is a tad complicated. The sortedcontainers
+ # implementation assumes that the order key is stable. If
+ # it changes mid-way Very Bad Things happen. We detect when
+ # this happens, and re-fresh the item.
+ self.order_key.refresh(f)
+ self.sig_view_update.send(self, flow=f)
+ else:
+ try:
+ self._view.remove(f)
+ self.sig_view_remove.send(self, flow=f)
+ except ValueError:
+ # The value was not in the view
+ pass
class Focus:
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index fa6e23ea..82b8fae4 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,23 +1,36 @@
+"""
+ This module manges and invokes typed commands.
+"""
import inspect
import typing
import shlex
import textwrap
import functools
+import sys
from mitmproxy.utils import typecheck
from mitmproxy import exceptions
from mitmproxy import flow
+Cuts = typing.Sequence[
+ typing.Sequence[typing.Union[str, bytes]]
+]
+
+
def typename(t: type, ret: bool) -> str:
"""
- Translates a type to an explanatory string. Ifl ret is True, we're
+ Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
- if t in (str, int, bool):
+ if issubclass(t, (str, int, bool)):
return t.__name__
elif t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
+ elif t == typing.Sequence[str]:
+ return "[str]"
+ elif t == Cuts:
+ return "[cuts]" if ret else "cutspec"
elif t == flow.Flow:
return "flow"
else: # pragma: no cover
@@ -34,11 +47,20 @@ class Command:
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
+
+ self.has_positional = False
+ for i in sig.parameters.values():
+ # This is the kind for *args paramters
+ if i.kind == i.VAR_POSITIONAL:
+ self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
self.returntype = sig.return_annotation
def paramnames(self) -> typing.Sequence[str]:
- return [typename(i, False) for i in self.paramtypes]
+ v = [typename(i, False) for i in self.paramtypes]
+ if self.has_positional:
+ v[-1] = "*" + v[-1][1:-1]
+ return v
def retname(self) -> str:
return typename(self.returntype, True) if self.returntype else ""
@@ -54,17 +76,31 @@ class Command:
"""
Call the command with a set of arguments. At this point, all argumets are strings.
"""
- if len(self.paramtypes) != len(args):
+ if not self.has_positional and (len(self.paramtypes) != len(args)):
raise exceptions.CommandError("Usage: %s" % self.signature_help())
+ remainder = [] # type: typing.Sequence[str]
+ if self.has_positional:
+ remainder = args[len(self.paramtypes) - 1:]
+ args = args[:len(self.paramtypes) - 1]
+
pargs = []
for i in range(len(args)):
- pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+ if typecheck.check_command_type(args[i], self.paramtypes[i]):
+ pargs.append(args[i])
+ else:
+ pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+
+ if remainder:
+ if typecheck.check_command_type(remainder, self.paramtypes[-1]):
+ pargs.extend(remainder)
+ else:
+ raise exceptions.CommandError("Invalid value type.")
with self.manager.master.handlecontext():
ret = self.func(*pargs)
- if not typecheck.check_command_return_type(ret, self.returntype):
+ if not typecheck.check_command_type(ret, self.returntype):
raise exceptions.CommandError("Command returned unexpected data")
return ret
@@ -102,22 +138,49 @@ class CommandManager:
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
return self.call_args(parts[0], parts[1:])
+ def dump(self, out=sys.stdout) -> None:
+ cmds = list(self.commands.values())
+ cmds.sort(key=lambda x: x.signature_help())
+ for c in cmds:
+ for hl in (c.help or "").splitlines():
+ print("# " + hl, file=out)
+ print(c.signature_help(), file=out)
+ print(file=out)
+
def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if argtype == str:
+ if issubclass(argtype, str):
return spec
+ elif argtype == bool:
+ if spec == "true":
+ return True
+ elif spec == "false":
+ return False
+ else:
+ raise exceptions.CommandError(
+ "Booleans are 'true' or 'false', got %s" % spec
+ )
+ elif issubclass(argtype, int):
+ try:
+ return int(spec)
+ except ValueError as e:
+ raise exceptions.CommandError("Expected an integer, got %s." % spec)
elif argtype == typing.Sequence[flow.Flow]:
- return manager.call_args("console.resolve", [spec])
+ return manager.call_args("view.resolve", [spec])
+ elif argtype == Cuts:
+ return manager.call_args("cut", [spec])
elif argtype == flow.Flow:
- flows = manager.call_args("console.resolve", [spec])
+ flows = manager.call_args("view.resolve", [spec])
if len(flows) != 1:
raise exceptions.CommandError(
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
+ elif argtype == typing.Sequence[str]:
+ return [i.strip() for i in spec.split(",")]
else:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
diff --git a/mitmproxy/eventsequence.py b/mitmproxy/eventsequence.py
index 30c037f1..4e199972 100644
--- a/mitmproxy/eventsequence.py
+++ b/mitmproxy/eventsequence.py
@@ -35,6 +35,7 @@ Events = frozenset([
"load",
"running",
"tick",
+ "update",
])
diff --git a/mitmproxy/export.py b/mitmproxy/export.py
deleted file mode 100644
index efa08874..00000000
--- a/mitmproxy/export.py
+++ /dev/null
@@ -1,183 +0,0 @@
-import io
-import json
-import pprint
-import re
-import textwrap
-from typing import Any
-
-from mitmproxy import http
-from mitmproxy.utils import strutils
-
-
-def curl_command(flow: http.HTTPFlow) -> str:
- data = "curl "
-
- request = flow.request.copy()
- request.decode(strict=False)
-
- for k, v in request.headers.items(multi=True):
- data += "-H '%s:%s' " % (k, v)
-
- if request.method != "GET":
- data += "-X %s " % request.method
-
- data += "'%s'" % request.url
-
- if request.content:
- data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
- request.content,
- escape_single_quotes=True
- )
-
- return data
-
-
-def python_arg(arg: str, val: Any) -> str:
- if not val:
- return ""
- if arg:
- arg += "="
- arg_str = "{}{},\n".format(
- arg,
- pprint.pformat(val, 79 - len(arg))
- )
- return textwrap.indent(arg_str, " " * 4)
-
-
-def python_code(flow: http.HTTPFlow):
- code = io.StringIO()
-
- def writearg(arg, val):
- code.write(python_arg(arg, val))
-
- code.write("import requests\n")
- code.write("\n")
- if flow.request.method.lower() in ("get", "post", "put", "head", "delete", "patch"):
- code.write("response = requests.{}(\n".format(flow.request.method.lower()))
- else:
- code.write("response = requests.request(\n")
- writearg("", flow.request.method)
- url_without_query = flow.request.url.split("?", 1)[0]
- writearg("", url_without_query)
-
- writearg("params", list(flow.request.query.fields))
-
- headers = flow.request.headers.copy()
- # requests adds those by default.
- for x in (":authority", "host", "content-length"):
- headers.pop(x, None)
- writearg("headers", dict(headers))
- try:
- if "json" not in flow.request.headers.get("content-type", ""):
- raise ValueError()
- writearg("json", json.loads(flow.request.text))
- except ValueError:
- writearg("data", flow.request.content)
-
- code.seek(code.tell() - 2) # remove last comma
- code.write("\n)\n")
- code.write("\n")
- code.write("print(response.text)")
-
- return code.getvalue()
-
-
-def locust_code(flow):
- code = textwrap.dedent("""
- from locust import HttpLocust, TaskSet, task
-
- class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.{name}()
-
- @task()
- def {name}(self):
- url = self.locust.host + '{path}'
- {headers}{params}{data}
- self.response = self.client.request(
- method='{method}',
- url=url,{args}
- )
-
- ### Additional tasks can go here ###
-
-
- class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
-""").strip()
-
- name = re.sub('\W|^(?=\d)', '_', flow.request.path.strip("/").split("?", 1)[0])
- if not name:
- new_name = "_".join([str(flow.request.host), str(flow.request.timestamp_start)])
- name = re.sub('\W|^(?=\d)', '_', new_name)
-
- path_without_query = flow.request.path.split("?")[0]
-
- args = ""
- headers = ""
-
- def conv(x):
- return strutils.bytes_to_escaped_str(x, escape_single_quotes=True)
-
- if flow.request.headers:
- lines = [
- (conv(k), conv(v)) for k, v in flow.request.headers.fields
- if conv(k).lower() not in [":authority", "host", "cookie"]
- ]
- lines = [" '%s': '%s',\n" % (k, v) for k, v in lines]
- headers += "\n headers = {\n%s }\n" % "".join(lines)
- args += "\n headers=headers,"
-
- params = ""
- if flow.request.query:
- lines = [
- " %s: %s,\n" % (repr(k), repr(v))
- for k, v in
- flow.request.query.collect()
- ]
- params = "\n params = {\n%s }\n" % "".join(lines)
- args += "\n params=params,"
-
- data = ""
- if flow.request.content:
- data = "\n data = '''%s'''\n" % conv(flow.request.content)
- args += "\n data=data,"
-
- code = code.format(
- name=name,
- path=path_without_query,
- headers=headers,
- params=params,
- data=data,
- method=flow.request.method,
- args=args,
- )
-
- return code
-
-
-def locust_task(flow):
- code = locust_code(flow)
- start_task = len(code.split('@task')[0]) - 4
- end_task = -19 - len(code.split('### Additional')[1])
- task_code = code[start_task:end_task]
-
- return task_code
-
-
-def url(flow):
- return flow.request.url
-
-
-EXPORTERS = [
- ("content", "c", None),
- ("headers+content", "h", None),
- ("url", "u", url),
- ("as curl command", "r", curl_command),
- ("as python code", "p", python_code),
- ("as locust code", "l", locust_code),
- ("as locust task", "t", locust_task),
-]
diff --git a/mitmproxy/master.py b/mitmproxy/master.py
index 2a032c4a..d21a323e 100644
--- a/mitmproxy/master.py
+++ b/mitmproxy/master.py
@@ -7,7 +7,6 @@ from mitmproxy import options
from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import exceptions
-from mitmproxy import connections
from mitmproxy import command
from mitmproxy import http
from mitmproxy import log
@@ -78,9 +77,6 @@ class Master:
self.start()
try:
while not self.should_exit.is_set():
- # Don't choose a very small timeout in Python 2:
- # https://github.com/mitmproxy/mitmproxy/issues/443
- # TODO: Lower the timeout value if we move to Python 3.
self.tick(0.1)
finally:
self.shutdown()
@@ -109,22 +105,6 @@ class Master:
self.should_exit.set()
self.addons.trigger("done")
- def create_request(self, method, url):
- """
- Create a new artificial and minimalist request also adds it to flowlist.
-
- Raises:
- ValueError, if the url is malformed.
- """
- req = http.HTTPRequest.make(method, url)
- c = connections.ClientConnection.make_dummy(("", 0))
- s = connections.ServerConnection.make_dummy((req.host, req.port))
-
- f = http.HTTPFlow(c, s)
- f.request = req
- self.load_flow(f)
- return f
-
def load_flow(self, f):
"""
Loads a flow
diff --git a/mitmproxy/test/tflow.py b/mitmproxy/test/tflow.py
index 270021cb..9004df2f 100644
--- a/mitmproxy/test/tflow.py
+++ b/mitmproxy/test/tflow.py
@@ -174,7 +174,7 @@ def tserver_conn():
id=str(uuid.uuid4()),
address=("address", 22),
source_address=("address", 22),
- ip_address=None,
+ ip_address=("192.168.0.1", 22),
cert=None,
timestamp_start=1,
timestamp_tcp_setup=2,
@@ -183,7 +183,7 @@ def tserver_conn():
ssl_established=False,
sni="address",
alpn_proto_negotiated=None,
- tls_version=None,
+ tls_version="TLSv1.2",
via=None,
))
c.reply = controller.DummyReply()
diff --git a/mitmproxy/tools/console/commandeditor.py b/mitmproxy/tools/console/commandeditor.py
index fd7d12ac..17d1506b 100644
--- a/mitmproxy/tools/console/commandeditor.py
+++ b/mitmproxy/tools/console/commandeditor.py
@@ -1,6 +1,8 @@
+import typing
import urwid
from mitmproxy import exceptions
+from mitmproxy import flow
from mitmproxy.tools.console import signals
@@ -23,5 +25,14 @@ class CommandExecutor:
except exceptions.CommandError as v:
signals.status_message.send(message=str(v))
else:
- if type(ret) == str:
- signals.status_message.send(message=ret)
+ if ret:
+ if type(ret) == typing.Sequence[flow.Flow]:
+ signals.status_message.send(
+ message="Command returned %s flows" % len(ret)
+ )
+ elif len(str(ret)) < 50:
+ signals.status_message.send(message=str(ret))
+ else:
+ signals.status_message.send(
+ message="Command returned too much data to display."
+ )
diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py
index ec637cbc..812ca7a8 100644
--- a/mitmproxy/tools/console/common.py
+++ b/mitmproxy/tools/console/common.py
@@ -9,7 +9,6 @@ import urwid.util
import mitmproxy.net
from functools import lru_cache
from mitmproxy.tools.console import signals
-from mitmproxy import export
from mitmproxy.utils import human
try:
@@ -306,28 +305,6 @@ def ask_save_body(scope, flow):
signals.status_message.send(message="No content.")
-def export_to_clip_or_file(key, scope, flow, writer):
- """
- Export selected flow to clipboard or a file.
-
- key: _c_ontent, _h_eaders+content, _u_rl,
- cu_r_l_command, _p_ython_code,
- _l_ocust_code, locust_t_ask
- scope: None, _a_ll, re_q_uest, re_s_ponse
- writer: copy_to_clipboard_or_prompt, ask_save_path
- """
-
- for _, exp_key, exporter in export.EXPORTERS:
- if key == exp_key:
- if exporter is None: # 'c' & 'h'
- if scope is None:
- ask_scope_and_callback(flow, handle_flow_data, key, writer)
- else:
- handle_flow_data(scope, flow, key, writer)
- else: # other keys
- writer(exporter(flow))
-
-
@lru_cache(maxsize=800)
def raw_format_flow(f, flow):
f = dict(f)
@@ -418,13 +395,16 @@ def raw_format_flow(f, flow):
def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False):
+ acked = False
+ if f.reply and f.reply.state == "committed":
+ acked = True
d = dict(
focus=focus,
extended=extended,
max_url_len=max_url_len,
intercepted = f.intercepted,
- acked = f.reply.state == "committed",
+ acked = acked,
req_timestamp = f.request.timestamp_start,
req_is_replay = f.request.is_replay,
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index bb59a9b7..7400c16c 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -1,10 +1,7 @@
import urwid
-from mitmproxy import exceptions
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
-from mitmproxy.addons import view
-from mitmproxy import export
import mitmproxy.tools.console.master # noqa
@@ -133,14 +130,6 @@ class FlowItem(urwid.WidgetWrap):
def selectable(self):
return True
- def server_replay_prompt(self, k):
- a = self.master.addons.get("serverplayback")
- if k == "a":
- a.load([i.copy() for i in self.master.view])
- elif k == "t":
- a.load([self.flow.copy()])
- signals.update_settings.send(self)
-
def mouse_event(self, size, event, button, col, row, focus):
if event == "mouse press" and button == 1:
if self.flow.request:
@@ -149,91 +138,7 @@ class FlowItem(urwid.WidgetWrap):
def keypress(self, xxx_todo_changeme, key):
(maxcol,) = xxx_todo_changeme
- key = common.shortcuts(key)
- if key == "a":
- self.flow.resume()
- self.master.view.update(self.flow)
- elif key == "d":
- if self.flow.killable:
- self.flow.kill()
- self.master.view.remove(self.flow)
- elif key == "D":
- cp = self.flow.copy()
- self.master.view.add(cp)
- self.master.view.focus.flow = cp
- elif key == "m":
- self.flow.marked = not self.flow.marked
- signals.flowlist_change.send(self)
- elif key == "r":
- try:
- self.master.replay_request(self.flow)
- except exceptions.ReplayException as e:
- signals.add_log("Replay error: %s" % e, "warn")
- signals.flowlist_change.send(self)
- elif key == "S":
- def stop_server_playback(response):
- if response == "y":
- self.master.options.server_replay = []
- a = self.master.addons.get("serverplayback")
- if a.count():
- signals.status_prompt_onekey.send(
- prompt = "Stop current server replay?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = stop_server_playback,
- )
- else:
- signals.status_prompt_onekey.send(
- prompt = "Server Replay",
- keys = (
- ("all flows", "a"),
- ("this flow", "t"),
- ),
- callback = self.server_replay_prompt,
- )
- elif key == "U":
- for f in self.master.view:
- f.marked = False
- signals.flowlist_change.send(self)
- elif key == "V":
- if not self.flow.modified():
- signals.status_message.send(message="Flow not modified.")
- return
- self.flow.revert()
- signals.flowlist_change.send(self)
- signals.status_message.send(message="Reverted.")
- elif key == "X":
- if self.flow.killable:
- self.flow.kill()
- self.master.view.update(self.flow)
- elif key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
- elif key == "E":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to file",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (None, self.flow, common.ask_save_path)
- )
- elif key == "C":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to clipboard",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (None, self.flow, common.copy_to_clipboard_or_prompt)
- )
- elif key == "b":
- common.ask_save_body(None, self.flow)
- else:
- return key
+ return common.shortcuts(key)
class FlowListWalker(urwid.ListWalker):
@@ -284,83 +189,6 @@ class FlowListBox(urwid.ListBox):
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
super().__init__(FlowListWalker(master))
- def get_method_raw(self, k):
- if k:
- self.get_url(k)
-
- def get_method(self, k):
- if k == "e":
- signals.status_prompt.send(
- self,
- prompt = "Method",
- text = "",
- callback = self.get_method_raw
- )
- else:
- method = ""
- for i in common.METHOD_OPTIONS:
- if i[1] == k:
- method = i[0].upper()
- self.get_url(method)
-
- def get_url(self, method):
- signals.status_prompt.send(
- prompt = "URL",
- text = "http://www.example.com/",
- callback = self.new_request,
- args = (method,)
- )
-
- def new_request(self, url, method):
- try:
- f = self.master.create_request(method, url)
- except ValueError as e:
- signals.status_message.send(message = "Invalid URL: " + str(e))
- return
- self.master.view.focus.flow = f
-
def keypress(self, size, key):
key = common.shortcuts(key)
- if key == "A":
- for f in self.master.view:
- if f.intercepted:
- f.resume()
- self.master.view.update(f)
- elif key == "z":
- self.master.view.clear()
- elif key == "Z":
- self.master.view.clear_not_marked()
- elif key == "g":
- if len(self.master.view):
- self.master.view.focus.index = 0
- elif key == "G":
- if len(self.master.view):
- self.master.view.focus.index = len(self.master.view) - 1
- elif key == "L":
- signals.status_prompt_path.send(
- self,
- prompt = "Load flows",
- callback = self.master.load_flows_callback
- )
- elif key == "M":
- self.master.view.toggle_marked()
- elif key == "n":
- signals.status_prompt_onekey.send(
- prompt = "Method",
- keys = common.METHOD_OPTIONS,
- callback = self.get_method
- )
- elif key == "o":
- orders = [(i[1], i[0]) for i in view.orders]
- lookup = dict([(i[0], i[1]) for i in view.orders])
-
- def change_order(k):
- self.master.options.console_order = lookup[k]
-
- signals.status_prompt_onekey.send(
- prompt = "Order",
- keys = orders,
- callback = change_order
- )
- else:
- return urwid.ListBox.keypress(self, size, key)
+ return urwid.ListBox.keypress(self, size, key)
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py
index 33c8f2ac..b7b7053f 100644
--- a/mitmproxy/tools/console/flowview.py
+++ b/mitmproxy/tools/console/flowview.py
@@ -8,7 +8,6 @@ import urwid
from mitmproxy import contentviews
from mitmproxy import exceptions
-from mitmproxy import export
from mitmproxy import http
from mitmproxy.net.http import Headers
from mitmproxy.net.http import status_codes
@@ -619,29 +618,31 @@ class FlowView(tabs.Tabs):
)
)
elif key == "E":
- if self.tab_offset == TAB_REQ:
- scope = "q"
- else:
- scope = "s"
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to file",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (scope, self.flow, common.ask_save_path)
- )
+ pass
+ # if self.tab_offset == TAB_REQ:
+ # scope = "q"
+ # else:
+ # scope = "s"
+ # signals.status_prompt_onekey.send(
+ # self,
+ # prompt = "Export to file",
+ # keys = [(e[0], e[1]) for e in export.EXPORTERS],
+ # callback = common.export_to_clip_or_file,
+ # args = (scope, self.flow, common.ask_save_path)
+ # )
elif key == "C":
- if self.tab_offset == TAB_REQ:
- scope = "q"
- else:
- scope = "s"
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to clipboard",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
- )
+ pass
+ # if self.tab_offset == TAB_REQ:
+ # scope = "q"
+ # else:
+ # scope = "s"
+ # signals.status_prompt_onekey.send(
+ # self,
+ # prompt = "Export to clipboard",
+ # keys = [(e[0], e[1]) for e in export.EXPORTERS],
+ # callback = common.export_to_clip_or_file,
+ # args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
+ # )
elif key == "x":
conn.content = None
signals.flow_change.send(self, flow=self.flow)
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index 7787ba11..5b6d9bcb 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -9,14 +9,14 @@ import subprocess
import sys
import tempfile
import traceback
+import typing
import urwid
+from mitmproxy import ctx
from mitmproxy import addons
-from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import master
-from mitmproxy import io
from mitmproxy import log
from mitmproxy import flow
from mitmproxy.addons import intercept
@@ -78,7 +78,7 @@ class UnsupportedLog:
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
-class ConsoleCommands:
+class ConsoleAddon:
"""
An addon that exposes console-specific commands.
"""
@@ -86,12 +86,31 @@ class ConsoleCommands:
self.master = master
self.started = False
+ @command.command("console.choose")
+ def console_choose(
+ self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
+ ) -> None:
+ """
+ Prompt the user to choose from a list of strings returned by a
+ command, then invoke another command with all occurances of {choice}
+ replaced by the choice the user made.
+ """
+ choices = ctx.master.commands.call_args(choicecmd, [])
+
+ def callback(opt):
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ self.master.commands.call(repl)
+
+ self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
+ ctx.log.info(choices)
+
@command.command("console.command")
- def console_command(self, partial: str) -> None:
+ def console_command(self, *partial: typing.Sequence[str]) -> None:
"""
Prompt the user to edit a command with a (possilby empty) starting value.
"""
- signals.status_prompt_command.send(partial=partial)
+ signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore
@command.command("console.view.commands")
def view_commands(self) -> None:
@@ -131,6 +150,10 @@ class ConsoleCommands:
def running(self):
self.started = True
+ def update(self, flows):
+ if not flows:
+ signals.update_settings.send(self)
+
def configure(self, updated):
if self.started:
if "console_eventlog" in updated:
@@ -144,14 +167,50 @@ def default_keymap(km):
km.add("O", "console.view.options")
km.add("Q", "console.exit")
km.add("q", "console.view.pop")
- km.add("i", "console.command 'set intercept='")
- km.add("W", "console.command 'set save_stream_file='")
-
+ km.add("i", "console.command set intercept=")
+ km.add("W", "console.command set save_stream_file=")
+
+ km.add("A", "flow.resume @all", context="flowlist")
+ km.add("a", "flow.resume @focus", context="flowlist")
+ km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
+ km.add("d", "view.remove @focus", context="flowlist")
+ km.add("D", "view.duplicate @focus", context="flowlist")
+ km.add("e", "set console_eventlog=toggle", context="flowlist")
+ km.add(
+ "E",
+ "console.choose Format export.formats "
+ "console.command export.file {choice} @focus ''",
+ context="flowlist"
+ )
+ km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist")
+ km.add("g", "view.go 0", context="flowlist")
+ km.add("G", "view.go -1", context="flowlist")
+ km.add("l", "console.command cut.clip ", context="flowlist")
+ km.add("L", "console.command view.load ", context="flowlist")
+ km.add("m", "flow.mark.toggle @focus", context="flowlist")
+ km.add("M", "view.marked.toggle", context="flowlist")
+ km.add(
+ "n",
+ "console.command view.create get https://google.com",
+ context="flowlist"
+ )
+ km.add(
+ "o",
+ "console.choose Order view.order.options "
+ "set console_order={choice}",
+ context="flowlist"
+ )
+ km.add("r", "replay.client @focus", context="flowlist")
+ km.add("S", "console.command 'replay.server '")
km.add("v", "set console_order_reversed=toggle", context="flowlist")
- km.add("f", "console.command 'set view_filter='", context="flowlist")
- km.add("e", "set console_eventlog=toggle", context="flowlist")
+ km.add("U", "flow.mark @all false", context="flowlist")
km.add("w", "console.command 'save.file @shown '", context="flowlist")
+ km.add("V", "flow.revert @focus", context="flowlist")
+ km.add("X", "flow.kill @focus", context="flowlist")
+ km.add("z", "view.remove @all", context="flowlist")
+ km.add("Z", "view.remove @hidden", context="flowlist")
+ km.add("|", "console.command 'script.run @focus '", context="flowlist")
km.add("enter", "console.view.flow @focus", context="flowlist")
@@ -184,7 +243,7 @@ class ConsoleMaster(master.Master):
self.view,
UnsupportedLog(),
readfile.ReadFile(),
- ConsoleCommands(self),
+ ConsoleAddon(self),
)
def sigint_handler(*args, **kwargs):
@@ -262,31 +321,10 @@ class ConsoleMaster(master.Master):
self.loop.widget = window
self.loop.draw_screen()
- def run_script_once(self, command, f):
- sc = self.addons.get("scriptloader")
- try:
- with self.handlecontext():
- sc.run_once(command, [f])
- except ValueError as e:
- signals.add_log("Input error: %s" % e, "warn")
-
def refresh_view(self):
self.view_flowlist()
signals.replace_view_state.send(self)
- def _readflows(self, path):
- """
- Utitility function that reads a list of flows
- or prints an error to the UI if that fails.
- Returns
- - None, if there was an error.
- - a list of flows, otherwise.
- """
- try:
- return io.read_flows_from_paths(path)
- except exceptions.FlowReadException as e:
- signals.status_message.send(message=str(e))
-
def spawn_editor(self, data):
text = not isinstance(data, bytes)
fd, name = tempfile.mkstemp('', "mproxy", text=text)
@@ -507,31 +545,6 @@ class ConsoleMaster(master.Master):
)
)
- def _write_flows(self, path, flows):
- with open(path, "wb") as f:
- fw = io.FlowWriter(f)
- for i in flows:
- fw.add(i)
-
- def save_one_flow(self, path, flow):
- return self._write_flows(path, [flow])
-
- def save_flows(self, path):
- return self._write_flows(path, self.view)
-
- def load_flows_callback(self, path):
- ret = self.load_flows_path(path)
- return ret or "Flows loaded from %s" % path
-
- def load_flows_path(self, path):
- reterr = None
- try:
- master.Master.load_flows_file(self, path)
- except exceptions.FlowReadException as e:
- reterr = str(e)
- signals.flowlist_change.send(self)
- return reterr
-
def quit(self, a):
if a != "n":
self.shutdown()
diff --git a/mitmproxy/tools/main.py b/mitmproxy/tools/main.py
index fefdca5c..9748f3cf 100644
--- a/mitmproxy/tools/main.py
+++ b/mitmproxy/tools/main.py
@@ -85,11 +85,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
print(optmanager.dump_defaults(opts))
sys.exit(0)
if args.commands:
- cmds = []
- for c in master.commands.commands.values():
- cmds.append(c.signature_help())
- for i in sorted(cmds):
- print(i)
+ master.commands.dump()
sys.exit(0)
opts.set(*args.setoptions)
if extra:
diff --git a/mitmproxy/tools/web/app.py b/mitmproxy/tools/web/app.py
index 1e01c57c..c55c0cb5 100644
--- a/mitmproxy/tools/web/app.py
+++ b/mitmproxy/tools/web/app.py
@@ -246,7 +246,7 @@ class ResumeFlows(RequestHandler):
def post(self):
for f in self.view:
f.resume()
- self.view.update(f)
+ self.view.update([f])
class KillFlows(RequestHandler):
@@ -254,27 +254,27 @@ class KillFlows(RequestHandler):
for f in self.view:
if f.killable:
f.kill()
- self.view.update(f)
+ self.view.update([f])
class ResumeFlow(RequestHandler):
def post(self, flow_id):
self.flow.resume()
- self.view.update(self.flow)
+ self.view.update([self.flow])
class KillFlow(RequestHandler):
def post(self, flow_id):
if self.flow.killable:
self.flow.kill()
- self.view.update(self.flow)
+ self.view.update([self.flow])
class FlowHandler(RequestHandler):
def delete(self, flow_id):
if self.flow.killable:
self.flow.kill()
- self.view.remove(self.flow)
+ self.view.remove([self.flow])
def put(self, flow_id):
flow = self.flow
@@ -317,13 +317,13 @@ class FlowHandler(RequestHandler):
except APIError:
flow.revert()
raise
- self.view.update(flow)
+ self.view.update([flow])
class DuplicateFlow(RequestHandler):
def post(self, flow_id):
f = self.flow.copy()
- self.view.add(f)
+ self.view.add([f])
self.write(f.id)
@@ -331,14 +331,14 @@ class RevertFlow(RequestHandler):
def post(self, flow_id):
if self.flow.modified():
self.flow.revert()
- self.view.update(self.flow)
+ self.view.update([self.flow])
class ReplayFlow(RequestHandler):
def post(self, flow_id):
self.flow.backup()
self.flow.response = None
- self.view.update(self.flow)
+ self.view.update([self.flow])
try:
self.master.replay_request(self.flow)
@@ -351,7 +351,7 @@ class FlowContent(RequestHandler):
self.flow.backup()
message = getattr(self.flow, message)
message.content = self.filecontents
- self.view.update(self.flow)
+ self.view.update([self.flow])
def get(self, flow_id, message):
message = getattr(self.flow, message)
diff --git a/mitmproxy/types/multidict.py b/mitmproxy/types/multidict.py
index c4f42580..bd9766a3 100644
--- a/mitmproxy/types/multidict.py
+++ b/mitmproxy/types/multidict.py
@@ -155,22 +155,6 @@ class _MultiDict(MutableMapping, metaclass=ABCMeta):
else:
return super().items()
- def collect(self):
- """
- Returns a list of (key, value) tuples, where values are either
- singular if there is only one matching item for a key, or a list
- if there are more than one. The order of the keys matches the order
- in the underlying fields list.
- """
- coll = []
- for key in self:
- values = self.get_all(key)
- if len(values) == 1:
- coll.append([key, values[0]])
- else:
- coll.append([key, values])
- return coll
-
class MultiDict(_MultiDict, serializable.Serializable):
def __init__(self, fields=()):
diff --git a/mitmproxy/utils/strutils.py b/mitmproxy/utils/strutils.py
index 1b90c2e5..db0cfd2e 100644
--- a/mitmproxy/utils/strutils.py
+++ b/mitmproxy/utils/strutils.py
@@ -25,9 +25,10 @@ def always_str(str_or_bytes: Optional[AnyStr], *decode_args) -> Optional[str]:
raise TypeError("Expected str or bytes, but got {}.".format(type(str_or_bytes).__name__))
-# Translate control characters to "safe" characters. This implementation initially
-# replaced them with the matching control pictures (http://unicode.org/charts/PDF/U2400.pdf),
-# but that turned out to render badly with monospace fonts. We are back to "." therefore.
+# Translate control characters to "safe" characters. This implementation
+# initially replaced them with the matching control pictures
+# (http://unicode.org/charts/PDF/U2400.pdf), but that turned out to render badly
+# with monospace fonts. We are back to "." therefore.
_control_char_trans = {
x: ord(".") # x + 0x2400 for unicode control group pictures
for x in range(32)
diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py
index 20791e17..a5f27fee 100644
--- a/mitmproxy/utils/typecheck.py
+++ b/mitmproxy/utils/typecheck.py
@@ -1,7 +1,7 @@
import typing
-def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
+def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
Check if the provided value is an instance of typeinfo. Returns True if the
types match, False otherwise. This function supports only those types
@@ -17,7 +17,17 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
if not isinstance(value, (tuple, list)):
return False
for v in value:
- if not check_command_return_type(v, T):
+ if not check_command_type(v, T):
+ return False
+ elif typename.startswith("typing.Union"):
+ try:
+ types = typeinfo.__args__ # type: ignore
+ except AttributeError:
+ # Python 3.5.x
+ types = typeinfo.__union_params__ # type: ignore
+ for T in types:
+ checks = [check_command_type(value, T) for T in types]
+ if not any(checks):
return False
elif value is None and typeinfo is None:
return True
diff --git a/release/README.md b/release/README.md
index a30221c8..a60b7f98 100644
--- a/release/README.md
+++ b/release/README.md
@@ -8,7 +8,7 @@ Make sure run all these steps on the correct branch you want to create a new rel
- Wait for tag CI to complete
## GitHub Release
-- Create release notice on Github [https://github.com/mitmproxy/mitmproxy/releases/new](here)
+- Create release notice on Github [here](https://github.com/mitmproxy/mitmproxy/releases/new)
- Attach all files from the new release folder on https://snapshots.mitmproxy.org
## PyPi
@@ -20,6 +20,10 @@ Make sure run all these steps on the correct branch you want to create a new rel
- Update the dependencies in [alpine/requirements.txt](https://github.com/mitmproxy/docker-releases/commit/3d6a9989fde068ad0aea257823ac3d7986ff1613#diff-9b7e0eea8ae74688b1ac13ea080549ba)
* Creating a fresh venv, pip-installing the new wheel in there, and then export all packages:
* `virtualenv -ppython3.5 venv && source venv/bin/activate && pip install mitmproxy && pip freeze`
-- Update `latest` tag [https://hub.docker.com/r/mitmproxy/mitmproxy/~/settings/automated-builds/](here)
+ - Tag the commit with the correct version
+ * `2.0.0` for new major versions
+ * `2.0.2` for new patch versions
+ * `2.0` always points to the latest patch version of the `2.0.x` series (update tag + force push)
+- Update `latest` tag [here](https://hub.docker.com/r/mitmproxy/mitmproxy/~/settings/automated-builds/)
After everything is done, you might want to bump the version on master in [https://github.com/mitmproxy/mitmproxy/blob/master/mitmproxy/version.py](mitmproxy/version.py) if you just created a major release.
diff --git a/setup.cfg b/setup.cfg
index d0307bc8..993cad31 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -35,7 +35,6 @@ exclude =
mitmproxy/proxy/server.py
mitmproxy/tools/
mitmproxy/controller.py
- mitmproxy/export.py
mitmproxy/flow.py
mitmproxy/io/compat.py
mitmproxy/master.py
@@ -54,7 +53,6 @@ exclude =
mitmproxy/controller.py
mitmproxy/ctx.py
mitmproxy/exceptions.py
- mitmproxy/export.py
mitmproxy/flow.py
mitmproxy/io/io.py
mitmproxy/io/compat.py
@@ -85,7 +83,6 @@ exclude =
mitmproxy/proxy/root_context.py
mitmproxy/proxy/server.py
mitmproxy/stateobject.py
- mitmproxy/types/multidict.py
mitmproxy/utils/bits.py
pathod/language/actions.py
pathod/language/base.py
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index f71662f0..7ffda317 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -26,7 +26,7 @@ class TestClientPlayback:
with taddons.context() as tctx:
assert cp.count() == 0
f = tflow.tflow(resp=True)
- cp.load([f])
+ cp.start_replay([f])
assert cp.count() == 1
RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread"
with mock.patch(RP) as rp:
@@ -44,13 +44,30 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
+ cp.start_replay([f])
+ cp.stop_replay()
+ assert not cp.flows
+
+ def test_load_file(self, tmpdir):
+ cp = clientplayback.ClientPlayback()
+ with taddons.context():
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ cp.load_file(fpath)
+ assert cp.flows
+ with pytest.raises(exceptions.CommandError):
+ cp.load_file("/nonexistent")
+
def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
path = str(tmpdir.join("flows"))
tdump(path, [tflow.tflow()])
tctx.configure(cp, client_replay=[path])
+ cp.configured = False
tctx.configure(cp, client_replay=[])
+ cp.configured = False
tctx.configure(cp)
+ cp.configured = False
with pytest.raises(exceptions.OptionsError):
tctx.configure(cp, client_replay=["nonexistent"])
diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py
index 6ebf4ba9..64d0fa19 100644
--- a/test/mitmproxy/addons/test_core.py
+++ b/test/mitmproxy/addons/test_core.py
@@ -1,5 +1,6 @@
from mitmproxy.addons import core
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
from mitmproxy import exceptions
import pytest
@@ -15,3 +16,48 @@ def test_set():
with pytest.raises(exceptions.CommandError):
tctx.command(sa.set, "nonexistent")
+
+
+def test_resume():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ assert not sa.resume([f])
+ f.intercept()
+ sa.resume([f])
+ assert not f.reply.state == "taken"
+
+
+def test_mark():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ assert not f.marked
+ sa.mark([f], True)
+ assert f.marked
+
+ sa.mark_toggle([f])
+ assert not f.marked
+ sa.mark_toggle([f])
+ assert f.marked
+
+
+def test_kill():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ f.intercept()
+ assert f.killable
+ sa.kill([f])
+ assert not f.killable
+
+
+def test_revert():
+ sa = core.Core()
+ with taddons.context():
+ f = tflow.tflow()
+ f.backup()
+ f.request.content = b"bar"
+ assert f.modified()
+ sa.revert([f])
+ assert not f.modified()
diff --git a/test/mitmproxy/addons/test_cut.py b/test/mitmproxy/addons/test_cut.py
new file mode 100644
index 00000000..e028331f
--- /dev/null
+++ b/test/mitmproxy/addons/test_cut.py
@@ -0,0 +1,178 @@
+
+from mitmproxy.addons import cut
+from mitmproxy.addons import view
+from mitmproxy import exceptions
+from mitmproxy import certs
+from mitmproxy.test import taddons
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+import pytest
+from unittest import mock
+
+
+def test_extract():
+ tf = tflow.tflow(resp=True)
+ tests = [
+ ["q.method", "GET"],
+ ["q.scheme", "http"],
+ ["q.host", "address"],
+ ["q.port", "22"],
+ ["q.path", "/path"],
+ ["q.url", "http://address:22/path"],
+ ["q.text", "content"],
+ ["q.content", b"content"],
+ ["q.raw_content", b"content"],
+ ["q.header[header]", "qvalue"],
+
+ ["s.status_code", "200"],
+ ["s.reason", "OK"],
+ ["s.text", "message"],
+ ["s.content", b"message"],
+ ["s.raw_content", b"message"],
+ ["s.header[header-response]", "svalue"],
+
+ ["cc.address.port", "22"],
+ ["cc.address.host", "address"],
+ ["cc.tls_version", "TLSv1.2"],
+ ["cc.sni", "address"],
+ ["cc.ssl_established", "false"],
+
+ ["sc.address.port", "22"],
+ ["sc.address.host", "address"],
+ ["sc.ip_address.host", "192.168.0.1"],
+ ["sc.tls_version", "TLSv1.2"],
+ ["sc.sni", "address"],
+ ["sc.ssl_established", "false"],
+ ]
+ for t in tests:
+ ret = cut.extract(t[0], tf)
+ if ret != t[1]:
+ raise AssertionError("%s: Expected %s, got %s" % (t[0], t[1], ret))
+
+ with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f:
+ d = f.read()
+ c1 = certs.SSLCert.from_pem(d)
+ tf.server_conn.cert = c1
+ assert "CERTIFICATE" in cut.extract("sc.cert", tf)
+
+
+def test_parse_cutspec():
+ tests = [
+ ("", None, True),
+ ("req.method", ("@all", ["req.method"]), False),
+ (
+ "req.method,req.host",
+ ("@all", ["req.method", "req.host"]),
+ False
+ ),
+ (
+ "req.method,req.host|~b foo",
+ ("~b foo", ["req.method", "req.host"]),
+ False
+ ),
+ (
+ "req.method,req.host|~b foo | ~b bar",
+ ("~b foo | ~b bar", ["req.method", "req.host"]),
+ False
+ ),
+ (
+ "req.method, req.host | ~b foo | ~b bar",
+ ("~b foo | ~b bar", ["req.method", "req.host"]),
+ False
+ ),
+ ]
+ for cutspec, output, err in tests:
+ try:
+ assert cut.parse_cutspec(cutspec) == output
+ except exceptions.CommandError:
+ if not err:
+ raise
+ else:
+ if err:
+ raise AssertionError("Expected error.")
+
+
+def test_headername():
+ with pytest.raises(exceptions.CommandError):
+ cut.headername("header[foo.")
+
+
+def qr(f):
+ with open(f, "rb") as fp:
+ return fp.read()
+
+
+def test_cut_clip():
+ v = view.View()
+ c = cut.Cut()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(v, c)
+ v.add([tflow.tflow(resp=True)])
+
+ with mock.patch('pyperclip.copy') as pc:
+ tctx.command(c.clip, "q.method|@all")
+ assert pc.called
+
+ with mock.patch('pyperclip.copy') as pc:
+ tctx.command(c.clip, "q.content|@all")
+ assert pc.called
+
+ with mock.patch('pyperclip.copy') as pc:
+ tctx.command(c.clip, "q.method,q.content|@all")
+ assert pc.called
+
+
+def test_cut_file(tmpdir):
+ f = str(tmpdir.join("path"))
+ v = view.View()
+ c = cut.Cut()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(v, c)
+
+ v.add([tflow.tflow(resp=True)])
+
+ tctx.command(c.save, "q.method|@all", f)
+ assert qr(f) == b"GET"
+ tctx.command(c.save, "q.content|@all", f)
+ assert qr(f) == b"content"
+ tctx.command(c.save, "q.content|@all", "+" + f)
+ assert qr(f) == b"content\ncontent"
+
+ v.add([tflow.tflow(resp=True)])
+ tctx.command(c.save, "q.method|@all", f)
+ assert qr(f).splitlines() == [b"GET", b"GET"]
+ tctx.command(c.save, "q.method,q.content|@all", f)
+ assert qr(f).splitlines() == [b"GET,content", b"GET,content"]
+
+
+def test_cut():
+ v = view.View()
+ c = cut.Cut()
+ with taddons.context() as tctx:
+ v.add([tflow.tflow(resp=True)])
+ tctx.master.addons.add(v, c)
+ assert c.cut("q.method|@all") == [["GET"]]
+ assert c.cut("q.scheme|@all") == [["http"]]
+ assert c.cut("q.host|@all") == [["address"]]
+ assert c.cut("q.port|@all") == [["22"]]
+ assert c.cut("q.path|@all") == [["/path"]]
+ assert c.cut("q.url|@all") == [["http://address:22/path"]]
+ assert c.cut("q.content|@all") == [[b"content"]]
+ assert c.cut("q.header[header]|@all") == [["qvalue"]]
+ assert c.cut("q.header[unknown]|@all") == [[""]]
+
+ assert c.cut("s.status_code|@all") == [["200"]]
+ assert c.cut("s.reason|@all") == [["OK"]]
+ assert c.cut("s.content|@all") == [[b"message"]]
+ assert c.cut("s.header[header-response]|@all") == [["svalue"]]
+ assert c.cut("moo") == [[""]]
+ with pytest.raises(exceptions.CommandError):
+ assert c.cut("__dict__") == [[""]]
+
+ v = view.View()
+ c = cut.Cut()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(v, c)
+ v.add([tflow.ttcpflow()])
+ assert c.cut("q.method|@all") == [[""]]
+ assert c.cut("s.status|@all") == [[""]]
diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py
new file mode 100644
index 00000000..5c7c4976
--- /dev/null
+++ b/test/mitmproxy/addons/test_export.py
@@ -0,0 +1,109 @@
+import pytest
+import os
+
+from mitmproxy import exceptions
+from mitmproxy.addons import export # heh
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+from mitmproxy.test import taddons
+from unittest import mock
+
+
+@pytest.fixture
+def get_request():
+ return tflow.tflow(
+ req=tutils.treq(
+ method=b'GET',
+ content=b'',
+ path=b"/path?a=foo&a=bar&b=baz"
+ )
+ )
+
+
+@pytest.fixture
+def post_request():
+ return tflow.tflow(
+ req=tutils.treq(
+ method=b'POST',
+ headers=(),
+ content=bytes(range(256))
+ )
+ )
+
+
+@pytest.fixture
+def patch_request():
+ return tflow.tflow(
+ req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
+ )
+
+
+@pytest.fixture
+def tcp_flow():
+ return tflow.ttcpflow()
+
+
+class TestExportCurlCommand:
+ def test_get(self, get_request):
+ result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
+ assert export.curl_command(get_request) == result
+
+ def test_post(self, post_request):
+ result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
+ str(bytes(range(256)))[2:-1]
+ )
+ assert export.curl_command(post_request) == result
+
+ def test_patch(self, patch_request):
+ result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
+ assert export.curl_command(patch_request) == result
+
+ def test_tcp(self, tcp_flow):
+ with pytest.raises(exceptions.CommandError):
+ export.curl_command(tcp_flow)
+
+
+class TestRaw:
+ def test_get(self, get_request):
+ assert b"header: qvalue" in export.raw(get_request)
+
+ def test_tcp(self, tcp_flow):
+ with pytest.raises(exceptions.CommandError):
+ export.raw(tcp_flow)
+
+
+def qr(f):
+ with open(f, "rb") as fp:
+ return fp.read()
+
+
+def test_export(tmpdir):
+ f = str(tmpdir.join("path"))
+ e = export.Export()
+ with taddons.context():
+ assert e.formats() == ["curl", "raw"]
+ with pytest.raises(exceptions.CommandError):
+ e.file("nonexistent", tflow.tflow(resp=True), f)
+
+ e.file("raw", tflow.tflow(resp=True), f)
+ assert qr(f)
+ os.unlink(f)
+
+ e.file("curl", tflow.tflow(resp=True), f)
+ assert qr(f)
+ os.unlink(f)
+
+
+def test_clip(tmpdir):
+ e = export.Export()
+ with taddons.context():
+ with pytest.raises(exceptions.CommandError):
+ e.clip("nonexistent", tflow.tflow(resp=True))
+
+ with mock.patch('pyperclip.copy') as pc:
+ e.clip("raw", tflow.tflow(resp=True))
+ assert pc.called
+
+ with mock.patch('pyperclip.copy') as pc:
+ e.clip("curl", tflow.tflow(resp=True))
+ assert pc.called
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 859d99f9..a3df1fcf 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -9,9 +9,6 @@ from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy import addonmanager
from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy import proxy
-from mitmproxy import master
from mitmproxy.addons import script
@@ -48,9 +45,9 @@ def test_script_print_stdout():
class TestScript:
def test_notfound(self):
- with taddons.context() as tctx:
- sc = script.Script("nonexistent")
- tctx.master.addons.add(sc)
+ with taddons.context():
+ with pytest.raises(exceptions.OptionsError):
+ script.Script("nonexistent")
def test_simple(self):
with taddons.context() as tctx:
@@ -136,25 +133,45 @@ class TestCutTraceback:
class TestScriptLoader:
- def test_simple(self):
- o = options.Options(scripts=[])
- m = master.Master(o, proxy.DummyServer())
+ def test_script_run(self):
+ rp = tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
sc = script.ScriptLoader()
- sc.running()
- m.addons.add(sc)
- assert len(m.addons) == 1
- o.update(
- scripts = [
- tutils.test_data.path(
- "mitmproxy/data/addonscripts/recorder/recorder.py"
- )
+ with taddons.context() as tctx:
+ sc.script_run([tflow.tflow(resp=True)], rp)
+ debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
+ assert debug == [
+ 'recorder load', 'recorder running', 'recorder configure',
+ 'recorder tick',
+ 'recorder requestheaders', 'recorder request',
+ 'recorder responseheaders', 'recorder response'
]
- )
- assert len(m.addons) == 1
- assert len(sc.addons) == 1
- o.update(scripts = [])
- assert len(m.addons) == 1
- assert len(sc.addons) == 0
+
+ def test_script_run_nonexistent(self):
+ sc = script.ScriptLoader()
+ with taddons.context():
+ with pytest.raises(exceptions.CommandError):
+ sc.script_run([tflow.tflow(resp=True)], "/nonexistent")
+
+ def test_simple(self):
+ sc = script.ScriptLoader()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sc)
+ sc.running()
+ assert len(tctx.master.addons) == 1
+ tctx.master.options.update(
+ scripts = [
+ tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
+ ]
+ )
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 1
+ tctx.master.options.update(scripts = [])
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 0
def test_dupes(self):
sc = script.ScriptLoader()
@@ -166,13 +183,6 @@ class TestScriptLoader:
scripts = ["one", "one"]
)
- def test_nonexistent(self):
- sc = script.ScriptLoader()
- with taddons.context() as tctx:
- tctx.master.addons.add(sc)
- tctx.configure(sc, scripts = ["nonexistent"])
- tctx.master.has_log("nonexistent: file not found")
-
def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader()
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index 29de48a0..3ceab3fa 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -16,12 +16,24 @@ def tdump(path, flows):
w.add(i)
+def test_load_file(tmpdir):
+ s = serverplayback.ServerPlayback()
+ with taddons.context():
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ s.load_file(fpath)
+ assert s.flowmap
+ with pytest.raises(exceptions.CommandError):
+ s.load_file("/nonexistent")
+
+
def test_config(tmpdir):
s = serverplayback.ServerPlayback()
with taddons.context() as tctx:
fpath = str(tmpdir.join("flows"))
tdump(fpath, [tflow.tflow(resp=True)])
tctx.configure(s, server_replay=[fpath])
+ s.configured = False
with pytest.raises(exceptions.OptionsError):
tctx.configure(s, server_replay=[str(tmpdir)])
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 05d4af30..1724da49 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -4,8 +4,8 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
-from mitmproxy import options
from mitmproxy import exceptions
+from mitmproxy import io
from mitmproxy.test import taddons
@@ -26,12 +26,12 @@ def test_order_refresh():
v.sig_view_refresh.connect(save)
tf = tflow.tflow(resp=True)
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, console_order="time")
- v.add(tf)
+ v.add([tf])
tf.request.timestamp_start = 1
assert not sargs
- v.update(tf)
+ v.update([tf])
assert sargs
@@ -131,15 +131,50 @@ def test_filter():
assert len(v) == 4
-def test_load():
+def tdump(path, flows):
+ w = io.FlowWriter(open(path, "wb"))
+ for i in flows:
+ w.add(i)
+
+
+def test_create():
+ v = view.View()
+ with taddons.context():
+ v.create("get", "http://foo.com")
+ assert len(v) == 1
+ assert v[0].request.url == "http://foo.com/"
+ v.create("get", "http://foo.com")
+ assert len(v) == 2
+
+
+def test_orders():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context():
+ assert v.order_options()
+
+
+def test_load(tmpdir):
+ path = str(tmpdir.join("path"))
+ v = view.View()
+ with taddons.context() as tctx:
tctx.master.addons.add(v)
+ tdump(
+ path,
+ [
+ tflow.tflow(resp=True),
+ tflow.tflow(resp=True)
+ ]
+ )
+ v.load_file(path)
+ assert len(v) == 2
+ v.load_file(path)
+ assert len(v) == 4
def test_resolve():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
+ assert tctx.command(v.resolve, "@all") == []
assert tctx.command(v.resolve, "@focus") == []
assert tctx.command(v.resolve, "@shown") == []
assert tctx.command(v.resolve, "@hidden") == []
@@ -149,6 +184,7 @@ def test_resolve():
v.request(tft(method="get"))
assert len(tctx.command(v.resolve, "~m get")) == 1
assert len(tctx.command(v.resolve, "@focus")) == 1
+ assert len(tctx.command(v.resolve, "@all")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 1
assert len(tctx.command(v.resolve, "@unmarked")) == 1
assert tctx.command(v.resolve, "@hidden") == []
@@ -156,6 +192,7 @@ def test_resolve():
v.request(tft(method="put"))
assert len(tctx.command(v.resolve, "@focus")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 2
+ assert len(tctx.command(v.resolve, "@all")) == 2
assert tctx.command(v.resolve, "@hidden") == []
assert tctx.command(v.resolve, "@marked") == []
@@ -175,14 +212,52 @@ def test_resolve():
assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
+ assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"]
with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
tctx.command(v.resolve, "~")
+def test_go():
+ v = view.View()
+ with taddons.context():
+ v.add([
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ tflow.tflow(),
+ ])
+ assert v.focus.index == 0
+ v.go(-1)
+ assert v.focus.index == 4
+ v.go(0)
+ assert v.focus.index == 0
+ v.go(1)
+ assert v.focus.index == 1
+ v.go(999)
+ assert v.focus.index == 4
+ v.go(-999)
+ assert v.focus.index == 0
+
+
+def test_duplicate():
+ v = view.View()
+ with taddons.context():
+ f = [
+ tflow.tflow(),
+ tflow.tflow(),
+ ]
+ v.add(f)
+ assert len(v) == 2
+ v.duplicate(f)
+ assert len(v) == 4
+ assert v.focus.index == 2
+
+
def test_order():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
v.request(tft(method="get", start=1))
v.request(tft(method="put", start=2))
v.request(tft(method="get", start=3))
@@ -230,14 +305,14 @@ def test_update():
assert f in v
f.request.method = "put"
- v.update(f)
+ v.update([f])
assert f not in v
f.request.method = "get"
- v.update(f)
+ v.update([f])
assert f in v
- v.update(f)
+ v.update([f])
assert f in v
@@ -276,7 +351,7 @@ def test_signals():
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
# Simple add
- v.add(tft())
+ v.add([tft()])
assert rec_add
assert not any([rec_update, rec_remove, rec_refresh])
@@ -291,14 +366,14 @@ def test_signals():
# An update that results in a flow being added to the view
clearrec()
v[0].request.method = "PUT"
- v.update(v[0])
+ v.update([v[0]])
assert rec_remove
assert not any([rec_update, rec_refresh, rec_add])
# An update that does not affect the view just sends update
v.set_filter(flowfilter.parse("~m put"))
clearrec()
- v.update(v[0])
+ v.update([v[0]])
assert rec_update
assert not any([rec_remove, rec_refresh, rec_add])
@@ -307,33 +382,33 @@ def test_signals():
v.set_filter(flowfilter.parse("~m get"))
assert not len(v)
clearrec()
- v.update(f)
+ v.update([f])
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
def test_focus_follow():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, console_focus_follow=True, view_filter="~m get")
- v.add(tft(start=5))
+ v.add([tft(start=5)])
assert v.focus.index == 0
- v.add(tft(start=4))
+ v.add([tft(start=4)])
assert v.focus.index == 0
assert v.focus.flow.request.timestamp_start == 4
- v.add(tft(start=7))
+ v.add([tft(start=7)])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod = tft(method="put", start=6)
- v.add(mod)
+ v.add([mod])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod.request.method = "GET"
- v.update(mod)
+ v.update([mod])
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 6
@@ -341,7 +416,7 @@ def test_focus_follow():
def test_focus():
# Special case - initialising with a view that already contains data
v = view.View()
- v.add(tft())
+ v.add([tft()])
f = view.Focus(v)
assert f.index is 0
assert f.flow is v[0]
@@ -352,7 +427,7 @@ def test_focus():
assert f.index is None
assert f.flow is None
- v.add(tft(start=1))
+ v.add([tft(start=1)])
assert f.index == 0
assert f.flow is v[0]
@@ -362,11 +437,11 @@ def test_focus():
with pytest.raises(ValueError):
f.__setattr__("index", 99)
- v.add(tft(start=0))
+ v.add([tft(start=0)])
assert f.index == 1
assert f.flow is v[1]
- v.add(tft(start=2))
+ v.add([tft(start=2)])
assert f.index == 1
assert f.flow is v[1]
@@ -374,22 +449,25 @@ def test_focus():
assert f.index == 0
f.index = 1
- v.remove(v[1])
+ v.remove([v[1]])
+ v[1].intercept()
assert f.index == 1
assert f.flow is v[1]
- v.remove(v[1])
+ v.remove([v[1]])
assert f.index == 0
assert f.flow is v[0]
- v.remove(v[0])
+ v.remove([v[0]])
assert f.index is None
assert f.flow is None
- v.add(tft(method="get", start=0))
- v.add(tft(method="get", start=1))
- v.add(tft(method="put", start=2))
- v.add(tft(method="get", start=3))
+ v.add([
+ tft(method="get", start=0),
+ tft(method="get", start=1),
+ tft(method="put", start=2),
+ tft(method="get", start=3),
+ ])
f.flow = v[2]
assert f.flow.request.method == "PUT"
@@ -409,16 +487,16 @@ def test_settings():
with pytest.raises(KeyError):
v.settings[f]
- v.add(f)
+ v.add([f])
v.settings[f]["foo"] = "bar"
assert v.settings[f]["foo"] == "bar"
assert len(list(v.settings)) == 1
- v.remove(f)
+ v.remove([f])
with pytest.raises(KeyError):
v.settings[f]
assert not v.settings.keys()
- v.add(f)
+ v.add([f])
v.settings[f]["foo"] = "bar"
assert v.settings.keys()
v.clear()
@@ -427,7 +505,7 @@ def test_settings():
def test_configure():
v = view.View()
- with taddons.context(options=options.Options()) as tctx:
+ with taddons.context() as tctx:
tctx.configure(v, view_filter="~q")
with pytest.raises(Exception, match="Invalid interception filter"):
tctx.configure(v, view_filter="~~")
diff --git a/test/mitmproxy/console/test_flowlist.py b/test/mitmproxy/console/test_flowlist.py
index d63dab1c..6d82749d 100644
--- a/test/mitmproxy/console/test_flowlist.py
+++ b/test/mitmproxy/console/test_flowlist.py
@@ -1,4 +1,3 @@
-from unittest import mock
import urwid
import mitmproxy.tools.console.flowlist as flowlist
@@ -14,13 +13,6 @@ class TestFlowlist:
o = options.Options(**opts)
return console.master.ConsoleMaster(o, proxy.DummyServer())
- def test_new_request(self):
- m = self.mkmaster()
- x = flowlist.FlowListBox(m)
- with mock.patch('mitmproxy.tools.console.signals.status_message.send') as mock_thing:
- x.new_request("nonexistent url", "GET")
- mock_thing.assert_called_once_with(message="Invalid URL: No hostname given")
-
def test_logbuffer_set_focus(self):
m = self.mkmaster()
b = flowlist.LogBufferBox(m)
diff --git a/test/mitmproxy/data/test_flow_export/locust_get.py b/test/mitmproxy/data/test_flow_export/locust_get.py
deleted file mode 100644
index 632d5d53..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_get.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'a': ['foo', 'bar'],
- 'b': 'baz',
- }
-
- self.response = self.client.request(
- method='GET',
- url=url,
- headers=headers,
- params=params,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_patch.py b/test/mitmproxy/data/test_flow_export/locust_patch.py
deleted file mode 100644
index f64e0857..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_patch.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'query': 'param',
- }
-
- data = '''content'''
-
- self.response = self.client.request(
- method='PATCH',
- url=url,
- headers=headers,
- params=params,
- data=data,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_post.py b/test/mitmproxy/data/test_flow_export/locust_post.py
deleted file mode 100644
index df23476a..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_post.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- data = '''content'''
-
- self.response = self.client.request(
- method='POST',
- url=url,
- data=data,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_get.py b/test/mitmproxy/data/test_flow_export/locust_task_get.py
deleted file mode 100644
index 03821cd8..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_get.py
+++ /dev/null
@@ -1,20 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'a': ['foo', 'bar'],
- 'b': 'baz',
- }
-
- self.response = self.client.request(
- method='GET',
- url=url,
- headers=headers,
- params=params,
- )
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_patch.py b/test/mitmproxy/data/test_flow_export/locust_task_patch.py
deleted file mode 100644
index d425209c..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_patch.py
+++ /dev/null
@@ -1,22 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'query': 'param',
- }
-
- data = '''content'''
-
- self.response = self.client.request(
- method='PATCH',
- url=url,
- headers=headers,
- params=params,
- data=data,
- )
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_post.py b/test/mitmproxy/data/test_flow_export/locust_task_post.py
deleted file mode 100644
index a5f307ee..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_post.py
+++ /dev/null
@@ -1,11 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- data = '''\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'''
-
- self.response = self.client.request(
- method='POST',
- url=url,
- data=data,
- )
diff --git a/test/mitmproxy/data/test_flow_export/python_get.py b/test/mitmproxy/data/test_flow_export/python_get.py
deleted file mode 100644
index e9ed072a..00000000
--- a/test/mitmproxy/data/test_flow_export/python_get.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import requests
-
-response = requests.get(
- 'http://address:22/path',
- params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')],
- headers={'header': 'qvalue'}
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/data/test_flow_export/python_patch.py b/test/mitmproxy/data/test_flow_export/python_patch.py
deleted file mode 100644
index d83a57b9..00000000
--- a/test/mitmproxy/data/test_flow_export/python_patch.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import requests
-
-response = requests.patch(
- 'http://address:22/path',
- params=[('query', 'param')],
- headers={'header': 'qvalue'},
- data=b'content'
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/data/test_flow_export/python_post.py b/test/mitmproxy/data/test_flow_export/python_post.py
deleted file mode 100644
index 42f1af9a..00000000
--- a/test/mitmproxy/data/test_flow_export/python_post.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import requests
-
-response = requests.post(
- 'http://address:22/path',
- data=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13'
- b'\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567'
- b'89:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f'
- b'\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f'
- b'\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f'
- b'\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf'
- b'\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf'
- b'\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf'
- b'\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf'
- b'\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef'
- b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
-)
-print(response.text)
diff --git a/test/mitmproxy/data/test_flow_export/python_post_json.py b/test/mitmproxy/data/test_flow_export/python_post_json.py
deleted file mode 100644
index d6ae6357..00000000
--- a/test/mitmproxy/data/test_flow_export/python_post_json.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import requests
-
-response = requests.post(
- 'http://address:22/path',
- headers={'content-type': 'application/json'},
- json={'email': 'example@example.com', 'name': 'example'}
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index 64928dbf..958328b2 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -1,12 +1,10 @@
import typing
from mitmproxy import command
from mitmproxy import flow
-from mitmproxy import master
-from mitmproxy import options
-from mitmproxy import proxy
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
+import io
import pytest
@@ -18,24 +16,41 @@ class TAddon:
def cmd2(self, foo: str) -> str:
return 99
+ def cmd3(self, foo: int) -> int:
+ return foo
+
def empty(self) -> None:
pass
+ def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]:
+ return list(var)
+
class TestCommand:
+ def test_varargs(self):
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "varargs", a.varargs)
+ assert c.signature_help() == "varargs str *str -> [str]"
+ assert c.call(["one", "two", "three"]) == ["two", "three"]
+ with pytest.raises(exceptions.CommandError):
+ c.call(["one", "two", 3])
+
def test_call(self):
- o = options.Options()
- m = master.Master(o, proxy.DummyServer(o))
- cm = command.CommandManager(m)
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
- a = TAddon()
- c = command.Command(cm, "cmd.path", a.cmd1)
- assert c.call(["foo"]) == "ret foo"
- assert c.signature_help() == "cmd.path str -> str"
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
- c = command.Command(cm, "cmd.two", a.cmd2)
- with pytest.raises(exceptions.CommandError):
- c.call(["foo"])
+ c = command.Command(cm, "cmd.three", a.cmd3)
+ assert c.call(["1"]) == 1
def test_simple():
@@ -55,27 +70,48 @@ def test_simple():
c.add("empty", a.empty)
c.call("empty")
+ fp = io.StringIO()
+ c.dump(fp)
+ assert fp.getvalue()
+
def test_typename():
assert command.typename(str, True) == "str"
assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
+
+ assert command.typename(command.Cuts, False) == "cutspec"
+ assert command.typename(command.Cuts, True) == "[cuts]"
+
assert command.typename(flow.Flow, False) == "flow"
+ assert command.typename(typing.Sequence[str], False) == "[str]"
class DummyConsole:
- def load(self, l):
- l.add_command("console.resolve", self.resolve)
-
+ @command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec)
return [tflow.tflow(resp=True)] * n
+ @command.command("cut")
+ def cut(self, spec: str) -> command.Cuts:
+ return [["test"]]
+
def test_parsearg():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
+
+ assert command.parsearg(tctx.master.commands, "1", int) == 1
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "foo", int)
+
+ assert command.parsearg(tctx.master.commands, "true", bool) is True
+ assert command.parsearg(tctx.master.commands, "false", bool) is False
+ with pytest.raises(exceptions.CommandError):
+ command.parsearg(tctx.master.commands, "flobble", bool)
+
assert len(command.parsearg(
tctx.master.commands, "2", typing.Sequence[flow.Flow]
)) == 2
@@ -87,6 +123,17 @@ def test_parsearg():
with pytest.raises(exceptions.CommandError):
command.parsearg(tctx.master.commands, "foo", Exception)
+ assert command.parsearg(
+ tctx.master.commands, "foo", command.Cuts
+ ) == [["test"]]
+
+ assert command.parsearg(
+ tctx.master.commands, "foo", typing.Sequence[str]
+ ) == ["foo"]
+ assert command.parsearg(
+ tctx.master.commands, "foo, bar", typing.Sequence[str]
+ ) == ["foo", "bar"]
+
class TDec:
@command.command("cmd1")
diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py
index 67a6552f..e320885d 100644
--- a/test/mitmproxy/test_connections.py
+++ b/test/mitmproxy/test_connections.py
@@ -99,7 +99,7 @@ class TestServerConnection:
c.alpn_proto_negotiated = b'h2'
assert 'address:22' in repr(c)
assert 'ALPN' in repr(c)
- assert 'TLS: foobar' in repr(c)
+ assert 'TLSv1.2: foobar' in repr(c)
c.sni = None
c.tls_established = True
diff --git a/test/mitmproxy/test_export.py b/test/mitmproxy/test_export.py
deleted file mode 100644
index b789e6b5..00000000
--- a/test/mitmproxy/test_export.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import re
-
-import pytest
-
-from mitmproxy import export # heh
-from mitmproxy.net.http import Headers
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
-
-
-def clean_blanks(s):
- return re.sub(r"^\s+", "", s, flags=re.MULTILINE)
-
-
-def python_equals(testdata, text):
- """
- Compare two bits of Python code, disregarding non-significant differences
- like whitespace on blank lines and trailing space.
- """
- d = open(tutils.test_data.path(testdata)).read()
- assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip()
-
-
-@pytest.fixture
-def get_request():
- return tflow.tflow(
- req=tutils.treq(
- method=b'GET',
- content=b'',
- path=b"/path?a=foo&a=bar&b=baz"
- )
- )
-
-
-@pytest.fixture
-def post_request():
- return tflow.tflow(
- req=tutils.treq(
- method=b'POST',
- headers=(),
- content=bytes(range(256))
- )
- )
-
-
-@pytest.fixture
-def patch_request():
- return tflow.tflow(
- req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
- )
-
-
-class TExport:
- def test_get(self, get_request):
- raise NotImplementedError()
-
- def test_post(self, post_request):
- raise NotImplementedError()
-
- def test_patch(self, patch_request):
- raise NotImplementedError()
-
-
-class TestExportCurlCommand(TExport):
- def test_get(self, get_request):
- result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
- assert export.curl_command(get_request) == result
-
- def test_post(self, post_request):
- result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
- str(bytes(range(256)))[2:-1]
- )
- assert export.curl_command(post_request) == result
-
- def test_patch(self, patch_request):
- result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
- assert export.curl_command(patch_request) == result
-
-
-class TestExportPythonCode(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/python_get.py",
- export.python_code(get_request))
-
- def test_post(self, post_request):
- python_equals("mitmproxy/data/test_flow_export/python_post.py",
- export.python_code(post_request))
-
- def test_post_json(self, post_request):
- post_request.request.content = b'{"name": "example", "email": "example@example.com"}'
- post_request.request.headers = Headers(content_type="application/json")
- python_equals("mitmproxy/data/test_flow_export/python_post_json.py",
- export.python_code(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/python_patch.py",
- export.python_code(patch_request))
-
-
-class TestExportLocustCode(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/locust_get.py",
- export.locust_code(get_request))
-
- def test_post(self, post_request):
- post_request.request.content = b'content'
- post_request.request.headers.clear()
- python_equals("mitmproxy/data/test_flow_export/locust_post.py",
- export.locust_code(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/locust_patch.py",
- export.locust_code(patch_request))
-
-
-class TestExportLocustTask(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_get.py",
- export.locust_task(get_request))
-
- def test_post(self, post_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_post.py",
- export.locust_task(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py",
- export.locust_task(patch_request))
-
-
-class TestURL:
- def test_url(self):
- flow = tflow.tflow()
- assert export.url(flow) == "http://address:22/path"
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index 78f893c0..19f0e7d9 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -113,10 +113,6 @@ class TestFlowMaster:
with pytest.raises(Exception, match="live"):
fm.replay_request(f)
- def test_create_flow(self):
- fm = master.Master(None, DummyServer())
- assert fm.create_request("GET", "http://example.com/")
-
def test_all(self):
s = tservers.TestState()
fm = master.Master(None, DummyServer())
diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py
index e3d5dc44..2b6181d3 100644
--- a/test/mitmproxy/tools/web/test_app.py
+++ b/test/mitmproxy/tools/web/test_app.py
@@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False)
f = tflow.tflow(resp=True)
f.id = "42"
- m.view.add(f)
- m.view.add(tflow.tflow(err=True))
+ m.view.add([f])
+ m.view.add([tflow.tflow(err=True)])
m.add_log("test log", "info")
self.master = m
self.view = m.view
@@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
# restore
for f in flows:
- self.view.add(f)
+ self.view.add([f])
self.events.data = events
def test_resume(self):
@@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
assert self.fetch("/flows/42", method="DELETE").code == 200
assert not self.view.get_by_id("42")
- self.view.add(f)
+ self.view.add([f])
assert self.fetch("/flows/1234", method="DELETE").code == 404
@@ -162,7 +162,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
f = self.view.get_by_id(resp.body.decode())
assert f
assert f.id != "42"
- self.view.remove(f)
+ self.view.remove([f])
def test_flow_revert(self):
f = self.view.get_by_id("42")
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index 22bd7c34..fe33070e 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -4,6 +4,7 @@ from unittest import mock
import pytest
from mitmproxy.utils import typecheck
+from mitmproxy import command
class TBase:
@@ -87,15 +88,26 @@ def test_check_any():
typecheck.check_option_type("foo", None, typing.Any)
-def test_check_command_return_type():
- assert(typecheck.check_command_return_type("foo", str))
- assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
- assert(typecheck.check_command_return_type(None, None))
- assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
- assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
+def test_check_command_type():
+ assert(typecheck.check_command_type("foo", str))
+ assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
+ assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
+ assert(typecheck.check_command_type(None, None))
+ assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
+ assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
+ assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
+ assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_command_return_type([10], m)
+
+ typecheck.check_command_type([10], m)
+
+ # Python 3.5 only defines __union_params__
+ m = mock.Mock()
+ m.__str__ = lambda self: "typing.Union"
+ m.__union_params__ = (int,)
+ assert not typecheck.check_command_type([22], m)