diff options
Diffstat (limited to 'mitmproxy/flow/master.py')
-rw-r--r-- | mitmproxy/flow/master.py | 156 |
1 files changed, 66 insertions, 90 deletions
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py index b71c2c8d..94b46f3f 100644 --- a/mitmproxy/flow/master.py +++ b/mitmproxy/flow/master.py @@ -15,6 +15,30 @@ from mitmproxy.onboarding import app from mitmproxy.protocol import http_replay +def event_sequence(f): + if isinstance(f, models.HTTPFlow): + if f.request: + yield "request", f + if f.response: + yield "responseheaders", f + yield "response", f + if f.error: + yield "error", f + elif isinstance(f, models.TCPFlow): + messages = f.messages + f.messages = [] + f.reply = controller.DummyReply() + yield "tcp_open", f + while messages: + f.messages.append(messages.pop(0)) + yield "tcp_message", f + if f.error: + yield "tcp_error", f + yield "tcp_close", f + else: + raise NotImplementedError + + class FlowMaster(controller.Master): @property @@ -29,16 +53,11 @@ class FlowMaster(controller.Master): if server: self.add_server(server) self.state = state - self.client_playback = None # type: Optional[modules.ClientPlaybackState] self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies] self.apps = modules.AppRegistry() def start_app(self, host, port): - self.apps.add( - app.mapp, - host, - port - ) + self.apps.add(app.mapp, host, port) def set_stream_large_bodies(self, max_size): if max_size is not None: @@ -46,31 +65,6 @@ class FlowMaster(controller.Master): else: self.stream_large_bodies = False - def start_client_playback(self, flows, exit): - """ - flows: List of flows. - """ - self.client_playback = modules.ClientPlaybackState(flows, exit) - - def stop_client_playback(self): - self.client_playback = None - - def tick(self, timeout): - if self.client_playback: - stop = ( - self.client_playback.done() and - self.state.active_flow_count() == 0 - ) - exit = self.client_playback.exit - if stop: - self.stop_client_playback() - if exit: - self.shutdown() - else: - self.client_playback.tick(self) - - return super(FlowMaster, self).tick(timeout) - def duplicate_flow(self, f): """ Duplicate flow, and insert it into state without triggering any of @@ -114,28 +108,9 @@ class FlowMaster(controller.Master): f.request.host = self.server.config.upstream_server.address.host f.request.port = self.server.config.upstream_server.address.port f.request.scheme = self.server.config.upstream_server.scheme - - f.reply = controller.DummyReply() - if f.request: - self.request(f) - if f.response: - self.responseheaders(f) - self.response(f) - if f.error: - self.error(f) - elif isinstance(f, models.TCPFlow): - messages = f.messages - f.messages = [] - f.reply = controller.DummyReply() - self.tcp_open(f) - while messages: - f.messages.append(messages.pop(0)) - self.tcp_message(f) - if f.error: - self.tcp_error(f) - self.tcp_close(f) - else: - raise NotImplementedError() + f.reply = controller.DummyReply() + for e, o in event_sequence(f): + getattr(self, e)(o) def load_flows(self, fr): """ @@ -163,35 +138,47 @@ class FlowMaster(controller.Master): def replay_request(self, f, block=False): """ - Returns None if successful, or error message if not. + Returns an http_replay.RequestReplayThred object. + May raise exceptions.ReplayError. """ if f.live: - return "Can't replay live request." + raise exceptions.ReplayError( + "Can't replay live flow." + ) if f.intercepted: - return "Can't replay while intercepting..." + raise exceptions.ReplayError( + "Can't replay intercepted flow." + ) if f.request.raw_content is None: - return "Can't replay request with missing content..." - if f.request: - f.backup() - f.request.is_replay = True - - # TODO: We should be able to remove this. - if "Content-Length" in f.request.headers: - f.request.headers["Content-Length"] = str(len(f.request.raw_content)) - - f.response = None - f.error = None - # FIXME: process through all addons? - # self.process_new_request(f) - rt = http_replay.RequestReplayThread( - self.server.config, - f, - self.event_queue, - self.should_exit + raise exceptions.ReplayError( + "Can't replay flow with missing content." ) - rt.start() # pragma: no cover - if block: - rt.join() + if not f.request: + raise exceptions.ReplayError( + "Can't replay flow with missing request." + ) + + f.backup() + f.request.is_replay = True + + # TODO: We should be able to remove this. + if "Content-Length" in f.request.headers: + f.request.headers["Content-Length"] = str(len(f.request.raw_content)) + + f.response = None + f.error = None + # FIXME: process through all addons? + # self.process_new_request(f) + rt = http_replay.RequestReplayThread( + self.server.config, + f, + self.event_queue, + self.should_exit + ) + rt.start() # pragma: no cover + if block: + rt.join() + return rt @controller.handler def log(self, l): @@ -220,9 +207,6 @@ class FlowMaster(controller.Master): @controller.handler def error(self, f): self.state.update_flow(f) - if self.client_playback: - self.client_playback.clear(f) - return f @controller.handler def request(self, f): @@ -240,7 +224,6 @@ class FlowMaster(controller.Master): return if f not in self.state.flows: # don't add again on replay self.state.add_flow(f) - return f @controller.handler def responseheaders(self, f): @@ -250,18 +233,14 @@ class FlowMaster(controller.Master): except netlib.exceptions.HttpException: f.reply.kill() return - return f @controller.handler def response(self, f): self.state.update_flow(f) - if self.client_playback: - self.client_playback.clear(f) - return f @controller.handler def websockets_handshake(self, f): - return f + pass def handle_intercept(self, f): self.state.update_flow(f) @@ -281,10 +260,7 @@ class FlowMaster(controller.Master): @controller.handler def tcp_error(self, flow): - self.add_log("Error in TCP connection to {}: {}".format( - repr(flow.server_conn.address), - flow.error - ), "info") + pass @controller.handler def tcp_close(self, flow): |