diff options
Diffstat (limited to 'libmproxy/flow.py')
-rw-r--r-- | libmproxy/flow.py | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/libmproxy/flow.py b/libmproxy/flow.py new file mode 100644 index 00000000..bdca5ebd --- /dev/null +++ b/libmproxy/flow.py @@ -0,0 +1,207 @@ +""" + This module provides more sophisticated flow tracking. These match requests + with their responses, and provide filtering and interception facilities. +""" +import proxy, threading + +class ReplayConnection: + pass + + +class ReplayThread(threading.Thread): + def __init__(self, flow, masterq): + self.flow, self.masterq = flow, masterq + threading.Thread.__init__(self) + + def run(self): + try: + server = proxy.ServerConnection(self.flow.request) + response = server.read_response() + response.send(self.masterq) + except proxy.ProxyError, v: + err = proxy.Error(self.flow.connection, v.msg) + err.send(self.masterq) + + +class Flow: + def __init__(self, connection): + self.connection = connection + self.request, self.response, self.error = None, None, None + self.waiting = True + self.intercepting = False + self._backup = None + + def backup(self): + if not self._backup: + self._backup = [ + self.connection.copy() if self.connection else None, + self.request.copy() if self.request else None, + self.response.copy() if self.response else None, + self.error.copy() if self.error else None, + ] + + def revert(self): + if self._backup: + self.waiting = False + restore = [i.copy() if i else None for i in self._backup] + self.connection, self.request, self.response, self.error = restore + + def match(self, pattern): + if pattern: + if self.response: + return pattern(self.response) + elif self.request: + return pattern(self.request) + return False + + def is_replay(self): + return isinstance(self.connection, ReplayConnection) + + def kill(self): + if self.intercepting: + if not self.request.acked: + self.request.kill = True + self.request.ack() + elif self.response and not self.response.acked: + self.response.kill = True + self.response.ack() + self.intercepting = False + + def intercept(self): + self.intercepting = True + + def accept_intercept(self): + if self.request: + if not self.request.acked: + self.request.ack() + elif self.response and not self.response.acked: + self.response.ack() + self.intercepting = False + + +class State: + def __init__(self): + self.flow_map = {} + self.flow_list = [] + # These are compiled filt expressions: + self.limit = None + self.intercept = None + + def add_browserconnect(self, f): + """ + Start a browser connection. + """ + self.flow_list.insert(0, f) + self.flow_map[f.connection] = f + + def add_request(self, req): + """ + Add a request to the state. Returns the matching flow. + """ + f = self.flow_map.get(req.connection) + if not f: + return False + f.request = req + return f + + def add_response(self, resp): + """ + Add a response to the state. Returns the matching flow. + """ + f = self.flow_map.get(resp.request.connection) + if not f: + return False + f.response = resp + f.waiting = False + f.backup() + return f + + def add_error(self, err): + """ + Add an error response to the state. Returns the matching flow, or + None if there isn't one. + """ + f = self.flow_map.get(err.connection) + if not f: + return None + f.error = err + f.waiting = False + f.backup() + return f + + def set_limit(self, limit): + """ + Limit is a compiled filter expression, or None. + """ + self.limit = limit + + def get_connection(self, itm): + if isinstance(itm, (proxy.BrowserConnection, ReplayConnection)): + return itm + elif hasattr(itm, "connection"): + return itm.connection + elif hasattr(itm, "request"): + return itm.request.connection + + def lookup(self, itm): + """ + Checks for matching connection, using a Flow, Replay Connection, + BrowserConnection, Request, Response or Error object. Returns None + if not found. + """ + connection = self.get_connection(itm) + return self.flow_map.get(connection) + + def delete_flow(self, f): + if not f.intercepting: + c = self.get_connection(f) + del self.flow_map[c] + self.flow_list.remove(f) + return True + return False + + def clear(self): + for i in self.flow_list[:]: + self.delete_flow(i) + + def accept_all(self): + for i in self.flow_list[:]: + i.accept_intercept() + + def kill_flow(self, f): + f.kill() + self.delete_flow(f) + + def revert(self, f): + """ + Replaces the matching connection object with a ReplayConnection object. + """ + conn = self.get_connection(f) + del self.flow_map[conn] + f.revert() + self.flow_map[f.connection] = f + + def replay(self, f, masterq): + """ + Replaces the matching connection object with a ReplayConnection object. + + Returns None if successful, or error message if not. + """ + #begin nocover + if f.intercepting: + return "Can't replay while intercepting..." + if f.request: + f.backup() + conn = self.get_connection(f) + del self.flow_map[conn] + rp = ReplayConnection() + f.connection = rp + f.request.connection = rp + if f.request.content: + f.request.headers["content-length"] = [str(len(f.request.content))] + f.response = None + f.error = None + self.flow_map[rp] = f + rt = ReplayThread(f, masterq) + rt.start() + #end nocover |