aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/anticache.py9
-rw-r--r--mitmproxy/addons/anticomp.py9
-rw-r--r--mitmproxy/addons/disable_h2c.py3
-rw-r--r--mitmproxy/addons/dumper.py32
-rw-r--r--mitmproxy/addons/onboarding.py5
-rw-r--r--mitmproxy/addons/proxyauth.py6
-rw-r--r--mitmproxy/addons/readfile.py12
-rw-r--r--mitmproxy/addons/script.py7
-rw-r--r--mitmproxy/addons/serverplayback.py27
-rw-r--r--mitmproxy/addons/termlog.py7
-rw-r--r--mitmproxy/addons/termstatus.py9
-rw-r--r--mitmproxy/addons/upstream_auth.py6
-rw-r--r--mitmproxy/ctx.py3
-rw-r--r--mitmproxy/master.py2
-rw-r--r--test/mitmproxy/addons/test_proxyauth.py3
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py293
-rw-r--r--test/mitmproxy/addons/test_termstatus.py1
17 files changed, 199 insertions, 235 deletions
diff --git a/mitmproxy/addons/anticache.py b/mitmproxy/addons/anticache.py
index 8d748a21..5b34d5a5 100644
--- a/mitmproxy/addons/anticache.py
+++ b/mitmproxy/addons/anticache.py
@@ -1,10 +1,7 @@
-class AntiCache:
- def __init__(self):
- self.enabled = False
+from mitmproxy import ctx
- def configure(self, options, updated):
- self.enabled = options.anticache
+class AntiCache:
def request(self, flow):
- if self.enabled:
+ if ctx.options.anticache:
flow.request.anticache()
diff --git a/mitmproxy/addons/anticomp.py b/mitmproxy/addons/anticomp.py
index eaf01296..d7d1ca8d 100644
--- a/mitmproxy/addons/anticomp.py
+++ b/mitmproxy/addons/anticomp.py
@@ -1,10 +1,7 @@
-class AntiComp:
- def __init__(self):
- self.enabled = False
+from mitmproxy import ctx
- def configure(self, options, updated):
- self.enabled = options.anticomp
+class AntiComp:
def request(self, flow):
- if self.enabled:
+ if ctx.options.anticomp:
flow.request.anticomp()
diff --git a/mitmproxy/addons/disable_h2c.py b/mitmproxy/addons/disable_h2c.py
index b43d5207..392a29a5 100644
--- a/mitmproxy/addons/disable_h2c.py
+++ b/mitmproxy/addons/disable_h2c.py
@@ -14,9 +14,6 @@ class DisableH2C:
by sending the connection preface. We just kill those flows.
"""
- def configure(self, options, updated):
- pass
-
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
diff --git a/mitmproxy/addons/dumper.py b/mitmproxy/addons/dumper.py
index 5fd8408f..587bb29b 100644
--- a/mitmproxy/addons/dumper.py
+++ b/mitmproxy/addons/dumper.py
@@ -29,10 +29,7 @@ def colorful(line, styles):
class Dumper:
def __init__(self, outfile=sys.stdout):
self.filter = None # type: flowfilter.TFilter
- self.flow_detail = None # type: int
self.outfp = outfile # type: typing.io.TextIO
- self.showhost = None # type: bool
- self.default_contentview = "auto" # type: str
def configure(self, options, updated):
if "view_filter" in updated:
@@ -44,9 +41,6 @@ class Dumper:
)
else:
self.filter = None
- self.flow_detail = options.flow_detail
- self.showhost = options.showhost
- self.default_contentview = options.default_contentview
def echo(self, text, ident=None, **style):
if ident:
@@ -67,13 +61,13 @@ class Dumper:
def _echo_message(self, message):
_, lines, error = contentviews.get_message_content_view(
- self.default_contentview,
+ ctx.options.default_contentview,
message
)
if error:
ctx.log.debug(error)
- if self.flow_detail == 3:
+ if ctx.options.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
lines_to_echo = lines
@@ -95,7 +89,7 @@ class Dumper:
if next(lines, None):
self.echo("(cut off)", ident=4, dim=True)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self.echo("")
def _echo_request_line(self, flow):
@@ -121,12 +115,12 @@ class Dumper:
fg=method_color,
bold=True
)
- if self.showhost:
+ if ctx.options.showhost:
url = flow.request.pretty_url
else:
url = flow.request.url
terminalWidthLimit = max(shutil.get_terminal_size()[0] - 25, 50)
- if self.flow_detail < 1 and len(url) > terminalWidthLimit:
+ if ctx.options.flow_detail < 1 and len(url) > terminalWidthLimit:
url = url[:terminalWidthLimit] + "…"
url = click.style(strutils.escape_control_characters(url), bold=True)
@@ -176,7 +170,7 @@ class Dumper:
size = click.style(size, bold=True)
arrows = click.style(" <<", bold=True)
- if self.flow_detail == 1:
+ if ctx.options.flow_detail == 1:
# This aligns the HTTP response code with the HTTP request method:
# 127.0.0.1:59519: GET http://example.com/
# << 304 Not Modified 0b
@@ -194,16 +188,16 @@ class Dumper:
def echo_flow(self, f):
if f.request:
self._echo_request_line(f)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self._echo_headers(f.request.headers)
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(f.request)
if f.response:
self._echo_response_line(f)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self._echo_headers(f.response.headers)
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(f.response)
if f.error:
@@ -211,7 +205,7 @@ class Dumper:
self.echo(" << {}".format(msg), bold=True, fg="red")
def match(self, f):
- if self.flow_detail == 0:
+ if ctx.options.flow_detail == 0:
return False
if not self.filter:
return True
@@ -239,7 +233,7 @@ class Dumper:
if self.match(f):
message = f.messages[-1]
self.echo(f.message_info(message))
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(message)
def websocket_end(self, f):
@@ -267,5 +261,5 @@ class Dumper:
server=repr(f.server_conn.address),
direction=direction,
))
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(message)
diff --git a/mitmproxy/addons/onboarding.py b/mitmproxy/addons/onboarding.py
index 6552ec9e..e67beadd 100644
--- a/mitmproxy/addons/onboarding.py
+++ b/mitmproxy/addons/onboarding.py
@@ -1,5 +1,6 @@
from mitmproxy.addons import wsgiapp
from mitmproxy.addons.onboardingapp import app
+from mitmproxy import ctx
class Onboarding(wsgiapp.WSGIApp):
@@ -7,13 +8,11 @@ class Onboarding(wsgiapp.WSGIApp):
def __init__(self):
super().__init__(app.Adapter(app.application), None, None)
- self.enabled = False
def configure(self, options, updated):
self.host = options.onboarding_host
self.port = options.onboarding_port
- self.enabled = options.onboarding
def request(self, f):
- if self.enabled:
+ if ctx.options.onboarding:
super().request(f)
diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py
index 43677f61..5fed266e 100644
--- a/mitmproxy/addons/proxyauth.py
+++ b/mitmproxy/addons/proxyauth.py
@@ -10,6 +10,7 @@ import mitmproxy.net.http
from mitmproxy import connections # noqa
from mitmproxy import exceptions
from mitmproxy import http
+from mitmproxy import ctx
from mitmproxy.net.http import status_codes
REALM = "mitmproxy"
@@ -45,7 +46,6 @@ class ProxyAuth:
self.nonanonymous = False
self.htpasswd = None
self.singleuser = None
- self.mode = None
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
@@ -58,7 +58,7 @@ class ProxyAuth:
- True, if authentication is done as if mitmproxy is a proxy
- False, if authentication is done as if mitmproxy is a HTTP server
"""
- return self.mode in ("regular", "upstream")
+ return ctx.options.mode in ("regular", "upstream")
def which_auth_header(self) -> str:
if self.is_proxy_auth():
@@ -136,8 +136,6 @@ class ProxyAuth:
"Invalid single-user auth specification."
)
self.singleuser = parts
- if "mode" in updated:
- self.mode = options.mode
if self.enabled():
if options.mode == "transparent":
raise exceptions.OptionsError(
diff --git a/mitmproxy/addons/readfile.py b/mitmproxy/addons/readfile.py
index 03dcd084..949da15d 100644
--- a/mitmproxy/addons/readfile.py
+++ b/mitmproxy/addons/readfile.py
@@ -9,9 +9,6 @@ class ReadFile:
"""
An addon that handles reading from file on startup.
"""
- def __init__(self):
- self.path = None
-
def load_flows_file(self, path: str) -> int:
path = os.path.expanduser(path)
cnt = 0
@@ -31,16 +28,11 @@ class ReadFile:
ctx.log.error("Flow file corrupted.")
raise exceptions.FlowReadException(v)
- def configure(self, options, updated):
- if "rfile" in updated and options.rfile:
- self.path = options.rfile
-
def running(self):
- if self.path:
+ if ctx.options.rfile:
try:
- self.load_flows_file(self.path)
+ self.load_flows_file(ctx.options.rfile)
except exceptions.FlowReadException as v:
raise exceptions.OptionsError(v)
finally:
- self.path = None
ctx.master.addons.trigger("processing_complete")
diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py
index 5099e62c..0cca3eac 100644
--- a/mitmproxy/addons/script.py
+++ b/mitmproxy/addons/script.py
@@ -36,7 +36,6 @@ class Script:
self.path = path
self.ns = None
- self.last_options = None
self.last_load = 0
self.last_mtime = 0
@@ -60,14 +59,12 @@ class Script:
ctx.master.addons.invoke_addon(
self.ns,
"configure",
- self.last_options,
- self.last_options.keys()
+ ctx.options,
+ ctx.options.keys()
)
self.last_load = time.time()
self.last_mtime = mtime
- def configure(self, options, updated):
- self.last_options = options
class ScriptLoader:
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 7bb0c716..7d2aa15f 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -10,8 +10,6 @@ from mitmproxy import io
class ServerPlayback:
def __init__(self):
- self.options = None
-
self.flowmap = {}
self.stop = False
self.final_flow = None
@@ -38,27 +36,27 @@ class ServerPlayback:
queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True)
key = [str(r.port), str(r.scheme), str(r.method), str(path)] # type: List[Any]
- if not self.options.server_replay_ignore_content:
- if self.options.server_replay_ignore_payload_params and r.multipart_form:
+ if not ctx.options.server_replay_ignore_content:
+ if ctx.options.server_replay_ignore_payload_params and r.multipart_form:
key.extend(
(k, v)
for k, v in r.multipart_form.items(multi=True)
- if k.decode(errors="replace") not in self.options.server_replay_ignore_payload_params
+ if k.decode(errors="replace") not in ctx.options.server_replay_ignore_payload_params
)
- elif self.options.server_replay_ignore_payload_params and r.urlencoded_form:
+ elif ctx.options.server_replay_ignore_payload_params and r.urlencoded_form:
key.extend(
(k, v)
for k, v in r.urlencoded_form.items(multi=True)
- if k not in self.options.server_replay_ignore_payload_params
+ if k not in ctx.options.server_replay_ignore_payload_params
)
else:
key.append(str(r.raw_content))
- if not self.options.server_replay_ignore_host:
+ if not ctx.options.server_replay_ignore_host:
key.append(r.host)
filtered = []
- ignore_params = self.options.server_replay_ignore_params or []
+ ignore_params = ctx.options.server_replay_ignore_params or []
for p in queriesArray:
if p[0] not in ignore_params:
filtered.append(p)
@@ -66,9 +64,9 @@ class ServerPlayback:
key.append(p[0])
key.append(p[1])
- if self.options.server_replay_use_headers:
+ if ctx.options.server_replay_use_headers:
headers = []
- for i in self.options.server_replay_use_headers:
+ for i in ctx.options.server_replay_use_headers:
v = r.headers.get(i)
headers.append((i, v))
key.append(headers)
@@ -83,7 +81,7 @@ class ServerPlayback:
"""
hsh = self._hash(request)
if hsh in self.flowmap:
- if self.options.server_replay_nopop:
+ if ctx.options.server_replay_nopop:
return self.flowmap[hsh][0]
else:
ret = self.flowmap[hsh].pop(0)
@@ -92,7 +90,6 @@ class ServerPlayback:
return ret
def configure(self, options, updated):
- self.options = options
if "server_replay" in updated:
self.clear()
if options.server_replay:
@@ -112,13 +109,13 @@ class ServerPlayback:
if rflow:
response = rflow.response.copy()
response.is_replay = True
- if self.options.refresh_server_playback:
+ if ctx.options.refresh_server_playback:
response.refresh()
f.response = response
if not self.flowmap:
self.final_flow = f
self.stop = True
- elif self.options.replay_kill_extra:
+ elif ctx.options.replay_kill_extra:
ctx.log.warn(
"server_playback: killed non-replay request {}".format(
f.request.url
diff --git a/mitmproxy/addons/termlog.py b/mitmproxy/addons/termlog.py
index 0d42b18e..4c37b005 100644
--- a/mitmproxy/addons/termlog.py
+++ b/mitmproxy/addons/termlog.py
@@ -2,6 +2,7 @@ import sys
import click
from mitmproxy import log
+from mitmproxy import ctx
# These get over-ridden by the save execution context. Keep them around so we
# can log directly.
@@ -11,19 +12,15 @@ realstderr = sys.stderr
class TermLog:
def __init__(self, outfile=None):
- self.options = None
self.outfile = outfile
- def configure(self, options, updated):
- self.options = options
-
def log(self, e):
if log.log_tier(e.level) == log.log_tier("error"):
outfile = self.outfile or realstderr
else:
outfile = self.outfile or realstdout
- if self.options.verbosity >= log.log_tier(e.level):
+ if ctx.options.verbosity >= log.log_tier(e.level):
click.secho(
e.msg,
file=outfile,
diff --git a/mitmproxy/addons/termstatus.py b/mitmproxy/addons/termstatus.py
index 951ddd3c..c3c91283 100644
--- a/mitmproxy/addons/termstatus.py
+++ b/mitmproxy/addons/termstatus.py
@@ -8,15 +8,8 @@ from mitmproxy.utils import human
class TermStatus:
- def __init__(self):
- self.server = False
-
- def configure(self, options, updated):
- if "server" in updated:
- self.server = options.server
-
def running(self):
- if self.server:
+ if ctx.options.server:
ctx.log.info(
"Proxy server listening at http://{}".format(
human.format_address(ctx.master.server.address)
diff --git a/mitmproxy/addons/upstream_auth.py b/mitmproxy/addons/upstream_auth.py
index 9beecfc0..0003d2fd 100644
--- a/mitmproxy/addons/upstream_auth.py
+++ b/mitmproxy/addons/upstream_auth.py
@@ -2,6 +2,7 @@ import re
import base64
from mitmproxy import exceptions
+from mitmproxy import ctx
from mitmproxy.utils import strutils
@@ -26,15 +27,12 @@ class UpstreamAuth():
"""
def __init__(self):
self.auth = None
- self.root_mode = None
def configure(self, options, updated):
# FIXME: We're doing this because our proxy core is terminally confused
# at the moment. Ideally, we should be able to check if we're in
# reverse proxy mode at the HTTP layer, so that scripts can put the
# proxy in reverse proxy mode for specific reuests.
- if "mode" in updated:
- self.root_mode = options.mode
if "upstream_auth" in updated:
if options.upstream_auth is None:
self.auth = None
@@ -49,5 +47,5 @@ class UpstreamAuth():
if self.auth:
if f.mode == "upstream" and not f.server_conn.via:
f.request.headers["Proxy-Authorization"] = self.auth
- elif self.root_mode == "reverse":
+ elif ctx.options.mode == "reverse":
f.request.headers["Proxy-Authorization"] = self.auth
diff --git a/mitmproxy/ctx.py b/mitmproxy/ctx.py
index 7b5231e6..954edcb1 100644
--- a/mitmproxy/ctx.py
+++ b/mitmproxy/ctx.py
@@ -1,4 +1,7 @@
import mitmproxy.master # noqa
import mitmproxy.log # noqa
+import mitmproxy.options # noqa
+
master = None # type: "mitmproxy.master.Master"
log = None # type: "mitmproxy.log.Log"
+options = None # type: "mitmproxy.options.Options"
diff --git a/mitmproxy/master.py b/mitmproxy/master.py
index 46fdb585..94900915 100644
--- a/mitmproxy/master.py
+++ b/mitmproxy/master.py
@@ -50,11 +50,13 @@ class Master:
return
mitmproxy_ctx.master = self
mitmproxy_ctx.log = log.Log(self)
+ mitmproxy_ctx.options = self.options
try:
yield
finally:
mitmproxy_ctx.master = None
mitmproxy_ctx.log = None
+ mitmproxy_ctx.options = None
def tell(self, mtype, m):
m.reply = controller.DummyReply()
diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py
index 513f3f08..86621709 100644
--- a/test/mitmproxy/addons/test_proxyauth.py
+++ b/test/mitmproxy/addons/test_proxyauth.py
@@ -66,9 +66,6 @@ def test_configure():
with pytest.raises(exceptions.OptionsError):
ctx.configure(up, proxyauth="any", mode="socks5")
- ctx.configure(up, mode="regular")
- assert up.mode == "regular"
-
def test_check():
up = proxyauth.ProxyAuth()
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index 7078b66e..c92f80b8 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -39,86 +39,88 @@ def test_tick():
def test_server_playback():
sp = serverplayback.ServerPlayback()
- sp.configure(options.Options(), [])
- f = tflow.tflow(resp=True)
+ with taddons.context() as tctx:
+ tctx.configure(sp)
+ f = tflow.tflow(resp=True)
- assert not sp.flowmap
+ assert not sp.flowmap
- sp.load_flows([f])
- assert sp.flowmap
- assert sp.next_flow(f)
- assert not sp.flowmap
+ sp.load_flows([f])
+ assert sp.flowmap
+ assert sp.next_flow(f)
+ assert not sp.flowmap
- sp.load_flows([f])
- assert sp.flowmap
- sp.clear()
- assert not sp.flowmap
+ sp.load_flows([f])
+ assert sp.flowmap
+ sp.clear()
+ assert not sp.flowmap
def test_ignore_host():
sp = serverplayback.ServerPlayback()
- sp.configure(options.Options(server_replay_ignore_host=True), [])
+ with taddons.context() as tctx:
+ tctx.configure(sp, server_replay_ignore_host=True)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
- r.request.host = "address"
- r2.request.host = "address"
- assert sp._hash(r) == sp._hash(r2)
- r2.request.host = "wrong_address"
- assert sp._hash(r) == sp._hash(r2)
+ r.request.host = "address"
+ r2.request.host = "address"
+ assert sp._hash(r) == sp._hash(r2)
+ r2.request.host = "wrong_address"
+ assert sp._hash(r) == sp._hash(r2)
def test_ignore_content():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_ignore_content=False), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_ignore_content=False)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
- r.request.content = b"foo"
- r2.request.content = b"foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b"bar"
- assert not s._hash(r) == s._hash(r2)
+ r.request.content = b"foo"
+ r2.request.content = b"foo"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b"bar"
+ assert not s._hash(r) == s._hash(r2)
- s.configure(options.Options(server_replay_ignore_content=True), [])
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
- r.request.content = b"foo"
- r2.request.content = b"foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b"bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b""
- assert s._hash(r) == s._hash(r2)
- r2.request.content = None
- assert s._hash(r) == s._hash(r2)
+ tctx.configure(s, server_replay_ignore_content=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
+ r.request.content = b"foo"
+ r2.request.content = b"foo"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b"bar"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b""
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = None
+ assert s._hash(r) == s._hash(r2)
def test_ignore_content_wins_over_params():
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ tctx.configure(
+ s,
server_replay_ignore_content=True,
server_replay_ignore_payload_params=[
"param1", "param2"
]
- ),
- []
- )
- # NOTE: parameters are mutually exclusive in options
+ )
- r = tflow.tflow(resp=True)
- r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r.request.content = b"paramx=y"
+ # NOTE: parameters are mutually exclusive in options
+ r = tflow.tflow(resp=True)
+ r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ r.request.content = b"paramx=y"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r2.request.content = b"paramx=x"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ r2.request.content = b"paramx=x"
- # same parameters
- assert s._hash(r) == s._hash(r2)
+ # same parameters
+ assert s._hash(r) == s._hash(r2)
def test_ignore_payload_params_other_content_type():
@@ -147,136 +149,139 @@ def test_ignore_payload_params_other_content_type():
def test_hash():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(), [])
+ with taddons.context() as tctx:
+ tctx.configure(s)
- r = tflow.tflow()
- r2 = tflow.tflow()
+ r = tflow.tflow()
+ r2 = tflow.tflow()
- assert s._hash(r)
- assert s._hash(r) == s._hash(r2)
- r.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r.request.path = "voing"
- assert s._hash(r) != s._hash(r2)
+ assert s._hash(r)
+ assert s._hash(r) == s._hash(r2)
+ r.request.headers["foo"] = "bar"
+ assert s._hash(r) == s._hash(r2)
+ r.request.path = "voing"
+ assert s._hash(r) != s._hash(r2)
- r.request.path = "path?blank_value"
- r2.request.path = "path?"
- assert s._hash(r) != s._hash(r2)
+ r.request.path = "path?blank_value"
+ r2.request.path = "path?"
+ assert s._hash(r) != s._hash(r2)
def test_headers():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_use_headers=["foo"]), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_use_headers=["foo"])
- r = tflow.tflow(resp=True)
- r.request.headers["foo"] = "bar"
- r2 = tflow.tflow(resp=True)
- assert not s._hash(r) == s._hash(r2)
- r2.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.headers["oink"] = "bar"
- assert s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r.request.headers["foo"] = "bar"
+ r2 = tflow.tflow(resp=True)
+ assert not s._hash(r) == s._hash(r2)
+ r2.request.headers["foo"] = "bar"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.headers["oink"] = "bar"
+ assert s._hash(r) == s._hash(r2)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
- assert s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
+ assert s._hash(r) == s._hash(r2)
def test_load():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(), [])
+ with taddons.context() as tctx:
+ tctx.configure(s)
- r = tflow.tflow(resp=True)
- r.request.headers["key"] = "one"
+ r = tflow.tflow(resp=True)
+ r.request.headers["key"] = "one"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["key"] = "two"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["key"] = "two"
- s.load_flows([r, r2])
+ s.load_flows([r, r2])
- assert s.count() == 2
+ assert s.count() == 2
- n = s.next_flow(r)
- assert n.request.headers["key"] == "one"
- assert s.count() == 1
+ n = s.next_flow(r)
+ assert n.request.headers["key"] == "one"
+ assert s.count() == 1
- n = s.next_flow(r)
- assert n.request.headers["key"] == "two"
- assert not s.flowmap
- assert s.count() == 0
+ n = s.next_flow(r)
+ assert n.request.headers["key"] == "two"
+ assert not s.flowmap
+ assert s.count() == 0
- assert not s.next_flow(r)
+ assert not s.next_flow(r)
def test_load_with_server_replay_nopop():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_nopop=True), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_nopop=True)
- r = tflow.tflow(resp=True)
- r.request.headers["key"] = "one"
+ r = tflow.tflow(resp=True)
+ r.request.headers["key"] = "one"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["key"] = "two"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["key"] = "two"
- s.load_flows([r, r2])
+ s.load_flows([r, r2])
- assert s.count() == 2
- s.next_flow(r)
- assert s.count() == 2
+ assert s.count() == 2
+ s.next_flow(r)
+ assert s.count() == 2
def test_ignore_params():
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ tctx.configure(
+ s,
server_replay_ignore_params=["param1", "param2"]
- ),
- []
- )
+ )
- r = tflow.tflow(resp=True)
- r.request.path = "/test?param1=1"
- r2 = tflow.tflow(resp=True)
- r2.request.path = "/test"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param1=2"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param2=1"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param3=2"
- assert not s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r.request.path = "/test?param1=1"
+ r2 = tflow.tflow(resp=True)
+ r2.request.path = "/test"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param1=2"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param2=1"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param3=2"
+ assert not s._hash(r) == s._hash(r2)
def thash(r, r2, setter):
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ s = serverplayback.ServerPlayback()
+ tctx.configure(
+ s,
server_replay_ignore_payload_params=["param1", "param2"]
- ),
- []
- )
-
- setter(r, paramx="x", param1="1")
-
- setter(r2, paramx="x", param1="1")
- # same parameters
- assert s._hash(r) == s._hash(r2)
- # ignored parameters !=
- setter(r2, paramx="x", param1="2")
- assert s._hash(r) == s._hash(r2)
- # missing parameter
- setter(r2, paramx="x")
- assert s._hash(r) == s._hash(r2)
- # ignorable parameter added
- setter(r2, paramx="x", param1="2")
- assert s._hash(r) == s._hash(r2)
- # not ignorable parameter changed
- setter(r2, paramx="y", param1="1")
- assert not s._hash(r) == s._hash(r2)
- # not ignorable parameter missing
- setter(r2, param1="1")
- r2.request.content = b"param1=1"
- assert not s._hash(r) == s._hash(r2)
+ )
+
+ setter(r, paramx="x", param1="1")
+
+ setter(r2, paramx="x", param1="1")
+ # same parameters
+ assert s._hash(r) == s._hash(r2)
+ # ignored parameters !=
+ setter(r2, paramx="x", param1="2")
+ assert s._hash(r) == s._hash(r2)
+ # missing parameter
+ setter(r2, paramx="x")
+ assert s._hash(r) == s._hash(r2)
+ # ignorable parameter added
+ setter(r2, paramx="x", param1="2")
+ assert s._hash(r) == s._hash(r2)
+ # not ignorable parameter changed
+ setter(r2, paramx="y", param1="1")
+ assert not s._hash(r) == s._hash(r2)
+ # not ignorable parameter missing
+ setter(r2, param1="1")
+ r2.request.content = b"param1=1"
+ assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params():
diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py
index 7becc857..2debaff5 100644
--- a/test/mitmproxy/addons/test_termstatus.py
+++ b/test/mitmproxy/addons/test_termstatus.py
@@ -5,6 +5,7 @@ from mitmproxy.test import taddons
def test_configure():
ts = termstatus.TermStatus()
with taddons.context() as ctx:
+ ctx.configure(ts, server=False)
ts.running()
assert not ctx.master.logs
ctx.configure(ts, server=True)