aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2017-04-26 10:25:56 +1200
committerAldo Cortesi <aldo@corte.si>2017-04-26 10:25:56 +1200
commitf90b4c2ff0f3fd71350900c10dea2a67846e1bdb (patch)
tree9209f40200ad73b7b2346d0b2286c8aa26f0d886
parent7aa208189477f8c5fcd3f7850e1c98fade757f11 (diff)
downloadmitmproxy-f90b4c2ff0f3fd71350900c10dea2a67846e1bdb.tar.gz
mitmproxy-f90b4c2ff0f3fd71350900c10dea2a67846e1bdb.tar.bz2
mitmproxy-f90b4c2ff0f3fd71350900c10dea2a67846e1bdb.zip
Move options into ctx
Many addons currently save options on configure(), either as individual options or sometimes by saving the entire options object. The current options should simply be available on the ctx object, simplifying state management for addons considerably.
-rw-r--r--mitmproxy/addons/anticache.py9
-rw-r--r--mitmproxy/addons/anticomp.py9
-rw-r--r--mitmproxy/addons/disable_h2c.py3
-rw-r--r--mitmproxy/addons/dumper.py32
-rw-r--r--mitmproxy/addons/onboarding.py5
-rw-r--r--mitmproxy/addons/proxyauth.py6
-rw-r--r--mitmproxy/addons/readfile.py12
-rw-r--r--mitmproxy/addons/script.py7
-rw-r--r--mitmproxy/addons/serverplayback.py27
-rw-r--r--mitmproxy/addons/termlog.py7
-rw-r--r--mitmproxy/addons/termstatus.py9
-rw-r--r--mitmproxy/addons/upstream_auth.py6
-rw-r--r--mitmproxy/ctx.py3
-rw-r--r--mitmproxy/master.py2
-rw-r--r--test/mitmproxy/addons/test_proxyauth.py3
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py293
-rw-r--r--test/mitmproxy/addons/test_termstatus.py1
17 files changed, 199 insertions, 235 deletions
diff --git a/mitmproxy/addons/anticache.py b/mitmproxy/addons/anticache.py
index 8d748a21..5b34d5a5 100644
--- a/mitmproxy/addons/anticache.py
+++ b/mitmproxy/addons/anticache.py
@@ -1,10 +1,7 @@
-class AntiCache:
- def __init__(self):
- self.enabled = False
+from mitmproxy import ctx
- def configure(self, options, updated):
- self.enabled = options.anticache
+class AntiCache:
def request(self, flow):
- if self.enabled:
+ if ctx.options.anticache:
flow.request.anticache()
diff --git a/mitmproxy/addons/anticomp.py b/mitmproxy/addons/anticomp.py
index eaf01296..d7d1ca8d 100644
--- a/mitmproxy/addons/anticomp.py
+++ b/mitmproxy/addons/anticomp.py
@@ -1,10 +1,7 @@
-class AntiComp:
- def __init__(self):
- self.enabled = False
+from mitmproxy import ctx
- def configure(self, options, updated):
- self.enabled = options.anticomp
+class AntiComp:
def request(self, flow):
- if self.enabled:
+ if ctx.options.anticomp:
flow.request.anticomp()
diff --git a/mitmproxy/addons/disable_h2c.py b/mitmproxy/addons/disable_h2c.py
index b43d5207..392a29a5 100644
--- a/mitmproxy/addons/disable_h2c.py
+++ b/mitmproxy/addons/disable_h2c.py
@@ -14,9 +14,6 @@ class DisableH2C:
by sending the connection preface. We just kill those flows.
"""
- def configure(self, options, updated):
- pass
-
def process_flow(self, f):
if f.request.headers.get('upgrade', '') == 'h2c':
mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
diff --git a/mitmproxy/addons/dumper.py b/mitmproxy/addons/dumper.py
index 5fd8408f..587bb29b 100644
--- a/mitmproxy/addons/dumper.py
+++ b/mitmproxy/addons/dumper.py
@@ -29,10 +29,7 @@ def colorful(line, styles):
class Dumper:
def __init__(self, outfile=sys.stdout):
self.filter = None # type: flowfilter.TFilter
- self.flow_detail = None # type: int
self.outfp = outfile # type: typing.io.TextIO
- self.showhost = None # type: bool
- self.default_contentview = "auto" # type: str
def configure(self, options, updated):
if "view_filter" in updated:
@@ -44,9 +41,6 @@ class Dumper:
)
else:
self.filter = None
- self.flow_detail = options.flow_detail
- self.showhost = options.showhost
- self.default_contentview = options.default_contentview
def echo(self, text, ident=None, **style):
if ident:
@@ -67,13 +61,13 @@ class Dumper:
def _echo_message(self, message):
_, lines, error = contentviews.get_message_content_view(
- self.default_contentview,
+ ctx.options.default_contentview,
message
)
if error:
ctx.log.debug(error)
- if self.flow_detail == 3:
+ if ctx.options.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
lines_to_echo = lines
@@ -95,7 +89,7 @@ class Dumper:
if next(lines, None):
self.echo("(cut off)", ident=4, dim=True)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self.echo("")
def _echo_request_line(self, flow):
@@ -121,12 +115,12 @@ class Dumper:
fg=method_color,
bold=True
)
- if self.showhost:
+ if ctx.options.showhost:
url = flow.request.pretty_url
else:
url = flow.request.url
terminalWidthLimit = max(shutil.get_terminal_size()[0] - 25, 50)
- if self.flow_detail < 1 and len(url) > terminalWidthLimit:
+ if ctx.options.flow_detail < 1 and len(url) > terminalWidthLimit:
url = url[:terminalWidthLimit] + "…"
url = click.style(strutils.escape_control_characters(url), bold=True)
@@ -176,7 +170,7 @@ class Dumper:
size = click.style(size, bold=True)
arrows = click.style(" <<", bold=True)
- if self.flow_detail == 1:
+ if ctx.options.flow_detail == 1:
# This aligns the HTTP response code with the HTTP request method:
# 127.0.0.1:59519: GET http://example.com/
# << 304 Not Modified 0b
@@ -194,16 +188,16 @@ class Dumper:
def echo_flow(self, f):
if f.request:
self._echo_request_line(f)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self._echo_headers(f.request.headers)
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(f.request)
if f.response:
self._echo_response_line(f)
- if self.flow_detail >= 2:
+ if ctx.options.flow_detail >= 2:
self._echo_headers(f.response.headers)
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(f.response)
if f.error:
@@ -211,7 +205,7 @@ class Dumper:
self.echo(" << {}".format(msg), bold=True, fg="red")
def match(self, f):
- if self.flow_detail == 0:
+ if ctx.options.flow_detail == 0:
return False
if not self.filter:
return True
@@ -239,7 +233,7 @@ class Dumper:
if self.match(f):
message = f.messages[-1]
self.echo(f.message_info(message))
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(message)
def websocket_end(self, f):
@@ -267,5 +261,5 @@ class Dumper:
server=repr(f.server_conn.address),
direction=direction,
))
- if self.flow_detail >= 3:
+ if ctx.options.flow_detail >= 3:
self._echo_message(message)
diff --git a/mitmproxy/addons/onboarding.py b/mitmproxy/addons/onboarding.py
index 6552ec9e..e67beadd 100644
--- a/mitmproxy/addons/onboarding.py
+++ b/mitmproxy/addons/onboarding.py
@@ -1,5 +1,6 @@
from mitmproxy.addons import wsgiapp
from mitmproxy.addons.onboardingapp import app
+from mitmproxy import ctx
class Onboarding(wsgiapp.WSGIApp):
@@ -7,13 +8,11 @@ class Onboarding(wsgiapp.WSGIApp):
def __init__(self):
super().__init__(app.Adapter(app.application), None, None)
- self.enabled = False
def configure(self, options, updated):
self.host = options.onboarding_host
self.port = options.onboarding_port
- self.enabled = options.onboarding
def request(self, f):
- if self.enabled:
+ if ctx.options.onboarding:
super().request(f)
diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py
index 43677f61..5fed266e 100644
--- a/mitmproxy/addons/proxyauth.py
+++ b/mitmproxy/addons/proxyauth.py
@@ -10,6 +10,7 @@ import mitmproxy.net.http
from mitmproxy import connections # noqa
from mitmproxy import exceptions
from mitmproxy import http
+from mitmproxy import ctx
from mitmproxy.net.http import status_codes
REALM = "mitmproxy"
@@ -45,7 +46,6 @@ class ProxyAuth:
self.nonanonymous = False
self.htpasswd = None
self.singleuser = None
- self.mode = None
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
@@ -58,7 +58,7 @@ class ProxyAuth:
- True, if authentication is done as if mitmproxy is a proxy
- False, if authentication is done as if mitmproxy is a HTTP server
"""
- return self.mode in ("regular", "upstream")
+ return ctx.options.mode in ("regular", "upstream")
def which_auth_header(self) -> str:
if self.is_proxy_auth():
@@ -136,8 +136,6 @@ class ProxyAuth:
"Invalid single-user auth specification."
)
self.singleuser = parts
- if "mode" in updated:
- self.mode = options.mode
if self.enabled():
if options.mode == "transparent":
raise exceptions.OptionsError(
diff --git a/mitmproxy/addons/readfile.py b/mitmproxy/addons/readfile.py
index 03dcd084..949da15d 100644
--- a/mitmproxy/addons/readfile.py
+++ b/mitmproxy/addons/readfile.py
@@ -9,9 +9,6 @@ class ReadFile:
"""
An addon that handles reading from file on startup.
"""
- def __init__(self):
- self.path = None
-
def load_flows_file(self, path: str) -> int:
path = os.path.expanduser(path)
cnt = 0
@@ -31,16 +28,11 @@ class ReadFile:
ctx.log.error("Flow file corrupted.")
raise exceptions.FlowReadException(v)
- def configure(self, options, updated):
- if "rfile" in updated and options.rfile:
- self.path = options.rfile
-
def running(self):
- if self.path:
+ if ctx.options.rfile:
try:
- self.load_flows_file(self.path)
+ self.load_flows_file(ctx.options.rfile)
except exceptions.FlowReadException as v:
raise exceptions.OptionsError(v)
finally:
- self.path = None
ctx.master.addons.trigger("processing_complete")
diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py
index 5099e62c..0cca3eac 100644
--- a/mitmproxy/addons/script.py
+++ b/mitmproxy/addons/script.py
@@ -36,7 +36,6 @@ class Script:
self.path = path
self.ns = None
- self.last_options = None
self.last_load = 0
self.last_mtime = 0
@@ -60,14 +59,12 @@ class Script:
ctx.master.addons.invoke_addon(
self.ns,
"configure",
- self.last_options,
- self.last_options.keys()
+ ctx.options,
+ ctx.options.keys()
)
self.last_load = time.time()
self.last_mtime = mtime
- def configure(self, options, updated):
- self.last_options = options
class ScriptLoader:
diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py
index 7bb0c716..7d2aa15f 100644
--- a/mitmproxy/addons/serverplayback.py
+++ b/mitmproxy/addons/serverplayback.py
@@ -10,8 +10,6 @@ from mitmproxy import io
class ServerPlayback:
def __init__(self):
- self.options = None
-
self.flowmap = {}
self.stop = False
self.final_flow = None
@@ -38,27 +36,27 @@ class ServerPlayback:
queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True)
key = [str(r.port), str(r.scheme), str(r.method), str(path)] # type: List[Any]
- if not self.options.server_replay_ignore_content:
- if self.options.server_replay_ignore_payload_params and r.multipart_form:
+ if not ctx.options.server_replay_ignore_content:
+ if ctx.options.server_replay_ignore_payload_params and r.multipart_form:
key.extend(
(k, v)
for k, v in r.multipart_form.items(multi=True)
- if k.decode(errors="replace") not in self.options.server_replay_ignore_payload_params
+ if k.decode(errors="replace") not in ctx.options.server_replay_ignore_payload_params
)
- elif self.options.server_replay_ignore_payload_params and r.urlencoded_form:
+ elif ctx.options.server_replay_ignore_payload_params and r.urlencoded_form:
key.extend(
(k, v)
for k, v in r.urlencoded_form.items(multi=True)
- if k not in self.options.server_replay_ignore_payload_params
+ if k not in ctx.options.server_replay_ignore_payload_params
)
else:
key.append(str(r.raw_content))
- if not self.options.server_replay_ignore_host:
+ if not ctx.options.server_replay_ignore_host:
key.append(r.host)
filtered = []
- ignore_params = self.options.server_replay_ignore_params or []
+ ignore_params = ctx.options.server_replay_ignore_params or []
for p in queriesArray:
if p[0] not in ignore_params:
filtered.append(p)
@@ -66,9 +64,9 @@ class ServerPlayback:
key.append(p[0])
key.append(p[1])
- if self.options.server_replay_use_headers:
+ if ctx.options.server_replay_use_headers:
headers = []
- for i in self.options.server_replay_use_headers:
+ for i in ctx.options.server_replay_use_headers:
v = r.headers.get(i)
headers.append((i, v))
key.append(headers)
@@ -83,7 +81,7 @@ class ServerPlayback:
"""
hsh = self._hash(request)
if hsh in self.flowmap:
- if self.options.server_replay_nopop:
+ if ctx.options.server_replay_nopop:
return self.flowmap[hsh][0]
else:
ret = self.flowmap[hsh].pop(0)
@@ -92,7 +90,6 @@ class ServerPlayback:
return ret
def configure(self, options, updated):
- self.options = options
if "server_replay" in updated:
self.clear()
if options.server_replay:
@@ -112,13 +109,13 @@ class ServerPlayback:
if rflow:
response = rflow.response.copy()
response.is_replay = True
- if self.options.refresh_server_playback:
+ if ctx.options.refresh_server_playback:
response.refresh()
f.response = response
if not self.flowmap:
self.final_flow = f
self.stop = True
- elif self.options.replay_kill_extra:
+ elif ctx.options.replay_kill_extra:
ctx.log.warn(
"server_playback: killed non-replay request {}".format(
f.request.url
diff --git a/mitmproxy/addons/termlog.py b/mitmproxy/addons/termlog.py
index 0d42b18e..4c37b005 100644
--- a/mitmproxy/addons/termlog.py
+++ b/mitmproxy/addons/termlog.py
@@ -2,6 +2,7 @@ import sys
import click
from mitmproxy import log
+from mitmproxy import ctx
# These get over-ridden by the save execution context. Keep them around so we
# can log directly.
@@ -11,19 +12,15 @@ realstderr = sys.stderr
class TermLog:
def __init__(self, outfile=None):
- self.options = None
self.outfile = outfile
- def configure(self, options, updated):
- self.options = options
-
def log(self, e):
if log.log_tier(e.level) == log.log_tier("error"):
outfile = self.outfile or realstderr
else:
outfile = self.outfile or realstdout
- if self.options.verbosity >= log.log_tier(e.level):
+ if ctx.options.verbosity >= log.log_tier(e.level):
click.secho(
e.msg,
file=outfile,
diff --git a/mitmproxy/addons/termstatus.py b/mitmproxy/addons/termstatus.py
index 951ddd3c..c3c91283 100644
--- a/mitmproxy/addons/termstatus.py
+++ b/mitmproxy/addons/termstatus.py
@@ -8,15 +8,8 @@ from mitmproxy.utils import human
class TermStatus:
- def __init__(self):
- self.server = False
-
- def configure(self, options, updated):
- if "server" in updated:
- self.server = options.server
-
def running(self):
- if self.server:
+ if ctx.options.server:
ctx.log.info(
"Proxy server listening at http://{}".format(
human.format_address(ctx.master.server.address)
diff --git a/mitmproxy/addons/upstream_auth.py b/mitmproxy/addons/upstream_auth.py
index 9beecfc0..0003d2fd 100644
--- a/mitmproxy/addons/upstream_auth.py
+++ b/mitmproxy/addons/upstream_auth.py
@@ -2,6 +2,7 @@ import re
import base64
from mitmproxy import exceptions
+from mitmproxy import ctx
from mitmproxy.utils import strutils
@@ -26,15 +27,12 @@ class UpstreamAuth():
"""
def __init__(self):
self.auth = None
- self.root_mode = None
def configure(self, options, updated):
# FIXME: We're doing this because our proxy core is terminally confused
# at the moment. Ideally, we should be able to check if we're in
# reverse proxy mode at the HTTP layer, so that scripts can put the
# proxy in reverse proxy mode for specific reuests.
- if "mode" in updated:
- self.root_mode = options.mode
if "upstream_auth" in updated:
if options.upstream_auth is None:
self.auth = None
@@ -49,5 +47,5 @@ class UpstreamAuth():
if self.auth:
if f.mode == "upstream" and not f.server_conn.via:
f.request.headers["Proxy-Authorization"] = self.auth
- elif self.root_mode == "reverse":
+ elif ctx.options.mode == "reverse":
f.request.headers["Proxy-Authorization"] = self.auth
diff --git a/mitmproxy/ctx.py b/mitmproxy/ctx.py
index 7b5231e6..954edcb1 100644
--- a/mitmproxy/ctx.py
+++ b/mitmproxy/ctx.py
@@ -1,4 +1,7 @@
import mitmproxy.master # noqa
import mitmproxy.log # noqa
+import mitmproxy.options # noqa
+
master = None # type: "mitmproxy.master.Master"
log = None # type: "mitmproxy.log.Log"
+options = None # type: "mitmproxy.options.Options"
diff --git a/mitmproxy/master.py b/mitmproxy/master.py
index 46fdb585..94900915 100644
--- a/mitmproxy/master.py
+++ b/mitmproxy/master.py
@@ -50,11 +50,13 @@ class Master:
return
mitmproxy_ctx.master = self
mitmproxy_ctx.log = log.Log(self)
+ mitmproxy_ctx.options = self.options
try:
yield
finally:
mitmproxy_ctx.master = None
mitmproxy_ctx.log = None
+ mitmproxy_ctx.options = None
def tell(self, mtype, m):
m.reply = controller.DummyReply()
diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py
index 513f3f08..86621709 100644
--- a/test/mitmproxy/addons/test_proxyauth.py
+++ b/test/mitmproxy/addons/test_proxyauth.py
@@ -66,9 +66,6 @@ def test_configure():
with pytest.raises(exceptions.OptionsError):
ctx.configure(up, proxyauth="any", mode="socks5")
- ctx.configure(up, mode="regular")
- assert up.mode == "regular"
-
def test_check():
up = proxyauth.ProxyAuth()
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index 7078b66e..c92f80b8 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -39,86 +39,88 @@ def test_tick():
def test_server_playback():
sp = serverplayback.ServerPlayback()
- sp.configure(options.Options(), [])
- f = tflow.tflow(resp=True)
+ with taddons.context() as tctx:
+ tctx.configure(sp)
+ f = tflow.tflow(resp=True)
- assert not sp.flowmap
+ assert not sp.flowmap
- sp.load_flows([f])
- assert sp.flowmap
- assert sp.next_flow(f)
- assert not sp.flowmap
+ sp.load_flows([f])
+ assert sp.flowmap
+ assert sp.next_flow(f)
+ assert not sp.flowmap
- sp.load_flows([f])
- assert sp.flowmap
- sp.clear()
- assert not sp.flowmap
+ sp.load_flows([f])
+ assert sp.flowmap
+ sp.clear()
+ assert not sp.flowmap
def test_ignore_host():
sp = serverplayback.ServerPlayback()
- sp.configure(options.Options(server_replay_ignore_host=True), [])
+ with taddons.context() as tctx:
+ tctx.configure(sp, server_replay_ignore_host=True)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
- r.request.host = "address"
- r2.request.host = "address"
- assert sp._hash(r) == sp._hash(r2)
- r2.request.host = "wrong_address"
- assert sp._hash(r) == sp._hash(r2)
+ r.request.host = "address"
+ r2.request.host = "address"
+ assert sp._hash(r) == sp._hash(r2)
+ r2.request.host = "wrong_address"
+ assert sp._hash(r) == sp._hash(r2)
def test_ignore_content():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_ignore_content=False), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_ignore_content=False)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
- r.request.content = b"foo"
- r2.request.content = b"foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b"bar"
- assert not s._hash(r) == s._hash(r2)
+ r.request.content = b"foo"
+ r2.request.content = b"foo"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b"bar"
+ assert not s._hash(r) == s._hash(r2)
- s.configure(options.Options(server_replay_ignore_content=True), [])
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
- r.request.content = b"foo"
- r2.request.content = b"foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b"bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = b""
- assert s._hash(r) == s._hash(r2)
- r2.request.content = None
- assert s._hash(r) == s._hash(r2)
+ tctx.configure(s, server_replay_ignore_content=True)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
+ r.request.content = b"foo"
+ r2.request.content = b"foo"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b"bar"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = b""
+ assert s._hash(r) == s._hash(r2)
+ r2.request.content = None
+ assert s._hash(r) == s._hash(r2)
def test_ignore_content_wins_over_params():
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ tctx.configure(
+ s,
server_replay_ignore_content=True,
server_replay_ignore_payload_params=[
"param1", "param2"
]
- ),
- []
- )
- # NOTE: parameters are mutually exclusive in options
+ )
- r = tflow.tflow(resp=True)
- r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r.request.content = b"paramx=y"
+ # NOTE: parameters are mutually exclusive in options
+ r = tflow.tflow(resp=True)
+ r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ r.request.content = b"paramx=y"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r2.request.content = b"paramx=x"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ r2.request.content = b"paramx=x"
- # same parameters
- assert s._hash(r) == s._hash(r2)
+ # same parameters
+ assert s._hash(r) == s._hash(r2)
def test_ignore_payload_params_other_content_type():
@@ -147,136 +149,139 @@ def test_ignore_payload_params_other_content_type():
def test_hash():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(), [])
+ with taddons.context() as tctx:
+ tctx.configure(s)
- r = tflow.tflow()
- r2 = tflow.tflow()
+ r = tflow.tflow()
+ r2 = tflow.tflow()
- assert s._hash(r)
- assert s._hash(r) == s._hash(r2)
- r.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r.request.path = "voing"
- assert s._hash(r) != s._hash(r2)
+ assert s._hash(r)
+ assert s._hash(r) == s._hash(r2)
+ r.request.headers["foo"] = "bar"
+ assert s._hash(r) == s._hash(r2)
+ r.request.path = "voing"
+ assert s._hash(r) != s._hash(r2)
- r.request.path = "path?blank_value"
- r2.request.path = "path?"
- assert s._hash(r) != s._hash(r2)
+ r.request.path = "path?blank_value"
+ r2.request.path = "path?"
+ assert s._hash(r) != s._hash(r2)
def test_headers():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_use_headers=["foo"]), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_use_headers=["foo"])
- r = tflow.tflow(resp=True)
- r.request.headers["foo"] = "bar"
- r2 = tflow.tflow(resp=True)
- assert not s._hash(r) == s._hash(r2)
- r2.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.headers["oink"] = "bar"
- assert s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r.request.headers["foo"] = "bar"
+ r2 = tflow.tflow(resp=True)
+ assert not s._hash(r) == s._hash(r2)
+ r2.request.headers["foo"] = "bar"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.headers["oink"] = "bar"
+ assert s._hash(r) == s._hash(r2)
- r = tflow.tflow(resp=True)
- r2 = tflow.tflow(resp=True)
- assert s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r2 = tflow.tflow(resp=True)
+ assert s._hash(r) == s._hash(r2)
def test_load():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(), [])
+ with taddons.context() as tctx:
+ tctx.configure(s)
- r = tflow.tflow(resp=True)
- r.request.headers["key"] = "one"
+ r = tflow.tflow(resp=True)
+ r.request.headers["key"] = "one"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["key"] = "two"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["key"] = "two"
- s.load_flows([r, r2])
+ s.load_flows([r, r2])
- assert s.count() == 2
+ assert s.count() == 2
- n = s.next_flow(r)
- assert n.request.headers["key"] == "one"
- assert s.count() == 1
+ n = s.next_flow(r)
+ assert n.request.headers["key"] == "one"
+ assert s.count() == 1
- n = s.next_flow(r)
- assert n.request.headers["key"] == "two"
- assert not s.flowmap
- assert s.count() == 0
+ n = s.next_flow(r)
+ assert n.request.headers["key"] == "two"
+ assert not s.flowmap
+ assert s.count() == 0
- assert not s.next_flow(r)
+ assert not s.next_flow(r)
def test_load_with_server_replay_nopop():
s = serverplayback.ServerPlayback()
- s.configure(options.Options(server_replay_nopop=True), [])
+ with taddons.context() as tctx:
+ tctx.configure(s, server_replay_nopop=True)
- r = tflow.tflow(resp=True)
- r.request.headers["key"] = "one"
+ r = tflow.tflow(resp=True)
+ r.request.headers["key"] = "one"
- r2 = tflow.tflow(resp=True)
- r2.request.headers["key"] = "two"
+ r2 = tflow.tflow(resp=True)
+ r2.request.headers["key"] = "two"
- s.load_flows([r, r2])
+ s.load_flows([r, r2])
- assert s.count() == 2
- s.next_flow(r)
- assert s.count() == 2
+ assert s.count() == 2
+ s.next_flow(r)
+ assert s.count() == 2
def test_ignore_params():
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ tctx.configure(
+ s,
server_replay_ignore_params=["param1", "param2"]
- ),
- []
- )
+ )
- r = tflow.tflow(resp=True)
- r.request.path = "/test?param1=1"
- r2 = tflow.tflow(resp=True)
- r2.request.path = "/test"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param1=2"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param2=1"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param3=2"
- assert not s._hash(r) == s._hash(r2)
+ r = tflow.tflow(resp=True)
+ r.request.path = "/test?param1=1"
+ r2 = tflow.tflow(resp=True)
+ r2.request.path = "/test"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param1=2"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param2=1"
+ assert s._hash(r) == s._hash(r2)
+ r2.request.path = "/test?param3=2"
+ assert not s._hash(r) == s._hash(r2)
def thash(r, r2, setter):
s = serverplayback.ServerPlayback()
- s.configure(
- options.Options(
+ with taddons.context() as tctx:
+ s = serverplayback.ServerPlayback()
+ tctx.configure(
+ s,
server_replay_ignore_payload_params=["param1", "param2"]
- ),
- []
- )
-
- setter(r, paramx="x", param1="1")
-
- setter(r2, paramx="x", param1="1")
- # same parameters
- assert s._hash(r) == s._hash(r2)
- # ignored parameters !=
- setter(r2, paramx="x", param1="2")
- assert s._hash(r) == s._hash(r2)
- # missing parameter
- setter(r2, paramx="x")
- assert s._hash(r) == s._hash(r2)
- # ignorable parameter added
- setter(r2, paramx="x", param1="2")
- assert s._hash(r) == s._hash(r2)
- # not ignorable parameter changed
- setter(r2, paramx="y", param1="1")
- assert not s._hash(r) == s._hash(r2)
- # not ignorable parameter missing
- setter(r2, param1="1")
- r2.request.content = b"param1=1"
- assert not s._hash(r) == s._hash(r2)
+ )
+
+ setter(r, paramx="x", param1="1")
+
+ setter(r2, paramx="x", param1="1")
+ # same parameters
+ assert s._hash(r) == s._hash(r2)
+ # ignored parameters !=
+ setter(r2, paramx="x", param1="2")
+ assert s._hash(r) == s._hash(r2)
+ # missing parameter
+ setter(r2, paramx="x")
+ assert s._hash(r) == s._hash(r2)
+ # ignorable parameter added
+ setter(r2, paramx="x", param1="2")
+ assert s._hash(r) == s._hash(r2)
+ # not ignorable parameter changed
+ setter(r2, paramx="y", param1="1")
+ assert not s._hash(r) == s._hash(r2)
+ # not ignorable parameter missing
+ setter(r2, param1="1")
+ r2.request.content = b"param1=1"
+ assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params():
diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py
index 7becc857..2debaff5 100644
--- a/test/mitmproxy/addons/test_termstatus.py
+++ b/test/mitmproxy/addons/test_termstatus.py
@@ -5,6 +5,7 @@ from mitmproxy.test import taddons
def test_configure():
ts = termstatus.TermStatus()
with taddons.context() as ctx:
+ ctx.configure(ts, server=False)
ts.running()
assert not ctx.master.logs
ctx.configure(ts, server=True)