aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/flow/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/flow/master.py')
-rw-r--r--mitmproxy/flow/master.py156
1 files changed, 66 insertions, 90 deletions
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py
index b71c2c8d..94b46f3f 100644
--- a/mitmproxy/flow/master.py
+++ b/mitmproxy/flow/master.py
@@ -15,6 +15,30 @@ from mitmproxy.onboarding import app
from mitmproxy.protocol import http_replay
+def event_sequence(f):
+ if isinstance(f, models.HTTPFlow):
+ if f.request:
+ yield "request", f
+ if f.response:
+ yield "responseheaders", f
+ yield "response", f
+ if f.error:
+ yield "error", f
+ elif isinstance(f, models.TCPFlow):
+ messages = f.messages
+ f.messages = []
+ f.reply = controller.DummyReply()
+ yield "tcp_open", f
+ while messages:
+ f.messages.append(messages.pop(0))
+ yield "tcp_message", f
+ if f.error:
+ yield "tcp_error", f
+ yield "tcp_close", f
+ else:
+ raise NotImplementedError
+
+
class FlowMaster(controller.Master):
@property
@@ -29,16 +53,11 @@ class FlowMaster(controller.Master):
if server:
self.add_server(server)
self.state = state
- self.client_playback = None # type: Optional[modules.ClientPlaybackState]
self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies]
self.apps = modules.AppRegistry()
def start_app(self, host, port):
- self.apps.add(
- app.mapp,
- host,
- port
- )
+ self.apps.add(app.mapp, host, port)
def set_stream_large_bodies(self, max_size):
if max_size is not None:
@@ -46,31 +65,6 @@ class FlowMaster(controller.Master):
else:
self.stream_large_bodies = False
- def start_client_playback(self, flows, exit):
- """
- flows: List of flows.
- """
- self.client_playback = modules.ClientPlaybackState(flows, exit)
-
- def stop_client_playback(self):
- self.client_playback = None
-
- def tick(self, timeout):
- if self.client_playback:
- stop = (
- self.client_playback.done() and
- self.state.active_flow_count() == 0
- )
- exit = self.client_playback.exit
- if stop:
- self.stop_client_playback()
- if exit:
- self.shutdown()
- else:
- self.client_playback.tick(self)
-
- return super(FlowMaster, self).tick(timeout)
-
def duplicate_flow(self, f):
"""
Duplicate flow, and insert it into state without triggering any of
@@ -114,28 +108,9 @@ class FlowMaster(controller.Master):
f.request.host = self.server.config.upstream_server.address.host
f.request.port = self.server.config.upstream_server.address.port
f.request.scheme = self.server.config.upstream_server.scheme
-
- f.reply = controller.DummyReply()
- if f.request:
- self.request(f)
- if f.response:
- self.responseheaders(f)
- self.response(f)
- if f.error:
- self.error(f)
- elif isinstance(f, models.TCPFlow):
- messages = f.messages
- f.messages = []
- f.reply = controller.DummyReply()
- self.tcp_open(f)
- while messages:
- f.messages.append(messages.pop(0))
- self.tcp_message(f)
- if f.error:
- self.tcp_error(f)
- self.tcp_close(f)
- else:
- raise NotImplementedError()
+ f.reply = controller.DummyReply()
+ for e, o in event_sequence(f):
+ getattr(self, e)(o)
def load_flows(self, fr):
"""
@@ -163,35 +138,47 @@ class FlowMaster(controller.Master):
def replay_request(self, f, block=False):
"""
- Returns None if successful, or error message if not.
+ Returns an http_replay.RequestReplayThred object.
+ May raise exceptions.ReplayError.
"""
if f.live:
- return "Can't replay live request."
+ raise exceptions.ReplayError(
+ "Can't replay live flow."
+ )
if f.intercepted:
- return "Can't replay while intercepting..."
+ raise exceptions.ReplayError(
+ "Can't replay intercepted flow."
+ )
if f.request.raw_content is None:
- return "Can't replay request with missing content..."
- if f.request:
- f.backup()
- f.request.is_replay = True
-
- # TODO: We should be able to remove this.
- if "Content-Length" in f.request.headers:
- f.request.headers["Content-Length"] = str(len(f.request.raw_content))
-
- f.response = None
- f.error = None
- # FIXME: process through all addons?
- # self.process_new_request(f)
- rt = http_replay.RequestReplayThread(
- self.server.config,
- f,
- self.event_queue,
- self.should_exit
+ raise exceptions.ReplayError(
+ "Can't replay flow with missing content."
)
- rt.start() # pragma: no cover
- if block:
- rt.join()
+ if not f.request:
+ raise exceptions.ReplayError(
+ "Can't replay flow with missing request."
+ )
+
+ f.backup()
+ f.request.is_replay = True
+
+ # TODO: We should be able to remove this.
+ if "Content-Length" in f.request.headers:
+ f.request.headers["Content-Length"] = str(len(f.request.raw_content))
+
+ f.response = None
+ f.error = None
+ # FIXME: process through all addons?
+ # self.process_new_request(f)
+ rt = http_replay.RequestReplayThread(
+ self.server.config,
+ f,
+ self.event_queue,
+ self.should_exit
+ )
+ rt.start() # pragma: no cover
+ if block:
+ rt.join()
+ return rt
@controller.handler
def log(self, l):
@@ -220,9 +207,6 @@ class FlowMaster(controller.Master):
@controller.handler
def error(self, f):
self.state.update_flow(f)
- if self.client_playback:
- self.client_playback.clear(f)
- return f
@controller.handler
def request(self, f):
@@ -240,7 +224,6 @@ class FlowMaster(controller.Master):
return
if f not in self.state.flows: # don't add again on replay
self.state.add_flow(f)
- return f
@controller.handler
def responseheaders(self, f):
@@ -250,18 +233,14 @@ class FlowMaster(controller.Master):
except netlib.exceptions.HttpException:
f.reply.kill()
return
- return f
@controller.handler
def response(self, f):
self.state.update_flow(f)
- if self.client_playback:
- self.client_playback.clear(f)
- return f
@controller.handler
def websockets_handshake(self, f):
- return f
+ pass
def handle_intercept(self, f):
self.state.update_flow(f)
@@ -281,10 +260,7 @@ class FlowMaster(controller.Master):
@controller.handler
def tcp_error(self, flow):
- self.add_log("Error in TCP connection to {}: {}".format(
- repr(flow.server_conn.address),
- flow.error
- ), "info")
+ pass
@controller.handler
def tcp_close(self, flow):