aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/flow.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2014-12-12 22:08:15 +1300
committerAldo Cortesi <aldo@nullcube.com>2014-12-12 22:08:15 +1300
commit01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1 (patch)
tree43c2460a9dc670421ee4e361b133a2aa45ae9e31 /libmproxy/flow.py
parent93d4a0132a1f31597fa24a5001c4c2b2cd752b4f (diff)
parentdbb51640d967f7857ceb70b5b697e089085b7c6b (diff)
downloadmitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.tar.gz
mitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.tar.bz2
mitmproxy-01fa5d3f07d26d52e5ad7eef139e1ed6f9b7dae1.zip
Merge pull request #414 from mitmproxy/flowviews2
Flowviews2
Diffstat (limited to 'libmproxy/flow.py')
-rw-r--r--libmproxy/flow.py257
1 files changed, 191 insertions, 66 deletions
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index a6bf17d8..d3ae383e 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -2,6 +2,7 @@
This module provides more sophisticated flow tracking and provides filtering and interception facilities.
"""
from __future__ import absolute_import
+from abc import abstractmethod, ABCMeta
import hashlib
import Cookie
import cookielib
@@ -338,80 +339,216 @@ class StickyAuthState:
f.request.headers["authorization"] = self.hosts[host]
+class FlowList(object):
+ __metaclass__ = ABCMeta
+
+ def __iter__(self):
+ return iter(self._list)
+
+ def __contains__(self, item):
+ return item in self._list
+
+ def __getitem__(self, item):
+ return self._list[item]
+
+ def __nonzero__(self):
+ return bool(self._list)
+
+ def __len__(self):
+ return len(self._list)
+
+ def index(self, f):
+ return self._list.index(f)
+
+ @abstractmethod
+ def _add(self, f):
+ return
+
+ @abstractmethod
+ def _update(self, f):
+ return
+
+ @abstractmethod
+ def _remove(self, f):
+ return
+
+
+class FlowView(FlowList):
+ def __init__(self, store, filt=None):
+ self._list = []
+ if not filt:
+ filt = lambda flow: True
+ self._build(store, filt)
+
+ self.store = store
+ self.store.views.append(self)
+
+ def _close(self):
+ self.store.views.remove(self)
+
+ def _build(self, flows, filt=None):
+ if filt:
+ self.filt = filt
+ self._list = list(filter(self.filt, flows))
+
+ def _add(self, f):
+ if self.filt(f):
+ self._list.append(f)
+
+ def _update(self, f):
+ if f not in self._list:
+ self._add(f)
+ elif not self.filt(f):
+ self._remove(f)
+
+ def _remove(self, f):
+ if f in self._list:
+ self._list.remove(f)
+
+ def _recalculate(self, flows):
+ self._build(flows)
+
+
+class FlowStore(FlowList):
+ """
+ Responsible for handling flows in the state:
+ Keeps a list of all flows and provides views on them.
+ """
+ def __init__(self):
+ self._list = []
+ self._set = set() # Used for O(1) lookups
+ self.views = []
+ self._recalculate_views()
+
+ def __contains__(self, f):
+ return f in self._set
+
+ def _add(self, f):
+ """
+ Adds a flow to the state.
+ The flow to add must not be present in the state.
+ """
+ self._list.append(f)
+ self._set.add(f)
+ for view in self.views:
+ view._add(f)
+
+ def _update(self, f):
+ """
+ Notifies the state that a flow has been updated.
+ The flow must be present in the state.
+ """
+ for view in self.views:
+ view._update(f)
+
+ def _remove(self, f):
+ """
+ Deletes a flow from the state.
+ The flow must be present in the state.
+ """
+ self._list.remove(f)
+ self._set.remove(f)
+ for view in self.views:
+ view._remove(f)
+
+ # Expensive bulk operations
+
+ def _extend(self, flows):
+ """
+ Adds a list of flows to the state.
+ The list of flows to add must not contain flows that are already in the state.
+ """
+ self._list.extend(flows)
+ self._set.update(flows)
+ self._recalculate_views()
+
+ def _clear(self):
+ self._list = []
+ self._set = set()
+ self._recalculate_views()
+
+ def _recalculate_views(self):
+ """
+ Expensive operation: Recalculate all the views after a bulk change.
+ """
+ for view in self.views:
+ view._recalculate(self)
+
+ # Utility functions.
+ # There are some common cases where we need to argue about all flows
+ # irrespective of filters on the view etc (i.e. on shutdown).
+
+ def active_count(self):
+ c = 0
+ for i in self._list:
+ if not i.response and not i.error:
+ c += 1
+ return c
+
+ # TODO: Should accept_all operate on views or on all flows?
+ def accept_all(self):
+ for f in self._list:
+ f.accept_intercept()
+
+ def kill_all(self, master):
+ for f in self._list:
+ f.kill(master)
+
+
class State(object):
def __init__(self):
- self._flow_list = []
- self.view = []
+ self.flows = FlowStore()
+ self.view = FlowView(self.flows, None)
# These are compiled filt expressions:
- self._limit = None
self.intercept = None
@property
def limit_txt(self):
- if self._limit:
- return self._limit.pattern
- else:
- return None
+ return getattr(self.view.filt, "pattern", None)
def flow_count(self):
- return len(self._flow_list)
+ return len(self.flows)
+ # TODO: All functions regarding flows that don't cause side-effects should be moved into FlowStore.
def index(self, f):
- return self._flow_list.index(f)
+ return self.flows.index(f)
def active_flow_count(self):
- c = 0
- for i in self._flow_list:
- if not i.response and not i.error:
- c += 1
- return c
+ return self.flows.active_count()
- def add_request(self, flow):
+ def add_flow(self, f):
"""
- Add a request to the state. Returns the matching flow.
+ Add a request to the state.
"""
- if flow in self._flow_list: # catch flow replay
- return flow
- self._flow_list.append(flow)
- if flow.match(self._limit):
- self.view.append(flow)
- return flow
-
- def add_response(self, f):
- """
- Add a response to the state. Returns the matching flow.
- """
- if not f:
- return False
- if f.match(self._limit) and not f in self.view:
- self.view.append(f)
+ self.flows._add(f)
return f
- def add_error(self, f):
+ def update_flow(self, f):
"""
- Add an error response to the state. Returns the matching flow, or
- None if there isn't one.
+ Add a response to the state.
"""
- if not f:
- return None
- if f.match(self._limit) and not f in self.view:
- self.view.append(f)
+ self.flows._update(f)
return f
+ def delete_flow(self, f):
+ self.flows._remove(f)
+
def load_flows(self, flows):
- self._flow_list.extend(flows)
- self.recalculate_view()
+ self.flows._extend(flows)
def set_limit(self, txt):
+ if txt == self.limit_txt:
+ return
if txt:
f = filt.parse(txt)
if not f:
return "Invalid filter expression."
- self._limit = f
+ self.view._close()
+ self.view = FlowView(self.flows, f)
else:
- self._limit = None
- self.recalculate_view()
+ self.view._close()
+ self.view = FlowView(self.flows, None)
def set_intercept(self, txt):
if txt:
@@ -419,37 +556,24 @@ class State(object):
if not f:
return "Invalid filter expression."
self.intercept = f
- self.intercept_txt = txt
else:
self.intercept = None
- self.intercept_txt = None
-
- def recalculate_view(self):
- if self._limit:
- self.view = [i for i in self._flow_list if i.match(self._limit)]
- else:
- self.view = self._flow_list[:]
- def delete_flow(self, f):
- self._flow_list.remove(f)
- if f in self.view:
- self.view.remove(f)
- return True
+ @property
+ def intercept_txt(self):
+ return getattr(self.intercept, "pattern", None)
def clear(self):
- for i in self._flow_list[:]:
- self.delete_flow(i)
+ self.flows._clear()
def accept_all(self):
- for i in self._flow_list[:]:
- i.accept_intercept()
+ self.flows.accept_all()
def revert(self, f):
f.revert()
def killall(self, master):
- for i in self._flow_list:
- i.kill(master)
+ self.flows.kill_all(master)
class FlowMaster(controller.Master):
@@ -716,7 +840,7 @@ class FlowMaster(controller.Master):
sc.reply()
def handle_error(self, f):
- self.state.add_error(f)
+ self.state.update_flow(f)
self.run_script_hook("error", f)
if self.client_playback:
self.client_playback.clear(f)
@@ -736,7 +860,8 @@ class FlowMaster(controller.Master):
self.add_event("Error in wsgi app. %s"%err, "error")
f.reply(protocol.KILL)
return
- self.state.add_request(f)
+ if f not in self.state.flows: # don't add again on replay
+ self.state.add_flow(f)
self.replacehooks.run(f)
self.setheaders.run(f)
self.run_script_hook("request", f)
@@ -757,7 +882,7 @@ class FlowMaster(controller.Master):
return f
def handle_response(self, f):
- self.state.add_response(f)
+ self.state.update_flow(f)
self.replacehooks.run(f)
self.setheaders.run(f)
self.run_script_hook("response", f)
@@ -772,7 +897,7 @@ class FlowMaster(controller.Master):
self.unload_scripts()
controller.Master.shutdown(self)
if self.stream:
- for i in self.state._flow_list:
+ for i in self.state.flows:
if not i.response:
self.stream.add(i)
self.stop_stream()