diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2014-12-12 22:08:15 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2014-12-12 22:08:15 +1300 |
commit | 01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1 (patch) | |
tree | 43c2460a9dc670421ee4e361b133a2aa45ae9e31 /libmproxy/flow.py | |
parent | 93d4a0132a1f31597fa24a5001c4c2b2cd752b4f (diff) | |
parent | dbb51640d967f7857ceb70b5b697e089085b7c6b (diff) | |
download | mitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.tar.gz mitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.tar.bz2 mitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.zip |
Merge pull request #414 from mitmproxy/flowviews2
Flowviews2
Diffstat (limited to 'libmproxy/flow.py')
-rw-r--r-- | libmproxy/flow.py | 257 |
1 files changed, 191 insertions, 66 deletions
diff --git a/libmproxy/flow.py b/libmproxy/flow.py index a6bf17d8..d3ae383e 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -2,6 +2,7 @@ This module provides more sophisticated flow tracking and provides filtering and interception facilities. """ from __future__ import absolute_import +from abc import abstractmethod, ABCMeta import hashlib import Cookie import cookielib @@ -338,80 +339,216 @@ class StickyAuthState: f.request.headers["authorization"] = self.hosts[host] +class FlowList(object): + __metaclass__ = ABCMeta + + def __iter__(self): + return iter(self._list) + + def __contains__(self, item): + return item in self._list + + def __getitem__(self, item): + return self._list[item] + + def __nonzero__(self): + return bool(self._list) + + def __len__(self): + return len(self._list) + + def index(self, f): + return self._list.index(f) + + @abstractmethod + def _add(self, f): + return + + @abstractmethod + def _update(self, f): + return + + @abstractmethod + def _remove(self, f): + return + + +class FlowView(FlowList): + def __init__(self, store, filt=None): + self._list = [] + if not filt: + filt = lambda flow: True + self._build(store, filt) + + self.store = store + self.store.views.append(self) + + def _close(self): + self.store.views.remove(self) + + def _build(self, flows, filt=None): + if filt: + self.filt = filt + self._list = list(filter(self.filt, flows)) + + def _add(self, f): + if self.filt(f): + self._list.append(f) + + def _update(self, f): + if f not in self._list: + self._add(f) + elif not self.filt(f): + self._remove(f) + + def _remove(self, f): + if f in self._list: + self._list.remove(f) + + def _recalculate(self, flows): + self._build(flows) + + +class FlowStore(FlowList): + """ + Responsible for handling flows in the state: + Keeps a list of all flows and provides views on them. + """ + def __init__(self): + self._list = [] + self._set = set() # Used for O(1) lookups + self.views = [] + self._recalculate_views() + + def __contains__(self, f): + return f in self._set + + def _add(self, f): + """ + Adds a flow to the state. + The flow to add must not be present in the state. + """ + self._list.append(f) + self._set.add(f) + for view in self.views: + view._add(f) + + def _update(self, f): + """ + Notifies the state that a flow has been updated. + The flow must be present in the state. + """ + for view in self.views: + view._update(f) + + def _remove(self, f): + """ + Deletes a flow from the state. + The flow must be present in the state. + """ + self._list.remove(f) + self._set.remove(f) + for view in self.views: + view._remove(f) + + # Expensive bulk operations + + def _extend(self, flows): + """ + Adds a list of flows to the state. + The list of flows to add must not contain flows that are already in the state. + """ + self._list.extend(flows) + self._set.update(flows) + self._recalculate_views() + + def _clear(self): + self._list = [] + self._set = set() + self._recalculate_views() + + def _recalculate_views(self): + """ + Expensive operation: Recalculate all the views after a bulk change. + """ + for view in self.views: + view._recalculate(self) + + # Utility functions. + # There are some common cases where we need to argue about all flows + # irrespective of filters on the view etc (i.e. on shutdown). + + def active_count(self): + c = 0 + for i in self._list: + if not i.response and not i.error: + c += 1 + return c + + # TODO: Should accept_all operate on views or on all flows? + def accept_all(self): + for f in self._list: + f.accept_intercept() + + def kill_all(self, master): + for f in self._list: + f.kill(master) + + class State(object): def __init__(self): - self._flow_list = [] - self.view = [] + self.flows = FlowStore() + self.view = FlowView(self.flows, None) # These are compiled filt expressions: - self._limit = None self.intercept = None @property def limit_txt(self): - if self._limit: - return self._limit.pattern - else: - return None + return getattr(self.view.filt, "pattern", None) def flow_count(self): - return len(self._flow_list) + return len(self.flows) + # TODO: All functions regarding flows that don't cause side-effects should be moved into FlowStore. def index(self, f): - return self._flow_list.index(f) + return self.flows.index(f) def active_flow_count(self): - c = 0 - for i in self._flow_list: - if not i.response and not i.error: - c += 1 - return c + return self.flows.active_count() - def add_request(self, flow): + def add_flow(self, f): """ - Add a request to the state. Returns the matching flow. + Add a request to the state. """ - if flow in self._flow_list: # catch flow replay - return flow - self._flow_list.append(flow) - if flow.match(self._limit): - self.view.append(flow) - return flow - - def add_response(self, f): - """ - Add a response to the state. Returns the matching flow. - """ - if not f: - return False - if f.match(self._limit) and not f in self.view: - self.view.append(f) + self.flows._add(f) return f - def add_error(self, f): + def update_flow(self, f): """ - Add an error response to the state. Returns the matching flow, or - None if there isn't one. + Add a response to the state. """ - if not f: - return None - if f.match(self._limit) and not f in self.view: - self.view.append(f) + self.flows._update(f) return f + def delete_flow(self, f): + self.flows._remove(f) + def load_flows(self, flows): - self._flow_list.extend(flows) - self.recalculate_view() + self.flows._extend(flows) def set_limit(self, txt): + if txt == self.limit_txt: + return if txt: f = filt.parse(txt) if not f: return "Invalid filter expression." - self._limit = f + self.view._close() + self.view = FlowView(self.flows, f) else: - self._limit = None - self.recalculate_view() + self.view._close() + self.view = FlowView(self.flows, None) def set_intercept(self, txt): if txt: @@ -419,37 +556,24 @@ class State(object): if not f: return "Invalid filter expression." self.intercept = f - self.intercept_txt = txt else: self.intercept = None - self.intercept_txt = None - - def recalculate_view(self): - if self._limit: - self.view = [i for i in self._flow_list if i.match(self._limit)] - else: - self.view = self._flow_list[:] - def delete_flow(self, f): - self._flow_list.remove(f) - if f in self.view: - self.view.remove(f) - return True + @property + def intercept_txt(self): + return getattr(self.intercept, "pattern", None) def clear(self): - for i in self._flow_list[:]: - self.delete_flow(i) + self.flows._clear() def accept_all(self): - for i in self._flow_list[:]: - i.accept_intercept() + self.flows.accept_all() def revert(self, f): f.revert() def killall(self, master): - for i in self._flow_list: - i.kill(master) + self.flows.kill_all(master) class FlowMaster(controller.Master): @@ -716,7 +840,7 @@ class FlowMaster(controller.Master): sc.reply() def handle_error(self, f): - self.state.add_error(f) + self.state.update_flow(f) self.run_script_hook("error", f) if self.client_playback: self.client_playback.clear(f) @@ -736,7 +860,8 @@ class FlowMaster(controller.Master): self.add_event("Error in wsgi app. %s"%err, "error") f.reply(protocol.KILL) return - self.state.add_request(f) + if f not in self.state.flows: # don't add again on replay + self.state.add_flow(f) self.replacehooks.run(f) self.setheaders.run(f) self.run_script_hook("request", f) @@ -757,7 +882,7 @@ class FlowMaster(controller.Master): return f def handle_response(self, f): - self.state.add_response(f) + self.state.update_flow(f) self.replacehooks.run(f) self.setheaders.run(f) self.run_script_hook("response", f) @@ -772,7 +897,7 @@ class FlowMaster(controller.Master): self.unload_scripts() controller.Master.shutdown(self) if self.stream: - for i in self.state._flow_list: + for i in self.state.flows: if not i.response: self.stream.add(i) self.stop_stream() |