diff options
Diffstat (limited to 'mitmproxy/flow/master.py')
-rw-r--r-- | mitmproxy/flow/master.py | 544 |
1 files changed, 544 insertions, 0 deletions
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py new file mode 100644 index 00000000..ec0bf36d --- /dev/null +++ b/mitmproxy/flow/master.py @@ -0,0 +1,544 @@ +from __future__ import absolute_import, print_function, division + +import os +import sys + +from typing import List, Optional, Set # noqa + +import netlib.exceptions +from mitmproxy import controller +from mitmproxy import exceptions +from mitmproxy import filt +from mitmproxy import models +from mitmproxy import script +from mitmproxy.flow import io +from mitmproxy.flow import modules +from mitmproxy.onboarding import app +from mitmproxy.protocol import http_replay +from mitmproxy.proxy.config import HostMatcher +from netlib import strutils + + +class FlowMaster(controller.Master): + + @property + def server(self): + # At some point, we may want to have support for multiple servers. + # For now, this suffices. + if len(self.servers) > 0: + return self.servers[0] + + def __init__(self, server, state): + super(FlowMaster, self).__init__() + if server: + self.add_server(server) + self.state = state + self.active_flows = set() # type: Set[models.Flow] + self.server_playback = None # type: Optional[modules.ServerPlaybackState] + self.client_playback = None # type: Optional[modules.ClientPlaybackState] + self.kill_nonreplay = False + self.scripts = [] # type: List[script.Script] + self.pause_scripts = False + + self.stickycookie_state = None # type: Optional[modules.StickyCookieState] + self.stickycookie_txt = None + + self.stickyauth_state = False # type: Optional[modules.StickyAuthState] + self.stickyauth_txt = None + + self.anticache = False + self.anticomp = False + self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies] + self.refresh_server_playback = False + self.replacehooks = modules.ReplaceHooks() + self.setheaders = modules.SetHeaders() + self.replay_ignore_params = False + self.replay_ignore_content = None + self.replay_ignore_host = False + + self.stream = None + self.apps = modules.AppRegistry() + + def start_app(self, host, port): + self.apps.add( + app.mapp, + host, + port + ) + + def add_event(self, e, level="info"): + """ + level: debug, info, error + """ + + def unload_scripts(self): + for s in self.scripts[:]: + self.unload_script(s) + + def unload_script(self, script_obj): + try: + script_obj.unload() + except script.ScriptException as e: + self.add_event("Script error:\n" + str(e), "error") + script.reloader.unwatch(script_obj) + self.scripts.remove(script_obj) + + def load_script(self, command, use_reloader=False): + """ + Loads a script. + + Raises: + ScriptException + """ + s = script.Script(command, script.ScriptContext(self)) + s.load() + if use_reloader: + script.reloader.watch(s, lambda: self.event_queue.put(("script_change", s))) + self.scripts.append(s) + + def _run_single_script_hook(self, script_obj, name, *args, **kwargs): + if script_obj and not self.pause_scripts: + try: + script_obj.run(name, *args, **kwargs) + except script.ScriptException as e: + self.add_event("Script error:\n{}".format(e), "error") + + def run_script_hook(self, name, *args, **kwargs): + for script_obj in self.scripts: + self._run_single_script_hook(script_obj, name, *args, **kwargs) + + def get_ignore_filter(self): + return self.server.config.check_ignore.patterns + + def set_ignore_filter(self, host_patterns): + self.server.config.check_ignore = HostMatcher(host_patterns) + + def get_tcp_filter(self): + return self.server.config.check_tcp.patterns + + def set_tcp_filter(self, host_patterns): + self.server.config.check_tcp = HostMatcher(host_patterns) + + def set_stickycookie(self, txt): + if txt: + flt = filt.parse(txt) + if not flt: + return "Invalid filter expression." + self.stickycookie_state = modules.StickyCookieState(flt) + self.stickycookie_txt = txt + else: + self.stickycookie_state = None + self.stickycookie_txt = None + + def set_stream_large_bodies(self, max_size): + if max_size is not None: + self.stream_large_bodies = modules.StreamLargeBodies(max_size) + else: + self.stream_large_bodies = False + + def set_stickyauth(self, txt): + if txt: + flt = filt.parse(txt) + if not flt: + return "Invalid filter expression." + self.stickyauth_state = modules.StickyAuthState(flt) + self.stickyauth_txt = txt + else: + self.stickyauth_state = None + self.stickyauth_txt = None + + def start_client_playback(self, flows, exit): + """ + flows: List of flows. + """ + self.client_playback = modules.ClientPlaybackState(flows, exit) + + def stop_client_playback(self): + self.client_playback = None + + def start_server_playback( + self, + flows, + kill, + headers, + exit, + nopop, + ignore_params, + ignore_content, + ignore_payload_params, + ignore_host): + """ + flows: List of flows. + kill: Boolean, should we kill requests not part of the replay? + ignore_params: list of parameters to ignore in server replay + ignore_content: true if request content should be ignored in server replay + ignore_payload_params: list of content params to ignore in server replay + ignore_host: true if request host should be ignored in server replay + """ + self.server_playback = modules.ServerPlaybackState( + headers, + flows, + exit, + nopop, + ignore_params, + ignore_content, + ignore_payload_params, + ignore_host) + self.kill_nonreplay = kill + + def stop_server_playback(self): + self.server_playback = None + + def do_server_playback(self, flow): + """ + This method should be called by child classes in the request + handler. Returns True if playback has taken place, None if not. + """ + if self.server_playback: + rflow = self.server_playback.next_flow(flow) + if not rflow: + return None + response = rflow.response.copy() + response.is_replay = True + if self.refresh_server_playback: + response.refresh() + flow.response = response + return True + return None + + def tick(self, timeout): + if self.client_playback: + stop = ( + self.client_playback.done() and + self.state.active_flow_count() == 0 + ) + exit = self.client_playback.exit + if stop: + self.stop_client_playback() + if exit: + self.shutdown() + else: + self.client_playback.tick(self) + + if self.server_playback: + stop = ( + self.server_playback.count() == 0 and + self.state.active_flow_count() == 0 and + not self.kill_nonreplay + ) + exit = self.server_playback.exit + if stop: + self.stop_server_playback() + if exit: + self.shutdown() + return super(FlowMaster, self).tick(timeout) + + def duplicate_flow(self, f): + f2 = f.copy() + self.load_flow(f2) + return f2 + + def create_request(self, method, scheme, host, port, path): + """ + this method creates a new artificial and minimalist request also adds it to flowlist + """ + c = models.ClientConnection.make_dummy(("", 0)) + s = models.ServerConnection.make_dummy((host, port)) + + f = models.HTTPFlow(c, s) + headers = models.Headers() + + req = models.HTTPRequest( + "absolute", + method, + scheme, + host, + port, + path, + b"HTTP/1.1", + headers, + b"" + ) + f.request = req + self.load_flow(f) + return f + + def load_flow(self, f): + """ + Loads a flow + """ + if isinstance(f, models.HTTPFlow): + if self.server and self.server.config.mode == "reverse": + f.request.host = self.server.config.upstream_server.address.host + f.request.port = self.server.config.upstream_server.address.port + f.request.scheme = self.server.config.upstream_server.scheme + + f.reply = controller.DummyReply() + if f.request: + self.request(f) + if f.response: + self.responseheaders(f) + self.response(f) + if f.error: + self.error(f) + elif isinstance(f, models.TCPFlow): + messages = f.messages + f.messages = [] + f.reply = controller.DummyReply() + self.tcp_open(f) + while messages: + f.messages.append(messages.pop(0)) + self.tcp_message(f) + if f.error: + self.tcp_error(f) + self.tcp_close(f) + else: + raise NotImplementedError() + + def load_flows(self, fr): + """ + Load flows from a FlowReader object. + """ + cnt = 0 + for i in fr.stream(): + cnt += 1 + self.load_flow(i) + return cnt + + def load_flows_file(self, path): + path = os.path.expanduser(path) + try: + if path == "-": + # This is incompatible with Python 3 - maybe we can use click? + freader = io.FlowReader(sys.stdin) + return self.load_flows(freader) + else: + with open(path, "rb") as f: + freader = io.FlowReader(f) + return self.load_flows(freader) + except IOError as v: + raise exceptions.FlowReadException(v.strerror) + + def process_new_request(self, f): + if self.stickycookie_state: + self.stickycookie_state.handle_request(f) + if self.stickyauth_state: + self.stickyauth_state.handle_request(f) + + if self.anticache: + f.request.anticache() + if self.anticomp: + f.request.anticomp() + + if self.server_playback: + pb = self.do_server_playback(f) + if not pb and self.kill_nonreplay: + f.kill(self) + + def process_new_response(self, f): + if self.stickycookie_state: + self.stickycookie_state.handle_response(f) + + def replay_request(self, f, block=False, run_scripthooks=True): + """ + Returns None if successful, or error message if not. + """ + if f.live and run_scripthooks: + return "Can't replay live request." + if f.intercepted: + return "Can't replay while intercepting..." + if f.request.content is None: + return "Can't replay request with missing content..." + if f.request: + f.backup() + f.request.is_replay = True + if "Content-Length" in f.request.headers: + f.request.headers["Content-Length"] = str(len(f.request.content)) + f.response = None + f.error = None + self.process_new_request(f) + rt = http_replay.RequestReplayThread( + self.server.config, + f, + self.event_queue if run_scripthooks else False, + self.should_exit + ) + rt.start() # pragma: no cover + if block: + rt.join() + + @controller.handler + def log(self, l): + self.add_event(l.msg, l.level) + + @controller.handler + def clientconnect(self, root_layer): + self.run_script_hook("clientconnect", root_layer) + + @controller.handler + def clientdisconnect(self, root_layer): + self.run_script_hook("clientdisconnect", root_layer) + + @controller.handler + def serverconnect(self, server_conn): + self.run_script_hook("serverconnect", server_conn) + + @controller.handler + def serverdisconnect(self, server_conn): + self.run_script_hook("serverdisconnect", server_conn) + + @controller.handler + def next_layer(self, top_layer): + self.run_script_hook("next_layer", top_layer) + + @controller.handler + def error(self, f): + self.state.update_flow(f) + self.run_script_hook("error", f) + if self.client_playback: + self.client_playback.clear(f) + return f + + @controller.handler + def request(self, f): + if f.live: + app = self.apps.get(f.request) + if app: + err = app.serve( + f, + f.client_conn.wfile, + **{"mitmproxy.master": self} + ) + if err: + self.add_event("Error in wsgi app. %s" % err, "error") + f.reply(exceptions.Kill) + return + if f not in self.state.flows: # don't add again on replay + self.state.add_flow(f) + self.active_flows.add(f) + self.replacehooks.run(f) + self.setheaders.run(f) + self.process_new_request(f) + self.run_script_hook("request", f) + return f + + @controller.handler + def responseheaders(self, f): + try: + if self.stream_large_bodies: + self.stream_large_bodies.run(f, False) + except netlib.exceptions.HttpException: + f.reply(exceptions.Kill) + return + self.run_script_hook("responseheaders", f) + return f + + @controller.handler + def response(self, f): + self.active_flows.discard(f) + self.state.update_flow(f) + self.replacehooks.run(f) + self.setheaders.run(f) + self.run_script_hook("response", f) + if self.client_playback: + self.client_playback.clear(f) + self.process_new_response(f) + if self.stream: + self.stream.add(f) + return f + + def handle_intercept(self, f): + self.state.update_flow(f) + + def handle_accept_intercept(self, f): + self.state.update_flow(f) + + @controller.handler + def script_change(self, s): + """ + Handle a script whose contents have been changed on the file system. + + Args: + s (script.Script): the changed script + + Returns: + True, if reloading was successful. + False, otherwise. + """ + ok = True + # We deliberately do not want to fail here. + # In the worst case, we have an "empty" script object. + try: + s.unload() + except script.ScriptException as e: + ok = False + self.add_event('Error reloading "{}":\n{}'.format(s.filename, e), 'error') + try: + s.load() + except script.ScriptException as e: + ok = False + self.add_event('Error reloading "{}":\n{}'.format(s.filename, e), 'error') + else: + self.add_event('"{}" reloaded.'.format(s.filename), 'info') + return ok + + @controller.handler + def tcp_open(self, flow): + # TODO: This would break mitmproxy currently. + # self.state.add_flow(flow) + self.active_flows.add(flow) + self.run_script_hook("tcp_open", flow) + + @controller.handler + def tcp_message(self, flow): + self.run_script_hook("tcp_message", flow) + message = flow.messages[-1] + direction = "->" if message.from_client else "<-" + self.add_event("{client} {direction} tcp {direction} {server}".format( + client=repr(flow.client_conn.address), + server=repr(flow.server_conn.address), + direction=direction, + ), "info") + self.add_event(strutils.clean_bin(message.content), "debug") + + @controller.handler + def tcp_error(self, flow): + self.add_event("Error in TCP connection to {}: {}".format( + repr(flow.server_conn.address), + flow.error + ), "info") + self.run_script_hook("tcp_error", flow) + + @controller.handler + def tcp_close(self, flow): + self.active_flows.discard(flow) + if self.stream: + self.stream.add(flow) + self.run_script_hook("tcp_close", flow) + + def shutdown(self): + super(FlowMaster, self).shutdown() + + # Add all flows that are still active + if self.stream: + for flow in self.active_flows: + self.stream.add(flow) + self.stop_stream() + + self.unload_scripts() + + def start_stream(self, fp, filt): + self.stream = io.FilteredFlowWriter(fp, filt) + + def stop_stream(self): + self.stream.fo.close() + self.stream = None + + def start_stream_to_path(self, path, mode="wb", filt=None): + path = os.path.expanduser(path) + try: + f = open(path, mode) + self.start_stream(f, filt) + except IOError as v: + return str(v) + self.stream_path = path |