diff options
Diffstat (limited to 'mitmproxy/flow/master.py')
-rw-r--r-- | mitmproxy/flow/master.py | 231 |
1 files changed, 66 insertions, 165 deletions
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py index 9cdcc8dd..94b46f3f 100644 --- a/mitmproxy/flow/master.py +++ b/mitmproxy/flow/master.py @@ -15,6 +15,30 @@ from mitmproxy.onboarding import app from mitmproxy.protocol import http_replay +def event_sequence(f): + if isinstance(f, models.HTTPFlow): + if f.request: + yield "request", f + if f.response: + yield "responseheaders", f + yield "response", f + if f.error: + yield "error", f + elif isinstance(f, models.TCPFlow): + messages = f.messages + f.messages = [] + f.reply = controller.DummyReply() + yield "tcp_open", f + while messages: + f.messages.append(messages.pop(0)) + yield "tcp_message", f + if f.error: + yield "tcp_error", f + yield "tcp_close", f + else: + raise NotImplementedError + + class FlowMaster(controller.Master): @property @@ -29,23 +53,11 @@ class FlowMaster(controller.Master): if server: self.add_server(server) self.state = state - self.server_playback = None # type: Optional[modules.ServerPlaybackState] - self.client_playback = None # type: Optional[modules.ClientPlaybackState] - self.kill_nonreplay = False - self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies] - self.replay_ignore_params = False - self.replay_ignore_content = None - self.replay_ignore_host = False - self.apps = modules.AppRegistry() def start_app(self, host, port): - self.apps.add( - app.mapp, - host, - port - ) + self.apps.add(app.mapp, host, port) def set_stream_large_bodies(self, max_size): if max_size is not None: @@ -53,92 +65,6 @@ class FlowMaster(controller.Master): else: self.stream_large_bodies = False - def start_client_playback(self, flows, exit): - """ - flows: List of flows. - """ - self.client_playback = modules.ClientPlaybackState(flows, exit) - - def stop_client_playback(self): - self.client_playback = None - - def start_server_playback( - self, - flows, - kill, - headers, - exit, - nopop, - ignore_params, - ignore_content, - ignore_payload_params, - ignore_host): - """ - flows: List of flows. - kill: Boolean, should we kill requests not part of the replay? - ignore_params: list of parameters to ignore in server replay - ignore_content: true if request content should be ignored in server replay - ignore_payload_params: list of content params to ignore in server replay - ignore_host: true if request host should be ignored in server replay - """ - self.server_playback = modules.ServerPlaybackState( - headers, - flows, - exit, - nopop, - ignore_params, - ignore_content, - ignore_payload_params, - ignore_host) - self.kill_nonreplay = kill - - def stop_server_playback(self): - self.server_playback = None - - def do_server_playback(self, flow): - """ - This method should be called by child classes in the request - handler. Returns True if playback has taken place, None if not. - """ - if self.server_playback: - rflow = self.server_playback.next_flow(flow) - if not rflow: - return None - response = rflow.response.copy() - response.is_replay = True - if self.options.refresh_server_playback: - response.refresh() - flow.response = response - return True - return None - - def tick(self, timeout): - if self.client_playback: - stop = ( - self.client_playback.done() and - self.state.active_flow_count() == 0 - ) - exit = self.client_playback.exit - if stop: - self.stop_client_playback() - if exit: - self.shutdown() - else: - self.client_playback.tick(self) - - if self.server_playback: - stop = ( - self.server_playback.count() == 0 and - self.state.active_flow_count() == 0 and - not self.kill_nonreplay - ) - exit = self.server_playback.exit - if stop: - self.stop_server_playback() - if exit: - self.shutdown() - return super(FlowMaster, self).tick(timeout) - def duplicate_flow(self, f): """ Duplicate flow, and insert it into state without triggering any of @@ -182,28 +108,9 @@ class FlowMaster(controller.Master): f.request.host = self.server.config.upstream_server.address.host f.request.port = self.server.config.upstream_server.address.port f.request.scheme = self.server.config.upstream_server.scheme - - f.reply = controller.DummyReply() - if f.request: - self.request(f) - if f.response: - self.responseheaders(f) - self.response(f) - if f.error: - self.error(f) - elif isinstance(f, models.TCPFlow): - messages = f.messages - f.messages = [] - f.reply = controller.DummyReply() - self.tcp_open(f) - while messages: - f.messages.append(messages.pop(0)) - self.tcp_message(f) - if f.error: - self.tcp_error(f) - self.tcp_close(f) - else: - raise NotImplementedError() + f.reply = controller.DummyReply() + for e, o in event_sequence(f): + getattr(self, e)(o) def load_flows(self, fr): """ @@ -229,43 +136,49 @@ class FlowMaster(controller.Master): except IOError as v: raise exceptions.FlowReadException(v.strerror) - def process_new_request(self, f): - if self.server_playback: - pb = self.do_server_playback(f) - if not pb and self.kill_nonreplay: - self.add_log("Killed {}".format(f.request.url), "info") - f.reply.kill() - def replay_request(self, f, block=False): """ - Returns None if successful, or error message if not. + Returns an http_replay.RequestReplayThred object. + May raise exceptions.ReplayError. """ if f.live: - return "Can't replay live request." + raise exceptions.ReplayError( + "Can't replay live flow." + ) if f.intercepted: - return "Can't replay while intercepting..." + raise exceptions.ReplayError( + "Can't replay intercepted flow." + ) if f.request.raw_content is None: - return "Can't replay request with missing content..." - if f.request: - f.backup() - f.request.is_replay = True - - # TODO: We should be able to remove this. - if "Content-Length" in f.request.headers: - f.request.headers["Content-Length"] = str(len(f.request.raw_content)) - - f.response = None - f.error = None - self.process_new_request(f) - rt = http_replay.RequestReplayThread( - self.server.config, - f, - self.event_queue, - self.should_exit + raise exceptions.ReplayError( + "Can't replay flow with missing content." + ) + if not f.request: + raise exceptions.ReplayError( + "Can't replay flow with missing request." ) - rt.start() # pragma: no cover - if block: - rt.join() + + f.backup() + f.request.is_replay = True + + # TODO: We should be able to remove this. + if "Content-Length" in f.request.headers: + f.request.headers["Content-Length"] = str(len(f.request.raw_content)) + + f.response = None + f.error = None + # FIXME: process through all addons? + # self.process_new_request(f) + rt = http_replay.RequestReplayThread( + self.server.config, + f, + self.event_queue, + self.should_exit + ) + rt.start() # pragma: no cover + if block: + rt.join() + return rt @controller.handler def log(self, l): @@ -294,9 +207,6 @@ class FlowMaster(controller.Master): @controller.handler def error(self, f): self.state.update_flow(f) - if self.client_playback: - self.client_playback.clear(f) - return f @controller.handler def request(self, f): @@ -314,8 +224,6 @@ class FlowMaster(controller.Master): return if f not in self.state.flows: # don't add again on replay self.state.add_flow(f) - self.process_new_request(f) - return f @controller.handler def responseheaders(self, f): @@ -325,18 +233,14 @@ class FlowMaster(controller.Master): except netlib.exceptions.HttpException: f.reply.kill() return - return f @controller.handler def response(self, f): self.state.update_flow(f) - if self.client_playback: - self.client_playback.clear(f) - return f @controller.handler def websockets_handshake(self, f): - return f + pass def handle_intercept(self, f): self.state.update_flow(f) @@ -356,10 +260,7 @@ class FlowMaster(controller.Master): @controller.handler def tcp_error(self, flow): - self.add_log("Error in TCP connection to {}: {}".format( - repr(flow.server_conn.address), - flow.error - ), "info") + pass @controller.handler def tcp_close(self, flow): |