aboutsummaryrefslogtreecommitdiffstats
path: root/libpathod/pathoc.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2015-06-05 16:03:17 +1200
committerAldo Cortesi <aldo@nullcube.com>2015-06-05 16:03:17 +1200
commit7412ec83f55e6a9dcdde84603b88cd67bbf8b04d (patch)
treec66f062afe1c08ef05b714fa26306a243e603994 /libpathod/pathoc.py
parentd23691f98c9e421a05db0519e8a19388db3a97be (diff)
downloadmitmproxy-7412ec83f55e6a9dcdde84603b88cd67bbf8b04d.tar.gz
mitmproxy-7412ec83f55e6a9dcdde84603b88cd67bbf8b04d.tar.bz2
mitmproxy-7412ec83f55e6a9dcdde84603b88cd67bbf8b04d.zip
Refactor pathoc message receive to use queues and generators
This gives us a nicer, thread-safe interface.
Diffstat (limited to 'libpathod/pathoc.py')
-rw-r--r--libpathod/pathoc.py46
1 files changed, 32 insertions, 14 deletions
diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py
index cb954e15..385b61db 100644
--- a/libpathod/pathoc.py
+++ b/libpathod/pathoc.py
@@ -85,16 +85,15 @@ class WebsocketFrameReader(threading.Thread):
logfp,
showresp,
hexdump,
- callback,
ws_read_limit):
threading.Thread.__init__(self)
self.ws_read_limit = ws_read_limit
self.logfp = logfp
self.showresp = showresp
self.hexdump = hexdump
- self.rfile, self.callback = rfile, callback
+ self.rfile = rfile
self.terminate = Queue.Queue()
- self.is_done = Queue.Queue()
+ self.frames_queue = Queue.Queue()
def log(self, rfile):
return log.Log(
@@ -121,11 +120,11 @@ class WebsocketFrameReader(threading.Thread):
except tcp.NetLibError:
self.ws_read_limit = 0
break
+ self.frames_queue.put(frm)
log("<< %s" % frm.header.human_readable())
- self.callback(frm)
if self.ws_read_limit is not None:
self.ws_read_limit -= 1
- self.is_done.put(None)
+ self.frames_queue.put(None)
class Pathoc(tcp.TCPClient):
@@ -248,15 +247,33 @@ class Pathoc(tcp.TCPClient):
if self.ws_framereader:
self.ws_framereader.terminate.put(None)
- def wait(self):
+ def wait(self, timeout=0.01, finish=True):
+ """
+ A generator that yields frames until Pathoc terminates.
+
+ timeout: If specified None may be yielded instead if timeout is
+ reached. If timeout is None, wait forever. If timeout is 0, return
+ immedately if nothing is on the queue.
+
+ finish: If true, consume messages until the reader shuts down.
+ Otherwise, return None on timeout.
+ """
if self.ws_framereader:
while True:
try:
- self.ws_framereader.is_done.get(timeout=0.05)
+ frm = self.ws_framereader.frames_queue.get(
+ timeout = timeout,
+ block = True if timeout != 0 else False
+ )
+ except Queue.Empty:
+ if finish:
+ continue
+ else:
+ return
+ if frm is None:
self.ws_framereader.join()
return
- except Queue.Empty:
- pass
+ yield frm
def websocket_get_frame(self, frame):
"""
@@ -281,13 +298,11 @@ class Pathoc(tcp.TCPClient):
return None
raise
- def websocket_start(self, r, callback=None, limit=None):
+ def websocket_start(self, r, limit=None):
"""
Performs an HTTP request, and attempts to drop into websocket
connection.
- callback: A callback called within the websocket thread for every
- server frame.
limit: Disconnect after receiving N server frames.
"""
resp = self.http(r)
@@ -297,7 +312,6 @@ class Pathoc(tcp.TCPClient):
self.fp,
self.showresp,
self.hexdump,
- callback,
self.ws_read_limit
)
self.ws_framereader.start()
@@ -432,9 +446,13 @@ def main(args): # pragma: nocover
ret = p.request(spec)
if ret and args.oneshot:
return
+ # We consume the queue when we can, so it doesn't build up.
+ for i in p.wait(timeout=0, finish=False):
+ pass
except (http.HttpError, tcp.NetLibError) as v:
break
- p.wait()
+ for i in p.wait(timeout=0.01, finish=True):
+ pass
except KeyboardInterrupt:
pass
if p: