aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libmproxy/console/__init__.py8
-rw-r--r--libmproxy/flow.py257
-rw-r--r--libmproxy/protocol/http.py5
-rw-r--r--libmproxy/web/__init__.py38
-rw-r--r--libmproxy/web/app.py44
-rw-r--r--libmproxy/web/static/js/app.js294
-rw-r--r--test/test_console.py8
-rw-r--r--test/test_flow.py40
-rw-r--r--test/test_server.py20
-rw-r--r--web/src/js/app.js8
-rw-r--r--web/src/js/components/mainview.jsx.js12
-rw-r--r--web/src/js/connection.js51
-rw-r--r--web/src/js/stores/flowstore.js221
13 files changed, 679 insertions, 327 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index e6bc9b41..38a16751 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -277,16 +277,16 @@ class ConsoleState(flow.State):
d = self.flowsettings.get(flow, {})
return d.get(key, default)
- def add_request(self, f):
- flow.State.add_request(self, f)
+ def add_flow(self, f):
+ super(ConsoleState, self).add_flow(f)
if self.focus is None:
self.set_focus(0)
elif self.follow_focus:
self.set_focus(len(self.view) - 1)
return f
- def add_response(self, resp):
- f = flow.State.add_response(self, resp)
+ def update_flow(self, f):
+ super(ConsoleState, self).update_flow(f)
if self.focus is None:
self.set_focus(0)
return f
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index a6bf17d8..d3ae383e 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -2,6 +2,7 @@
This module provides more sophisticated flow tracking and provides filtering and interception facilities.
"""
from __future__ import absolute_import
+from abc import abstractmethod, ABCMeta
import hashlib
import Cookie
import cookielib
@@ -338,80 +339,216 @@ class StickyAuthState:
f.request.headers["authorization"] = self.hosts[host]
+class FlowList(object):
+ __metaclass__ = ABCMeta
+
+ def __iter__(self):
+ return iter(self._list)
+
+ def __contains__(self, item):
+ return item in self._list
+
+ def __getitem__(self, item):
+ return self._list[item]
+
+ def __nonzero__(self):
+ return bool(self._list)
+
+ def __len__(self):
+ return len(self._list)
+
+ def index(self, f):
+ return self._list.index(f)
+
+ @abstractmethod
+ def _add(self, f):
+ return
+
+ @abstractmethod
+ def _update(self, f):
+ return
+
+ @abstractmethod
+ def _remove(self, f):
+ return
+
+
+class FlowView(FlowList):
+ def __init__(self, store, filt=None):
+ self._list = []
+ if not filt:
+ filt = lambda flow: True
+ self._build(store, filt)
+
+ self.store = store
+ self.store.views.append(self)
+
+ def _close(self):
+ self.store.views.remove(self)
+
+ def _build(self, flows, filt=None):
+ if filt:
+ self.filt = filt
+ self._list = list(filter(self.filt, flows))
+
+ def _add(self, f):
+ if self.filt(f):
+ self._list.append(f)
+
+ def _update(self, f):
+ if f not in self._list:
+ self._add(f)
+ elif not self.filt(f):
+ self._remove(f)
+
+ def _remove(self, f):
+ if f in self._list:
+ self._list.remove(f)
+
+ def _recalculate(self, flows):
+ self._build(flows)
+
+
+class FlowStore(FlowList):
+ """
+ Responsible for handling flows in the state:
+ Keeps a list of all flows and provides views on them.
+ """
+ def __init__(self):
+ self._list = []
+ self._set = set() # Used for O(1) lookups
+ self.views = []
+ self._recalculate_views()
+
+ def __contains__(self, f):
+ return f in self._set
+
+ def _add(self, f):
+ """
+ Adds a flow to the state.
+ The flow to add must not be present in the state.
+ """
+ self._list.append(f)
+ self._set.add(f)
+ for view in self.views:
+ view._add(f)
+
+ def _update(self, f):
+ """
+ Notifies the state that a flow has been updated.
+ The flow must be present in the state.
+ """
+ for view in self.views:
+ view._update(f)
+
+ def _remove(self, f):
+ """
+ Deletes a flow from the state.
+ The flow must be present in the state.
+ """
+ self._list.remove(f)
+ self._set.remove(f)
+ for view in self.views:
+ view._remove(f)
+
+ # Expensive bulk operations
+
+ def _extend(self, flows):
+ """
+ Adds a list of flows to the state.
+ The list of flows to add must not contain flows that are already in the state.
+ """
+ self._list.extend(flows)
+ self._set.update(flows)
+ self._recalculate_views()
+
+ def _clear(self):
+ self._list = []
+ self._set = set()
+ self._recalculate_views()
+
+ def _recalculate_views(self):
+ """
+ Expensive operation: Recalculate all the views after a bulk change.
+ """
+ for view in self.views:
+ view._recalculate(self)
+
+ # Utility functions.
+ # There are some common cases where we need to argue about all flows
+ # irrespective of filters on the view etc (i.e. on shutdown).
+
+ def active_count(self):
+ c = 0
+ for i in self._list:
+ if not i.response and not i.error:
+ c += 1
+ return c
+
+ # TODO: Should accept_all operate on views or on all flows?
+ def accept_all(self):
+ for f in self._list:
+ f.accept_intercept()
+
+ def kill_all(self, master):
+ for f in self._list:
+ f.kill(master)
+
+
class State(object):
def __init__(self):
- self._flow_list = []
- self.view = []
+ self.flows = FlowStore()
+ self.view = FlowView(self.flows, None)
# These are compiled filt expressions:
- self._limit = None
self.intercept = None
@property
def limit_txt(self):
- if self._limit:
- return self._limit.pattern
- else:
- return None
+ return getattr(self.view.filt, "pattern", None)
def flow_count(self):
- return len(self._flow_list)
+ return len(self.flows)
+ # TODO: All functions regarding flows that don't cause side-effects should be moved into FlowStore.
def index(self, f):
- return self._flow_list.index(f)
+ return self.flows.index(f)
def active_flow_count(self):
- c = 0
- for i in self._flow_list:
- if not i.response and not i.error:
- c += 1
- return c
+ return self.flows.active_count()
- def add_request(self, flow):
+ def add_flow(self, f):
"""
- Add a request to the state. Returns the matching flow.
+ Add a request to the state.
"""
- if flow in self._flow_list: # catch flow replay
- return flow
- self._flow_list.append(flow)
- if flow.match(self._limit):
- self.view.append(flow)
- return flow
-
- def add_response(self, f):
- """
- Add a response to the state. Returns the matching flow.
- """
- if not f:
- return False
- if f.match(self._limit) and not f in self.view:
- self.view.append(f)
+ self.flows._add(f)
return f
- def add_error(self, f):
+ def update_flow(self, f):
"""
- Add an error response to the state. Returns the matching flow, or
- None if there isn't one.
+ Add a response to the state.
"""
- if not f:
- return None
- if f.match(self._limit) and not f in self.view:
- self.view.append(f)
+ self.flows._update(f)
return f
+ def delete_flow(self, f):
+ self.flows._remove(f)
+
def load_flows(self, flows):
- self._flow_list.extend(flows)
- self.recalculate_view()
+ self.flows._extend(flows)
def set_limit(self, txt):
+ if txt == self.limit_txt:
+ return
if txt:
f = filt.parse(txt)
if not f:
return "Invalid filter expression."
- self._limit = f
+ self.view._close()
+ self.view = FlowView(self.flows, f)
else:
- self._limit = None
- self.recalculate_view()
+ self.view._close()
+ self.view = FlowView(self.flows, None)
def set_intercept(self, txt):
if txt:
@@ -419,37 +556,24 @@ class State(object):
if not f:
return "Invalid filter expression."
self.intercept = f
- self.intercept_txt = txt
else:
self.intercept = None
- self.intercept_txt = None
-
- def recalculate_view(self):
- if self._limit:
- self.view = [i for i in self._flow_list if i.match(self._limit)]
- else:
- self.view = self._flow_list[:]
- def delete_flow(self, f):
- self._flow_list.remove(f)
- if f in self.view:
- self.view.remove(f)
- return True
+ @property
+ def intercept_txt(self):
+ return getattr(self.intercept, "pattern", None)
def clear(self):
- for i in self._flow_list[:]:
- self.delete_flow(i)
+ self.flows._clear()
def accept_all(self):
- for i in self._flow_list[:]:
- i.accept_intercept()
+ self.flows.accept_all()
def revert(self, f):
f.revert()
def killall(self, master):
- for i in self._flow_list:
- i.kill(master)
+ self.flows.kill_all(master)
class FlowMaster(controller.Master):
@@ -716,7 +840,7 @@ class FlowMaster(controller.Master):
sc.reply()
def handle_error(self, f):
- self.state.add_error(f)
+ self.state.update_flow(f)
self.run_script_hook("error", f)
if self.client_playback:
self.client_playback.clear(f)
@@ -736,7 +860,8 @@ class FlowMaster(controller.Master):
self.add_event("Error in wsgi app. %s"%err, "error")
f.reply(protocol.KILL)
return
- self.state.add_request(f)
+ if f not in self.state.flows: # don't add again on replay
+ self.state.add_flow(f)
self.replacehooks.run(f)
self.setheaders.run(f)
self.run_script_hook("request", f)
@@ -757,7 +882,7 @@ class FlowMaster(controller.Master):
return f
def handle_response(self, f):
- self.state.add_response(f)
+ self.state.update_flow(f)
self.replacehooks.run(f)
self.setheaders.run(f)
self.run_script_hook("response", f)
@@ -772,7 +897,7 @@ class FlowMaster(controller.Master):
self.unload_scripts()
controller.Master.shutdown(self)
if self.stream:
- for i in self.state._flow_list:
+ for i in self.state.flows:
if not i.response:
self.stream.add(i)
self.stop_stream()
diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py
index 26a94040..b32a55ed 100644
--- a/libmproxy/protocol/http.py
+++ b/libmproxy/protocol/http.py
@@ -117,7 +117,10 @@ class HTTPMessage(stateobject.StateObject):
def get_state(self, short=False):
ret = super(HTTPMessage, self).get_state(short)
if short:
- ret["contentLength"] = len(self.content)
+ if self.content:
+ ret["contentLength"] = len(self.content)
+ else:
+ ret["contentLength"] = 0
return ret
def get_decoded_content(self):
diff --git a/libmproxy/web/__init__.py b/libmproxy/web/__init__.py
index 69971436..f762466a 100644
--- a/libmproxy/web/__init__.py
+++ b/libmproxy/web/__init__.py
@@ -9,9 +9,32 @@ class Stop(Exception):
pass
+class WebFlowView(flow.FlowView):
+ def __init__(self, store):
+ super(WebFlowView, self).__init__(store, None)
+
+ def _add(self, f):
+ super(WebFlowView, self)._add(f)
+ app.FlowUpdates.broadcast("add", f.get_state(short=True))
+
+ def _update(self, f):
+ super(WebFlowView, self)._update(f)
+ app.FlowUpdates.broadcast("update", f.get_state(short=True))
+
+ def _remove(self, f):
+ super(WebFlowView, self)._remove(f)
+ app.FlowUpdates.broadcast("remove", f.get_state(short=True))
+
+ def _recalculate(self, flows):
+ super(WebFlowView, self)._recalculate(flows)
+ app.FlowUpdates.broadcast("recalculate", None)
+
+
class WebState(flow.State):
def __init__(self):
- flow.State.__init__(self)
+ super(WebState, self).__init__()
+ self.view._close()
+ self.view = WebFlowView(self.flows)
class Options(object):
@@ -58,8 +81,8 @@ class Options(object):
class WebMaster(flow.FlowMaster):
def __init__(self, server, options):
self.options = options
- self.app = app.Application(self.options.wdebug)
super(WebMaster, self).__init__(server, WebState())
+ self.app = app.Application(self.state, self.options.wdebug)
self.last_log_id = 0
@@ -83,24 +106,17 @@ class WebMaster(flow.FlowMaster):
self.shutdown()
def handle_request(self, f):
- app.ClientConnection.broadcast("add_flow", f.get_state(True))
- flow.FlowMaster.handle_request(self, f)
+ super(WebMaster, self).handle_request(f)
if f:
f.reply()
return f
def handle_response(self, f):
- app.ClientConnection.broadcast("update_flow", f.get_state(True))
- flow.FlowMaster.handle_response(self, f)
+ super(WebMaster, self).handle_response(f)
if f:
f.reply()
return f
- def handle_error(self, f):
- app.ClientConnection.broadcast("update_flow", f.get_state(True))
- flow.FlowMaster.handle_error(self, f)
- return f
-
def handle_log(self, l):
self.last_log_id += 1
app.ClientConnection.broadcast(
diff --git a/libmproxy/web/app.py b/libmproxy/web/app.py
index e2765a6d..4fdff783 100644
--- a/libmproxy/web/app.py
+++ b/libmproxy/web/app.py
@@ -3,6 +3,7 @@ import tornado.web
import tornado.websocket
import logging
import json
+from .. import flow
class IndexHandler(tornado.web.RequestHandler):
@@ -10,36 +11,53 @@ class IndexHandler(tornado.web.RequestHandler):
self.render("index.html")
-class ClientConnection(tornado.websocket.WebSocketHandler):
- connections = set()
+class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
+ connections = None # raise an error if inherited class doesn't specify its own instance.
def open(self):
- ClientConnection.connections.add(self)
+ self.connections.add(self)
def on_close(self):
- ClientConnection.connections.remove(self)
+ self.connections.remove(self)
@classmethod
def broadcast(cls, type, data):
+ message = json.dumps(
+ {
+ "type": type,
+ "data": data
+ }
+ )
for conn in cls.connections:
try:
- conn.write_message(
- json.dumps(
- {
- "type": type,
- "data": data
- }
- )
- )
+ conn.write_message(message)
except:
logging.error("Error sending message", exc_info=True)
+class FlowsHandler(tornado.web.RequestHandler):
+ def get(self):
+ self.write(dict(
+ flows=[f.get_state(short=True) for f in self.application.state.flows]
+ ))
+
+
+class FlowUpdates(WebSocketEventBroadcaster):
+ connections = set()
+
+
+class ClientConnection(WebSocketEventBroadcaster):
+ connections = set()
+
+
class Application(tornado.web.Application):
- def __init__(self, debug):
+ def __init__(self, state, debug):
+ self.state = state
handlers = [
(r"/", IndexHandler),
(r"/updates", ClientConnection),
+ (r"/flows", FlowsHandler),
+ (r"/flows/updates", FlowUpdates),
]
settings = dict(
template_path=os.path.join(os.path.dirname(__file__), "templates"),
diff --git a/libmproxy/web/static/js/app.js b/libmproxy/web/static/js/app.js
index fe317d7f..ddbb14f4 100644
--- a/libmproxy/web/static/js/app.js
+++ b/libmproxy/web/static/js/app.js
@@ -335,132 +335,216 @@ _.extend(_EventLogStore.prototype, EventEmitter.prototype, {
var EventLogStore = new _EventLogStore();
AppDispatcher.register(EventLogStore.handle.bind(EventLogStore));
-function FlowView(store, live) {
- EventEmitter.call(this);
- this._store = store;
- this.live = live;
- this.flows = [];
-
- this.add = this.add.bind(this);
- this.update = this.update.bind(this);
-
- if (live) {
- this._store.addListener(ActionTypes.ADD_FLOW, this.add);
- this._store.addListener(ActionTypes.UPDATE_FLOW, this.update);
- }
+function FlowStore(endpoint) {
+ this._views = [];
+ this.reset();
}
-
-_.extend(FlowView.prototype, EventEmitter.prototype, {
- close: function () {
- this._store.removeListener(ActionTypes.ADD_FLOW, this.add);
- this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update);
+_.extend(FlowStore.prototype, {
+ add: function (flow) {
+ this._pos_map[flow.id] = this._flow_list.length;
+ this._flow_list.push(flow);
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].add(flow);
+ }
},
- getAll: function () {
- return this.flows;
+ update: function (flow) {
+ this._flow_list[this._pos_map[flow.id]] = flow;
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].update(flow);
+ }
},
- add: function (flow) {
- return this.update(flow);
- },
- add_bulk: function (flows) {
- //Treat all previously received updates as newer than the bulk update.
- //If they weren't newer, we're about to receive an update for them very soon.
- var updates = this.flows;
- this.flows = flows;
- updates.forEach(function(flow){
- this._update(flow);
- }.bind(this));
- this.emit("change");
+ remove: function (flow_id) {
+ this._flow_list.splice(this._pos_map[flow_id], 1);
+ this._build_map();
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].remove(flow_id);
+ }
},
- _update: function(flow){
- var idx = _.findIndex(this.flows, function(f){
- return flow.id === f.id;
- });
+ reset: function (flows) {
+ this._flow_list = flows || [];
+ this._build_map();
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].recalculate(this._flow_list);
+ }
+ },
+ _build_map: function () {
+ this._pos_map = {};
+ for (var i = 0; i < this._flow_list.length; i++) {
+ var flow = this._flow_list[i];
+ this._pos_map[flow.id] = i;
+ }
+ },
+ open_view: function (filt, sort) {
+ var view = new FlowView(this._flow_list, filt, sort);
+ this._views.push(view);
+ return view;
+ },
+ close_view: function (view) {
+ this._views = _.without(this._views, view);
+ }
+});
+
- if(idx < 0){
- this.flows.push(flow);
- //if(this.flows.length > 100){
- // this.flows.shift();
- //}
+function LiveFlowStore(endpoint) {
+ FlowStore.call(this);
+ this.updates_before_init = []; // (empty array is true in js)
+ this.endpoint = endpoint || "/flows";
+ this.conn = new Connection(this.endpoint + "/updates");
+ this.conn.onopen = this._onopen.bind(this);
+ this.conn.onmessage = function (e) {
+ var message = JSON.parse(e.data);
+ this.handle_update(message.type, message.data);
+ }.bind(this);
+}
+_.extend(LiveFlowStore.prototype, FlowStore.prototype, {
+ handle_update: function (type, data) {
+ console.log("LiveFlowStore.handle_update", type, data);
+ if (this.updates_before_init) {
+ console.log("defer update", type, data);
+ this.updates_before_init.push(arguments);
} else {
- this.flows[idx] = flow;
+ this[type](data);
}
},
- update: function(flow){
- this._update(flow);
- this.emit("change");
+ handle_fetch: function (data) {
+ console.log("Flows fetched.");
+ this.reset(data.flows);
+ var updates = this.updates_before_init;
+ this.updates_before_init = false;
+ for (var i = 0; i < updates.length; i++) {
+ this.handle_update.apply(this, updates[i]);
+ }
+ },
+ _onopen: function () {
+ //Update stream openend, fetch list of flows.
+ console.log("Update Connection opened, fetching flows...");
+ $.getJSON(this.endpoint, this.handle_fetch.bind(this));
},
});
+function SortByInsertionOrder() {
+ this.i = 0;
+ this.map = {};
+ this.key = this.key.bind(this);
+}
+SortByInsertionOrder.prototype.key = function (flow) {
+ if (!(flow.id in this.map)) {
+ this.i++;
+ this.map[flow.id] = this.i;
+ }
+ return this.map[flow.id];
+};
-function _FlowStore() {
+var default_sort = (new SortByInsertionOrder()).key;
+
+function FlowView(flows, filt, sort) {
EventEmitter.call(this);
+ filt = filt || function (flow) {
+ return true;
+ };
+ sort = sort || default_sort;
+ this.recalculate(flows, filt, sort);
}
-_.extend(_FlowStore.prototype, EventEmitter.prototype, {
- getView: function (since) {
- var view = new FlowView(this, !since);
-
- $.getJSON("/static/flows.json", function(flows){
- flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows));
- var id = 1;
- flows.forEach(function(flow){
- flow.id = "uuid-" + id++;
- });
- view.add_bulk(flows);
- });
+_.extend(FlowView.prototype, EventEmitter.prototype, {
+ recalculate: function (flows, filt, sort) {
+ if (filt) {
+ this.filt = filt;
+ }
+ if (sort) {
+ this.sort = sort;
+ }
+ this.flows = flows.filter(this.filt);
+ this.flows.sort(function (a, b) {
+ return this.sort(a) - this.sort(b);
+ }.bind(this));
+ this.emit("recalculate");
+ },
+ add: function (flow) {
+ if (this.filt(flow)) {
+ var idx = _.sortedIndex(this.flows, flow, this.sort);
+ if (idx === this.flows.length) { //happens often, .push is way faster.
+ this.flows.push(flow);
+ } else {
+ this.flows.splice(idx, 0, flow);
+ }
+ this.emit("add", flow, idx);
+ }
+ },
+ update: function (flow) {
+ var idx;
+ var i = this.flows.length;
+ // Search from the back, we usually update the latest flows.
+ while (i--) {
+ if (this.flows[i].id === flow.id) {
+ idx = i;
+ break;
+ }
+ }
- return view;
+ if (idx === -1) { //not contained in list
+ this.add(flow);
+ } else if (!this.filt(flow)) {
+ this.remove(flow.id);
+ } else {
+ if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed
+ this.remove(this.flows[idx]);
+ this.add(flow);
+ } else {
+ this.flows[idx] = flow;
+ this.emit("update", flow, idx);
+ }
+ }
},
- handle: function (action) {
- switch (action.type) {
- case ActionTypes.ADD_FLOW:
- case ActionTypes.UPDATE_FLOW:
- this.emit(action.type, action.data);
+ remove: function (flow_id) {
+ var i = this.flows.length;
+ while (i--) {
+ if (this.flows[i].id === flow_id) {
+ this.flows.splice(i, 1);
+ this.emit("remove", flow_id, i);
break;
- default:
- return;
+ }
}
}
});
-
-
-var FlowStore = new _FlowStore();
-AppDispatcher.register(FlowStore.handle.bind(FlowStore));
-
-function _Connection(url) {
- this.url = url;
+function Connection(url) {
+ if(url[0] != "/"){
+ this.url = url;
+ } else {
+ this.url = location.origin.replace("http", "ws") + url;
+ }
+ var ws = new WebSocket(this.url);
+ ws.onopen = function(){
+ this.onopen.apply(this, arguments);
+ }.bind(this);
+ ws.onmessage = function(){
+ this.onmessage.apply(this, arguments);
+ }.bind(this);
+ ws.onerror = function(){
+ this.onerror.apply(this, arguments);
+ }.bind(this);
+ ws.onclose = function(){
+ this.onclose.apply(this, arguments);
+ }.bind(this);
+ this.ws = ws;
}
-_Connection.prototype.init = function () {
- this.openWebSocketConnection();
-};
-_Connection.prototype.openWebSocketConnection = function () {
- this.ws = new WebSocket(this.url.replace("http", "ws"));
- var ws = this.ws;
-
- ws.onopen = this.onopen.bind(this);
- ws.onmessage = this.onmessage.bind(this);
- ws.onerror = this.onerror.bind(this);
- ws.onclose = this.onclose.bind(this);
-};
-_Connection.prototype.onopen = function (open) {
+Connection.prototype.onopen = function (open) {
console.debug("onopen", this, arguments);
};
-_Connection.prototype.onmessage = function (message) {
- //AppDispatcher.dispatchServerAction(...);
- var m = JSON.parse(message.data);
- AppDispatcher.dispatchServerAction(m);
+Connection.prototype.onmessage = function (message) {
+ console.warn("onmessage (not implemented)", this, message.data);
};
-_Connection.prototype.onerror = function (error) {
+Connection.prototype.onerror = function (error) {
EventLogActions.add_event("WebSocket Connection Error.");
console.debug("onerror", this, arguments);
};
-_Connection.prototype.onclose = function (close) {
+Connection.prototype.onclose = function (close) {
EventLogActions.add_event("WebSocket Connection closed.");
console.debug("onclose", this, arguments);
};
-
-var Connection = new _Connection(location.origin + "/updates");
-
+Connection.prototype.close = function(){
+ this.ws.close();
+};
/** @jsx React.DOM */
//React utils. For other utilities, see ../utils.js
@@ -1214,8 +1298,14 @@ var MainView = React.createClass({displayName: 'MainView',
};
},
componentDidMount: function () {
- this.flowStore = FlowStore.getView();
- this.flowStore.addListener("change",this.onFlowChange);
+ //FIXME: The store should be global, move out of here.
+ window.flowstore = new LiveFlowStore();
+
+ this.flowStore = window.flowstore.open_view();
+ this.flowStore.addListener("add",this.onFlowChange);
+ this.flowStore.addListener("update",this.onFlowChange);
+ this.flowStore.addListener("remove",this.onFlowChange);
+ this.flowStore.addListener("recalculate",this.onFlowChange);
},
componentWillUnmount: function () {
this.flowStore.removeListener("change",this.onFlowChange);
@@ -1223,7 +1313,7 @@ var MainView = React.createClass({displayName: 'MainView',
},
onFlowChange: function () {
this.setState({
- flows: this.flowStore.getAll()
+ flows: this.flowStore.flows
});
},
selectDetailTab: function(panel) {
@@ -1518,7 +1608,11 @@ var ProxyApp = (
)
);
$(function () {
- Connection.init();
- app = React.renderComponent(ProxyApp, document.body);
+ window.app = React.renderComponent(ProxyApp, document.body);
+ var UpdateConnection = new Connection("/updates");
+ UpdateConnection.onmessage = function (message) {
+ var m = JSON.parse(message.data);
+ AppDispatcher.dispatchServerAction(m);
+ };
});
//# sourceMappingURL=app.js.map \ No newline at end of file
diff --git a/test/test_console.py b/test/test_console.py
index 3b6c941d..d66bd8b0 100644
--- a/test/test_console.py
+++ b/test/test_console.py
@@ -15,7 +15,7 @@ class TestConsoleState:
"""
c = console.ConsoleState()
f = self._add_request(c)
- assert f in c._flow_list
+ assert f in c.flows
assert c.get_focus() == (f, 0)
def test_focus(self):
@@ -52,19 +52,19 @@ class TestConsoleState:
def _add_request(self, state):
f = tutils.tflow()
- return state.add_request(f)
+ return state.add_flow(f)
def _add_response(self, state):
f = self._add_request(state)
f.response = tutils.tresp()
- state.add_response(f)
+ state.update_flow(f)
def test_add_response(self):
c = console.ConsoleState()
f = self._add_request(c)
f.response = tutils.tresp()
c.focus = None
- c.add_response(f)
+ c.update_flow(f)
def test_focus_view(self):
c = console.ConsoleState()
diff --git a/test/test_flow.py b/test/test_flow.py
index 22abb4d4..fdfac62f 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -364,7 +364,7 @@ class TestState:
def test_backup(self):
c = flow.State()
f = tutils.tflow()
- c.add_request(f)
+ c.add_flow(f)
f.backup()
c.revert(f)
@@ -376,42 +376,42 @@ class TestState:
"""
c = flow.State()
f = tutils.tflow()
- c.add_request(f)
+ c.add_flow(f)
assert f
assert c.flow_count() == 1
assert c.active_flow_count() == 1
newf = tutils.tflow()
- assert c.add_request(newf)
+ assert c.add_flow(newf)
assert c.active_flow_count() == 2
f.response = tutils.tresp()
- assert c.add_response(f)
+ assert c.update_flow(f)
assert c.flow_count() == 2
assert c.active_flow_count() == 1
_ = tutils.tresp()
- assert not c.add_response(None)
+ assert not c.update_flow(None)
assert c.active_flow_count() == 1
newf.response = tutils.tresp()
- assert c.add_response(newf)
+ assert c.update_flow(newf)
assert c.active_flow_count() == 0
def test_err(self):
c = flow.State()
f = tutils.tflow()
- c.add_request(f)
+ c.add_flow(f)
f.error = Error("message")
- assert c.add_error(f)
+ assert c.update_flow(f)
c = flow.State()
f = tutils.tflow()
- c.add_request(f)
+ c.add_flow(f)
c.set_limit("~e")
assert not c.view
f.error = tutils.terr()
- assert c.add_error(f)
+ assert c.update_flow(f)
assert c.view
def test_set_limit(self):
@@ -420,20 +420,20 @@ class TestState:
f = tutils.tflow()
assert len(c.view) == 0
- c.add_request(f)
+ c.add_flow(f)
assert len(c.view) == 1
c.set_limit("~s")
assert c.limit_txt == "~s"
assert len(c.view) == 0
f.response = tutils.tresp()
- c.add_response(f)
+ c.update_flow(f)
assert len(c.view) == 1
c.set_limit(None)
assert len(c.view) == 1
f = tutils.tflow()
- c.add_request(f)
+ c.add_flow(f)
assert len(c.view) == 2
c.set_limit("~q")
assert len(c.view) == 1
@@ -452,18 +452,18 @@ class TestState:
def _add_request(self, state):
f = tutils.tflow()
- state.add_request(f)
+ state.add_flow(f)
return f
def _add_response(self, state):
f = tutils.tflow()
- state.add_request(f)
+ state.add_flow(f)
f.response = tutils.tresp()
- state.add_response(f)
+ state.update_flow(f)
def _add_error(self, state):
f = tutils.tflow(err=True)
- state.add_request(f)
+ state.add_flow(f)
def test_clear(self):
c = flow.State()
@@ -487,7 +487,7 @@ class TestState:
c.clear()
c.load_flows(flows)
- assert isinstance(c._flow_list[0], Flow)
+ assert isinstance(c.flows[0], Flow)
def test_accept_all(self):
c = flow.State()
@@ -532,7 +532,7 @@ class TestSerialize:
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_flows(r)
- assert len(s._flow_list) == 6
+ assert len(s.flows) == 6
def test_load_flows_reverse(self):
r = self._treader()
@@ -540,7 +540,7 @@ class TestSerialize:
conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80])
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
- assert s._flow_list[0].request.host == "use-this-domain"
+ assert s.flows[0].request.host == "use-this-domain"
def test_filter(self):
sio = StringIO()
diff --git a/test/test_server.py b/test/test_server.py
index c81eab2b..a611d30f 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -747,19 +747,19 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert req.content == "content"
assert req.status_code == 418
- assert not self.chain[1].tmaster.state._flow_list[0].response # killed
- assert self.chain[1].tmaster.state._flow_list[1].response
+ assert not self.chain[1].tmaster.state.flows[0].response # killed
+ assert self.chain[1].tmaster.state.flows[1].response
- assert self.proxy.tmaster.state._flow_list[0].request.form_in == "authority"
- assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative"
+ assert self.proxy.tmaster.state.flows[0].request.form_in == "authority"
+ assert self.proxy.tmaster.state.flows[1].request.form_in == "relative"
- assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority"
- assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative"
- assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority"
- assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative"
+ assert self.chain[0].tmaster.state.flows[0].request.form_in == "authority"
+ assert self.chain[0].tmaster.state.flows[1].request.form_in == "relative"
+ assert self.chain[0].tmaster.state.flows[2].request.form_in == "authority"
+ assert self.chain[0].tmaster.state.flows[3].request.form_in == "relative"
- assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "relative"
- assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative"
+ assert self.chain[1].tmaster.state.flows[0].request.form_in == "relative"
+ assert self.chain[1].tmaster.state.flows[1].request.form_in == "relative"
req = p.request("get:'/p/418:b\"content2\"'")
diff --git a/web/src/js/app.js b/web/src/js/app.js
index 736072dc..4ee35d60 100644
--- a/web/src/js/app.js
+++ b/web/src/js/app.js
@@ -1,4 +1,8 @@
$(function () {
- Connection.init();
- app = React.renderComponent(ProxyApp, document.body);
+ window.app = React.renderComponent(ProxyApp, document.body);
+ var UpdateConnection = new Connection("/updates");
+ UpdateConnection.onmessage = function (message) {
+ var m = JSON.parse(message.data);
+ AppDispatcher.dispatchServerAction(m);
+ };
}); \ No newline at end of file
diff --git a/web/src/js/components/mainview.jsx.js b/web/src/js/components/mainview.jsx.js
index 795b8136..f0dfb59a 100644
--- a/web/src/js/components/mainview.jsx.js
+++ b/web/src/js/components/mainview.jsx.js
@@ -7,8 +7,14 @@ var MainView = React.createClass({
};
},
componentDidMount: function () {
- this.flowStore = FlowStore.getView();
- this.flowStore.addListener("change",this.onFlowChange);
+ //FIXME: The store should be global, move out of here.
+ window.flowstore = new LiveFlowStore();
+
+ this.flowStore = window.flowstore.open_view();
+ this.flowStore.addListener("add",this.onFlowChange);
+ this.flowStore.addListener("update",this.onFlowChange);
+ this.flowStore.addListener("remove",this.onFlowChange);
+ this.flowStore.addListener("recalculate",this.onFlowChange);
},
componentWillUnmount: function () {
this.flowStore.removeListener("change",this.onFlowChange);
@@ -16,7 +22,7 @@ var MainView = React.createClass({
},
onFlowChange: function () {
this.setState({
- flows: this.flowStore.getAll()
+ flows: this.flowStore.flows
});
},
selectDetailTab: function(panel) {
diff --git a/web/src/js/connection.js b/web/src/js/connection.js
index 3edbfc20..64d550bf 100644
--- a/web/src/js/connection.js
+++ b/web/src/js/connection.js
@@ -1,33 +1,38 @@
-function _Connection(url) {
- this.url = url;
+function Connection(url) {
+ if(url[0] != "/"){
+ this.url = url;
+ } else {
+ this.url = location.origin.replace("http", "ws") + url;
+ }
+ var ws = new WebSocket(this.url);
+ ws.onopen = function(){
+ this.onopen.apply(this, arguments);
+ }.bind(this);
+ ws.onmessage = function(){
+ this.onmessage.apply(this, arguments);
+ }.bind(this);
+ ws.onerror = function(){
+ this.onerror.apply(this, arguments);
+ }.bind(this);
+ ws.onclose = function(){
+ this.onclose.apply(this, arguments);
+ }.bind(this);
+ this.ws = ws;
}
-_Connection.prototype.init = function () {
- this.openWebSocketConnection();
-};
-_Connection.prototype.openWebSocketConnection = function () {
- this.ws = new WebSocket(this.url.replace("http", "ws"));
- var ws = this.ws;
-
- ws.onopen = this.onopen.bind(this);
- ws.onmessage = this.onmessage.bind(this);
- ws.onerror = this.onerror.bind(this);
- ws.onclose = this.onclose.bind(this);
-};
-_Connection.prototype.onopen = function (open) {
+Connection.prototype.onopen = function (open) {
console.debug("onopen", this, arguments);
};
-_Connection.prototype.onmessage = function (message) {
- //AppDispatcher.dispatchServerAction(...);
- var m = JSON.parse(message.data);
- AppDispatcher.dispatchServerAction(m);
+Connection.prototype.onmessage = function (message) {
+ console.warn("onmessage (not implemented)", this, message.data);
};
-_Connection.prototype.onerror = function (error) {
+Connection.prototype.onerror = function (error) {
EventLogActions.add_event("WebSocket Connection Error.");
console.debug("onerror", this, arguments);
};
-_Connection.prototype.onclose = function (close) {
+Connection.prototype.onclose = function (close) {
EventLogActions.add_event("WebSocket Connection closed.");
console.debug("onclose", this, arguments);
};
-
-var Connection = new _Connection(location.origin + "/updates");
+Connection.prototype.close = function(){
+ this.ws.close();
+}; \ No newline at end of file
diff --git a/web/src/js/stores/flowstore.js b/web/src/js/stores/flowstore.js
index 7c0bddbd..53048441 100644
--- a/web/src/js/stores/flowstore.js
+++ b/web/src/js/stores/flowstore.js
@@ -1,91 +1,172 @@
-function FlowView(store, live) {
- EventEmitter.call(this);
- this._store = store;
- this.live = live;
- this.flows = [];
-
- this.add = this.add.bind(this);
- this.update = this.update.bind(this);
-
- if (live) {
- this._store.addListener(ActionTypes.ADD_FLOW, this.add);
- this._store.addListener(ActionTypes.UPDATE_FLOW, this.update);
- }
+function FlowStore(endpoint) {
+ this._views = [];
+ this.reset();
}
-
-_.extend(FlowView.prototype, EventEmitter.prototype, {
- close: function () {
- this._store.removeListener(ActionTypes.ADD_FLOW, this.add);
- this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update);
+_.extend(FlowStore.prototype, {
+ add: function (flow) {
+ this._pos_map[flow.id] = this._flow_list.length;
+ this._flow_list.push(flow);
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].add(flow);
+ }
},
- getAll: function () {
- return this.flows;
+ update: function (flow) {
+ this._flow_list[this._pos_map[flow.id]] = flow;
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].update(flow);
+ }
},
- add: function (flow) {
- return this.update(flow);
+ remove: function (flow_id) {
+ this._flow_list.splice(this._pos_map[flow_id], 1);
+ this._build_map();
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].remove(flow_id);
+ }
},
- add_bulk: function (flows) {
- //Treat all previously received updates as newer than the bulk update.
- //If they weren't newer, we're about to receive an update for them very soon.
- var updates = this.flows;
- this.flows = flows;
- updates.forEach(function(flow){
- this._update(flow);
- }.bind(this));
- this.emit("change");
+ reset: function (flows) {
+ this._flow_list = flows || [];
+ this._build_map();
+ for (var i = 0; i < this._views.length; i++) {
+ this._views[i].recalculate(this._flow_list);
+ }
+ },
+ _build_map: function () {
+ this._pos_map = {};
+ for (var i = 0; i < this._flow_list.length; i++) {
+ var flow = this._flow_list[i];
+ this._pos_map[flow.id] = i;
+ }
+ },
+ open_view: function (filt, sort) {
+ var view = new FlowView(this._flow_list, filt, sort);
+ this._views.push(view);
+ return view;
},
- _update: function(flow){
- var idx = _.findIndex(this.flows, function(f){
- return flow.id === f.id;
- });
+ close_view: function (view) {
+ this._views = _.without(this._views, view);
+ }
+});
+
- if(idx < 0){
- this.flows.push(flow);
- //if(this.flows.length > 100){
- // this.flows.shift();
- //}
+function LiveFlowStore(endpoint) {
+ FlowStore.call(this);
+ this.updates_before_init = []; // (empty array is true in js)
+ this.endpoint = endpoint || "/flows";
+ this.conn = new Connection(this.endpoint + "/updates");
+ this.conn.onopen = this._onopen.bind(this);
+ this.conn.onmessage = function (e) {
+ var message = JSON.parse(e.data);
+ this.handle_update(message.type, message.data);
+ }.bind(this);
+}
+_.extend(LiveFlowStore.prototype, FlowStore.prototype, {
+ handle_update: function (type, data) {
+ console.log("LiveFlowStore.handle_update", type, data);
+ if (this.updates_before_init) {
+ console.log("defer update", type, data);
+ this.updates_before_init.push(arguments);
} else {
- this.flows[idx] = flow;
+ this[type](data);
}
},
- update: function(flow){
- this._update(flow);
- this.emit("change");
+ handle_fetch: function (data) {
+ console.log("Flows fetched.");
+ this.reset(data.flows);
+ var updates = this.updates_before_init;
+ this.updates_before_init = false;
+ for (var i = 0; i < updates.length; i++) {
+ this.handle_update.apply(this, updates[i]);
+ }
+ },
+ _onopen: function () {
+ //Update stream openend, fetch list of flows.
+ console.log("Update Connection opened, fetching flows...");
+ $.getJSON(this.endpoint, this.handle_fetch.bind(this));
},
});
+function SortByInsertionOrder() {
+ this.i = 0;
+ this.map = {};
+ this.key = this.key.bind(this);
+}
+SortByInsertionOrder.prototype.key = function (flow) {
+ if (!(flow.id in this.map)) {
+ this.i++;
+ this.map[flow.id] = this.i;
+ }
+ return this.map[flow.id];
+};
+
+var default_sort = (new SortByInsertionOrder()).key;
-function _FlowStore() {
+function FlowView(flows, filt, sort) {
EventEmitter.call(this);
+ filt = filt || function (flow) {
+ return true;
+ };
+ sort = sort || default_sort;
+ this.recalculate(flows, filt, sort);
}
-_.extend(_FlowStore.prototype, EventEmitter.prototype, {
- getView: function (since) {
- var view = new FlowView(this, !since);
-
- $.getJSON("/static/flows.json", function(flows){
- flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows));
- var id = 1;
- flows.forEach(function(flow){
- flow.id = "uuid-" + id++;
- });
- view.add_bulk(flows);
- });
+_.extend(FlowView.prototype, EventEmitter.prototype, {
+ recalculate: function (flows, filt, sort) {
+ if (filt) {
+ this.filt = filt;
+ }
+ if (sort) {
+ this.sort = sort;
+ }
+ this.flows = flows.filter(this.filt);
+ this.flows.sort(function (a, b) {
+ return this.sort(a) - this.sort(b);
+ }.bind(this));
+ this.emit("recalculate");
+ },
+ add: function (flow) {
+ if (this.filt(flow)) {
+ var idx = _.sortedIndex(this.flows, flow, this.sort);
+ if (idx === this.flows.length) { //happens often, .push is way faster.
+ this.flows.push(flow);
+ } else {
+ this.flows.splice(idx, 0, flow);
+ }
+ this.emit("add", flow, idx);
+ }
+ },
+ update: function (flow) {
+ var idx;
+ var i = this.flows.length;
+ // Search from the back, we usually update the latest flows.
+ while (i--) {
+ if (this.flows[i].id === flow.id) {
+ idx = i;
+ break;
+ }
+ }
- return view;
+ if (idx === -1) { //not contained in list
+ this.add(flow);
+ } else if (!this.filt(flow)) {
+ this.remove(flow.id);
+ } else {
+ if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed
+ this.remove(this.flows[idx]);
+ this.add(flow);
+ } else {
+ this.flows[idx] = flow;
+ this.emit("update", flow, idx);
+ }
+ }
},
- handle: function (action) {
- switch (action.type) {
- case ActionTypes.ADD_FLOW:
- case ActionTypes.UPDATE_FLOW:
- this.emit(action.type, action.data);
+ remove: function (flow_id) {
+ var i = this.flows.length;
+ while (i--) {
+ if (this.flows[i].id === flow_id) {
+ this.flows.splice(i, 1);
+ this.emit("remove", flow_id, i);
break;
- default:
- return;
+ }
}
}
-});
-
-
-var FlowStore = new _FlowStore();
-AppDispatcher.register(FlowStore.handle.bind(FlowStore));
+}); \ No newline at end of file