diff options
-rw-r--r-- | mitmproxy/addons/state.py | 272 | ||||
-rw-r--r-- | mitmproxy/tools/web/app.py | 35 | ||||
-rw-r--r-- | mitmproxy/tools/web/master.py | 92 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_state.py | 19 | ||||
-rw-r--r-- | test/mitmproxy/test_flow.py | 152 | ||||
-rw-r--r-- | test/mitmproxy/test_web_master.py | 2 |
6 files changed, 62 insertions, 510 deletions
diff --git a/mitmproxy/addons/state.py b/mitmproxy/addons/state.py deleted file mode 100644 index b6f575d1..00000000 --- a/mitmproxy/addons/state.py +++ /dev/null @@ -1,272 +0,0 @@ -from abc import abstractmethod, ABCMeta - -from typing import List # noqa - -from mitmproxy import flowfilter -from mitmproxy import flow # noqa - - -class FlowList(metaclass=ABCMeta): - def __init__(self): - self._list = [] # type: List[flow.Flow] - - def __iter__(self): - return iter(self._list) - - def __contains__(self, item): - return item in self._list - - def __getitem__(self, item): - return self._list[item] - - def __bool__(self): - return bool(self._list) - - def __len__(self): - return len(self._list) - - def index(self, f): - return self._list.index(f) - - @abstractmethod - def _add(self, f): - return - - @abstractmethod - def _update(self, f): - return - - @abstractmethod - def _remove(self, f): - return - - -def _pos(*args): - return True - - -class FlowView(FlowList): - def __init__(self, store, flt=None): - super().__init__() - if not flt: - flt = _pos - self._build(store, flt) - - self.store = store - self.store.views.append(self) - - def _close(self): - self.store.views.remove(self) - - def _build(self, flows, flt=None): - if flt: - self.filter = flt - self._list = list(filter(self.filter, flows)) - - def _add(self, f): - if self.filter(f): - self._list.append(f) - - def _update(self, f): - if f not in self._list: - self._add(f) - elif not self.filter(f): - self._remove(f) - - def _remove(self, f): - if f in self._list: - self._list.remove(f) - - def _recalculate(self, flows): - self._build(flows) - - -class FlowStore(FlowList): - """ - Responsible for handling flows in the state: - Keeps a list of all flows and provides views on them. - """ - - def __init__(self): - super().__init__() - self._set = set() # Used for O(1) lookups - self.views = [] - self._recalculate_views() - - def get(self, flow_id): - for f in self._list: - if f.id == flow_id: - return f - - def __contains__(self, f): - return f in self._set - - def _add(self, f): - """ - Adds a flow to the state. - The flow to add must not be present in the state. - """ - self._list.append(f) - self._set.add(f) - for view in self.views: - view._add(f) - - def _update(self, f): - """ - Notifies the state that a flow has been updated. - The flow must be present in the state. - """ - if f in self: - for view in self.views: - view._update(f) - - def _remove(self, f): - """ - Deletes a flow from the state. - The flow must be present in the state. - """ - self._list.remove(f) - self._set.remove(f) - for view in self.views: - view._remove(f) - - # Expensive bulk operations - - def _extend(self, flows): - """ - Adds a list of flows to the state. - The list of flows to add must not contain flows that are already in the state. - """ - self._list.extend(flows) - self._set.update(flows) - self._recalculate_views() - - def _clear(self): - self._list = [] - self._set = set() - self._recalculate_views() - - def _recalculate_views(self): - """ - Expensive operation: Recalculate all the views after a bulk change. - """ - for view in self.views: - view._recalculate(self) - - # Utility functions. - # There are some common cases where we need to argue about all flows - # irrespective of filters on the view etc (i.e. on shutdown). - - def active_count(self): - c = 0 - for i in self._list: - if not i.response and not i.error: - c += 1 - return c - - # TODO: Should accept_all operate on views or on all flows? - def accept_all(self, master): - for f in self._list: - f.resume(master) - - def kill_all(self, master): - for f in self._list: - if f.killable: - f.kill(master) - - -class State: - def __init__(self): - self.flows = FlowStore() - self.view = FlowView(self.flows, None) - - @property - def filter_txt(self): - return getattr(self.view.filter, "pattern", None) - - def flow_count(self): - return len(self.flows) - - # TODO: All functions regarding flows that don't cause side-effects should - # be moved into FlowStore. - def index(self, f): - return self.flows.index(f) - - def active_flow_count(self): - return self.flows.active_count() - - def add_flow(self, f): - """ - Add a request to the state. - """ - self.flows._add(f) - return f - - def update_flow(self, f): - """ - Add a response to the state. - """ - self.flows._update(f) - return f - - def delete_flow(self, f): - self.flows._remove(f) - - def load_flows(self, flows): - self.flows._extend(flows) - - def set_view_filter(self, txt): - if txt == self.filter_txt: - return - if txt: - flt = flowfilter.parse(txt) - if not flt: - return "Invalid filter expression." - self.view._close() - self.view = FlowView(self.flows, flt) - else: - self.view._close() - self.view = FlowView(self.flows, None) - - def clear(self): - self.flows._clear() - - def accept_all(self, master): - self.flows.accept_all(master) - - def backup(self, f): - f.backup() - self.update_flow(f) - - def revert(self, f): - f.revert() - self.update_flow(f) - - def killall(self, master): - self.flows.kill_all(master) - - def duplicate_flow(self, f): - """ - Duplicate flow, and insert it into state without triggering any of - the normal flow events. - """ - f2 = f.copy() - self.add_flow(f2) - return f2 - - # Event handlers - def intercept(self, f): - self.update_flow(f) - - def resume(self, f): - self.update_flow(f) - - def error(self, f): - self.update_flow(f) - - def request(self, f): - if f not in self.flows: # don't add again on replay - self.add_flow(f) - - def response(self, f): - self.update_flow(f) diff --git a/mitmproxy/tools/web/app.py b/mitmproxy/tools/web/app.py index 41701d36..bbdc2bf9 100644 --- a/mitmproxy/tools/web/app.py +++ b/mitmproxy/tools/web/app.py @@ -116,8 +116,8 @@ class RequestHandler(BasicAuth, tornado.web.RequestHandler): return json.loads(self.request.body.decode()) @property - def state(self): - return self.application.master.state + def view(self): + return self.application.master.view @property def master(self) -> "mitmproxy.tools.web.master.WebMaster": @@ -126,7 +126,8 @@ class RequestHandler(BasicAuth, tornado.web.RequestHandler): @property def flow(self): flow_id = str(self.path_kwargs["flow_id"]) - flow = self.state.flows.get(flow_id) + # FIXME: Add a facility to addon.view to safely access the store + flow = self.view._store.get(flow_id) if flow: return flow else: @@ -184,7 +185,7 @@ class Flows(RequestHandler): def get(self): self.write(dict( - data=[convert_flow_to_json_dict(f) for f in self.state.flows] + data=[convert_flow_to_json_dict(f) for f in self.view] )) @@ -195,31 +196,31 @@ class DumpFlows(RequestHandler): bio = BytesIO() fw = io.FlowWriter(bio) - for f in self.state.flows: + for f in self.view: fw.add(f) self.write(bio.getvalue()) bio.close() def post(self): - self.state.clear() + self.view.clear() content = self.request.files.values()[0][0].body bio = BytesIO(content) - self.state.load_flows(io.FlowReader(bio).stream()) + self.view.load_flows(io.FlowReader(bio).stream()) bio.close() class ClearAll(RequestHandler): def post(self): - self.state.clear() + self.view.clear() class AcceptFlows(RequestHandler): def post(self): - self.state.flows.accept_all(self.master) + self.view.flows.accept_all(self.master) class AcceptFlow(RequestHandler): @@ -233,7 +234,7 @@ class FlowHandler(RequestHandler): def delete(self, flow_id): if self.flow.killable: self.flow.kill(self.master) - self.state.delete_flow(self.flow) + self.view.delete_flow(self.flow) def put(self, flow_id): flow = self.flow @@ -270,19 +271,19 @@ class FlowHandler(RequestHandler): print("Warning: Unknown update {}.{}: {}".format(a, k, v)) else: print("Warning: Unknown update {}: {}".format(a, b)) - self.state.update_flow(flow) + self.view.update_flow(flow) class DuplicateFlow(RequestHandler): def post(self, flow_id): - self.master.state.duplicate_flow(self.flow) + self.master.view.duplicate_flow(self.flow) class RevertFlow(RequestHandler): def post(self, flow_id): - self.state.revert(self.flow) + self.flow.revert() class ReplayFlow(RequestHandler): @@ -290,7 +291,7 @@ class ReplayFlow(RequestHandler): def post(self, flow_id): self.flow.backup() self.flow.response = None - self.state.update_flow(self.flow) + self.view.update_flow(self.flow) r = self.master.replay_request(self.flow) if r: @@ -303,7 +304,7 @@ class FlowContent(RequestHandler): self.flow.backup() message = getattr(self.flow, message) message.content = self.request.files.values()[0][0].body - self.state.update_flow(self.flow) + self.view.update_flow(self.flow) def get(self, flow_id, message): message = getattr(self.flow, message) @@ -340,7 +341,7 @@ class FlowContentView(RequestHandler): message = getattr(self.flow, message) description, lines, error = contentviews.get_message_content_view( - contentviews.get(content_view.replace('_', ' ')), message + contentviews.get(content_view.replace('_', ' ')).name, message ) # if error: # add event log @@ -355,7 +356,7 @@ class Events(RequestHandler): def get(self): self.write(dict( - data=list(self.state.events) + data=list([]) )) diff --git a/mitmproxy/tools/web/master.py b/mitmproxy/tools/web/master.py index cf0bfd73..0922733d 100644 --- a/mitmproxy/tools/web/master.py +++ b/mitmproxy/tools/web/master.py @@ -8,7 +8,7 @@ from typing import Optional from mitmproxy import addons from mitmproxy import exceptions -from mitmproxy.addons import state +from mitmproxy.addons import view from mitmproxy.addons import intercept from mitmproxy import options from mitmproxy import master @@ -20,53 +20,7 @@ class Stop(Exception): pass -class WebFlowView(state.FlowView): - - def __init__(self, store): - super().__init__(store, None) - - def _add(self, f): - super()._add(f) - app.ClientConnection.broadcast( - type="UPDATE_FLOWS", - cmd="add", - data=app.convert_flow_to_json_dict(f) - ) - - def _update(self, f): - super()._update(f) - app.ClientConnection.broadcast( - type="UPDATE_FLOWS", - cmd="update", - data=app.convert_flow_to_json_dict(f) - ) - - def _remove(self, f): - super()._remove(f) - app.ClientConnection.broadcast( - type="UPDATE_FLOWS", - cmd="remove", - data=dict(id=f.id) - ) - - def _recalculate(self, flows): - super()._recalculate(flows) - app.ClientConnection.broadcast( - type="UPDATE_FLOWS", - cmd="reset" - ) - - -class WebState(state.State): - - def __init__(self): - super().__init__() - self.view._close() - self.view = WebFlowView(self.flows) - - self._last_event_id = 0 - self.events = collections.deque(maxlen=1000) - +class _WebState(): def add_log(self, e, level): self._last_event_id += 1 entry = { @@ -136,9 +90,14 @@ class WebMaster(master.Master): def __init__(self, options, server): super().__init__(options, server) - self.state = WebState() + self.view = view.View() + self.view.sig_add.connect(self._sig_add) + self.view.sig_remove.connect(self._sig_remove) + self.view.sig_update.connect(self._sig_update) + self.view.sig_refresh.connect(self._sig_refresh) + self.addons.add(*addons.default_addons()) - self.addons.add(self.state, intercept.Intercept()) + self.addons.add(self.view, intercept.Intercept()) self.app = app.Application( self, self.options.wdebug, self.options.wauthenticator ) @@ -162,6 +121,33 @@ class WebMaster(master.Master): print("Stream file error: {}".format(err), file=sys.stderr) sys.exit(1) + def _sig_add(self, view, flow): + app.ClientConnection.broadcast( + type="UPDATE_FLOWS", + cmd="add", + data=app.convert_flow_to_json_dict(flow) + ) + + def _sig_update(self, view, flow): + app.ClientConnection.broadcast( + type="UPDATE_FLOWS", + cmd="update", + data=app.convert_flow_to_json_dict(flow) + ) + + def _sig_remove(self, view, flow): + app.ClientConnection.broadcast( + type="UPDATE_FLOWS", + cmd="remove", + data=dict(id=flow.id) + ) + + def _sig_refresh(self, view): + app.ClientConnection.broadcast( + type="UPDATE_FLOWS", + cmd="reset" + ) + def run(self): # pragma: no cover iol = tornado.ioloop.IOLoop.instance() @@ -178,6 +164,6 @@ class WebMaster(master.Master): except (Stop, KeyboardInterrupt): self.shutdown() - def add_log(self, e, level="info"): - super().add_log(e, level) - return self.state.add_log(e, level) + # def add_log(self, e, level="info"): + # super().add_log(e, level) + # return self.state.add_log(e, level) diff --git a/test/mitmproxy/addons/test_state.py b/test/mitmproxy/addons/test_state.py deleted file mode 100644 index 16e79e0d..00000000 --- a/test/mitmproxy/addons/test_state.py +++ /dev/null @@ -1,19 +0,0 @@ -from mitmproxy.test import tflow -from mitmproxy import proxy -from mitmproxy import master -from mitmproxy.addons import state - - -class TestState: - def test_duplicate_flow(self): - s = state.State() - fm = master.Master(None, proxy.DummyServer()) - fm.addons.add(s) - f = tflow.tflow(resp=True) - fm.load_flow(f) - assert s.flow_count() == 1 - - f2 = s.duplicate_flow(f) - assert f2.response - assert s.flow_count() == 2 - assert s.index(f2) == 1 diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index 7a9d0901..295e2bea 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -6,7 +6,6 @@ import mitmproxy.test.tutils from mitmproxy.net.http import Headers import mitmproxy.io from mitmproxy import flowfilter, options -from mitmproxy.addons import state from mitmproxy.contrib import tnetstring from mitmproxy.exceptions import FlowReadException, Kill from mitmproxy import flow @@ -15,7 +14,7 @@ from mitmproxy import connections from mitmproxy.proxy import ProxyConfig from mitmproxy.proxy.server import DummyServer from mitmproxy import master -from . import tutils +from . import tutils, tservers class TestHTTPFlow: @@ -107,20 +106,6 @@ class TestHTTPFlow: assert not f.killable assert f.reply.value == Kill - def test_killall(self): - srv = DummyServer(None) - s = state.State() - fm = master.Master(None, srv) - fm.addons.add(s) - - f = tflow.tflow() - f.reply.handle() - f.intercept(fm) - - s.killall(fm) - for i in s.view: - assert "killed" in str(i.error) - def test_resume(self): f = tflow.tflow() f.reply.handle() @@ -186,135 +171,6 @@ class TestTCPFlow: tutils.raises(ValueError, flowfilter.match, "~", f) -class TestState: - - def test_backup(self): - c = state.State() - f = tflow.tflow() - c.add_flow(f) - f.backup() - c.revert(f) - - def test_flow(self): - """ - normal flow: - - connect -> request -> response - """ - c = state.State() - f = tflow.tflow() - c.add_flow(f) - assert f - assert c.flow_count() == 1 - assert c.active_flow_count() == 1 - - newf = tflow.tflow() - assert c.add_flow(newf) - assert c.active_flow_count() == 2 - - f.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) - assert c.update_flow(f) - assert c.flow_count() == 2 - assert c.active_flow_count() == 1 - - assert not c.update_flow(None) - assert c.active_flow_count() == 1 - - newf.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) - assert c.update_flow(newf) - assert c.active_flow_count() == 0 - - def test_err(self): - c = state.State() - f = tflow.tflow() - c.add_flow(f) - f.error = flow.Error("message") - assert c.update_flow(f) - - c = state.State() - f = tflow.tflow() - c.add_flow(f) - c.set_view_filter("~e") - assert not c.view - f.error = tflow.terr() - assert c.update_flow(f) - assert c.view - - def test_set_view_filter(self): - c = state.State() - - f = tflow.tflow() - assert len(c.view) == 0 - - c.add_flow(f) - assert len(c.view) == 1 - - c.set_view_filter("~s") - assert c.filter_txt == "~s" - assert len(c.view) == 0 - f.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) - c.update_flow(f) - assert len(c.view) == 1 - c.set_view_filter(None) - assert len(c.view) == 1 - - f = tflow.tflow() - c.add_flow(f) - assert len(c.view) == 2 - c.set_view_filter("~q") - assert len(c.view) == 1 - c.set_view_filter("~s") - assert len(c.view) == 1 - - assert "Invalid" in c.set_view_filter("~") - - def _add_request(self, state): - f = tflow.tflow() - state.add_flow(f) - return f - - def _add_response(self, state): - f = tflow.tflow() - state.add_flow(f) - f.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) - state.update_flow(f) - - def _add_error(self, state): - f = tflow.tflow(err=True) - state.add_flow(f) - - def test_clear(self): - c = state.State() - f = self._add_request(c) - f.intercepted = True - - c.clear() - assert c.flow_count() == 0 - - def test_dump_flows(self): - c = state.State() - self._add_request(c) - self._add_response(c) - self._add_request(c) - self._add_response(c) - self._add_request(c) - self._add_response(c) - self._add_error(c) - - flows = c.view[:] - c.clear() - - c.load_flows(flows) - assert isinstance(c.flows[0], flow.Flow) - - def test_accept_all(self): - c = state.State() - self._add_request(c) - self._add_response(c) - self._add_request(c) - c.accept_all(mock.Mock()) - - class TestSerialize: def _treader(self): @@ -354,7 +210,7 @@ class TestSerialize: def test_load_flows(self): r = self._treader() - s = state.State() + s = tservers.TestState() fm = master.Master(None, DummyServer()) fm.addons.add(s) fm.load_flows(r) @@ -362,7 +218,7 @@ class TestSerialize: def test_load_flows_reverse(self): r = self._treader() - s = state.State() + s = tservers.TestState() opts = options.Options( mode="reverse", upstream_server="https://use-this-domain" @@ -431,7 +287,7 @@ class TestFlowMaster: assert fm.create_request("GET", "http", "example.com", 80, "/") def test_all(self): - s = state.State() + s = tservers.TestState() fm = master.Master(None, DummyServer()) fm.addons.add(s) f = tflow.tflow(req=None) diff --git a/test/mitmproxy/test_web_master.py b/test/mitmproxy/test_web_master.py index 77280644..298b14eb 100644 --- a/test/mitmproxy/test_web_master.py +++ b/test/mitmproxy/test_web_master.py @@ -12,4 +12,4 @@ class TestWebMaster(mastertest.MasterTest): m = self.mkmaster() for i in (1, 2, 3): self.dummy_cycle(m, 1, b"") - assert len(m.state.flows) == i + assert len(m.view) == i |