diff options
-rw-r--r-- | libpathod/pathoc.py | 64 | ||||
-rw-r--r-- | libpathod/pathod.py | 3 | ||||
-rw-r--r-- | libpathod/test.py | 9 | ||||
-rw-r--r-- | test/test_pathod.py | 48 | ||||
-rw-r--r-- | test/tutils.py | 8 |
5 files changed, 87 insertions, 45 deletions
diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py index ecf1d4d9..fcb254c0 100644 --- a/libpathod/pathoc.py +++ b/libpathod/pathoc.py @@ -1,3 +1,4 @@ +import contextlib import sys import os import itertools @@ -86,8 +87,11 @@ class WebsocketFrameReader(threading.Thread): logfp, showresp, hexdump, - ws_read_limit): + ws_read_limit, + timeout + ): threading.Thread.__init__(self) + self.timeout = timeout self.ws_read_limit = ws_read_limit self.logfp = logfp self.showresp = showresp @@ -104,28 +108,33 @@ class WebsocketFrameReader(threading.Thread): None ) + @contextlib.contextmanager + def terminator(self): + yield + self.frames_queue.put(None) + def run(self): - while True: - if self.ws_read_limit == 0: - break - r, _, _ = select.select([self.rfile], [], [], 0.05) - try: - self.terminate.get_nowait() - break - except Queue.Empty: - pass - for rfile in r: - with self.log(rfile) as log: - try: + starttime = time.time() + with self.terminator(): + while True: + if self.ws_read_limit == 0: + return + r, _, x = select.select([self.rfile], [], [], 0.05) + if not r and time.time() - starttime > self.timeout: + return + try: + self.terminate.get_nowait() + return + except Queue.Empty: + pass + for rfile in r: + with self.log(rfile) as log: frm = websockets.Frame.from_file(self.rfile) - except tcp.NetLibError: - self.ws_read_limit = 0 - break - self.frames_queue.put(frm) - log("<< %s" % frm.header.human_readable()) - if self.ws_read_limit is not None: - self.ws_read_limit -= 1 - self.frames_queue.put(None) + self.frames_queue.put(frm) + log("<< %s" % frm.header.human_readable()) + if self.ws_read_limit is not None: + self.ws_read_limit -= 1 + starttime = time.time() class Pathoc(tcp.TCPClient): @@ -143,6 +152,9 @@ class Pathoc(tcp.TCPClient): # Websockets ws_read_limit = None, + # Network + timeout = None, + # Output control showreq = False, showresp = False, @@ -178,6 +190,8 @@ class Pathoc(tcp.TCPClient): self.ws_read_limit = ws_read_limit + self.timeout = timeout + self.showreq = showreq self.showresp = showresp self.explain = explain @@ -219,6 +233,8 @@ class Pathoc(tcp.TCPClient): an HTTP CONNECT request. """ tcp.TCPClient.connect(self) + if self.timeout: + self.settimeout(self.timeout) if connect_to: self.http_connect(connect_to) self.sslinfo = None @@ -313,7 +329,8 @@ class Pathoc(tcp.TCPClient): self.fp, self.showresp, self.hexdump, - self.ws_read_limit + self.ws_read_limit, + self.timeout ) self.ws_framereader.start() return resp @@ -412,6 +429,7 @@ def main(args): # pragma: nocover explain = args.explain, hexdump = args.hexdump, ignorecodes = args.ignorecodes, + timeout = args.timeout, ignoretimeout = args.ignoretimeout, showsummary = True ) @@ -424,8 +442,6 @@ def main(args): # pragma: nocover except PathocError as v: print >> sys.stderr, str(v) sys.exit(1) - if args.timeout: - p.settimeout(args.timeout) for spec in playlist: if args.explain or args.memo: spec = spec.freeze(p.settings) diff --git a/libpathod/pathod.py b/libpathod/pathod.py index 3c42573d..13f602b4 100644 --- a/libpathod/pathod.py +++ b/libpathod/pathod.py @@ -133,7 +133,8 @@ class PathodHandler(tcp.BaseHandler): try: wf_gen = language.parse_websocket_frame(nest) except language.exceptions.ParseException, v: - lg( + log.write( + self.logfp, "Parse error in reflected frame specifcation:" " %s" % v.msg ) diff --git a/libpathod/test.py b/libpathod/test.py index 6a15182a..ebb3a49f 100644 --- a/libpathod/test.py +++ b/libpathod/test.py @@ -1,3 +1,4 @@ +import cStringIO import threading import Queue @@ -13,6 +14,8 @@ class Daemon: def __init__(self, ssl=None, **daemonargs): self.q = Queue.Queue() + self.logfp = cStringIO.StringIO() + daemonargs["logfp"] = self.logfp self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs) self.thread.start() self.port = self.q.get(True, 5) @@ -26,6 +29,7 @@ class Daemon: return self def __exit__(self, type, value, traceback): + self.logfp.truncate(0) self.shutdown() return False @@ -42,6 +46,9 @@ class Daemon: resp = requests.get("%s/api/info" % self.urlbase, verify=False) return resp.json() + def text_log(self): + return self.logfp.getvalue() + def last_log(self): """ Returns the last logged request, or None. @@ -62,6 +69,7 @@ class Daemon: """ Clear the log. """ + self.logfp.truncate(0) resp = requests.get("%s/api/clear_log" % self.urlbase, verify=False) return resp.ok @@ -84,7 +92,6 @@ class _PaThread(threading.Thread): self.server = pathod.Pathod( (self.iface, 0), ssl = self.ssl, - logfp = None, **self.daemonargs ) self.name = "PathodThread (%s:%s)" % ( diff --git a/test/test_pathod.py b/test/test_pathod.py index 55a5b32e..7d5f90b6 100644 --- a/test/test_pathod.py +++ b/test/test_pathod.py @@ -59,7 +59,7 @@ class TestNotAfterConnect(tutils.DaemonTests): ) def test_connect(self): - r = self.pathoc( + r, _ = self.pathoc( [r"get:'http://foo.com/p/202':da"], connect_to=("localhost", self.d.port) ) @@ -73,7 +73,8 @@ class TestCustomCert(tutils.DaemonTests): ) def test_connect(self): - r = self.pathoc([r"get:/p/202"])[0] + r, _ = self.pathoc([r"get:/p/202"]) + r = r[0] assert r.status_code == 202 assert r.sslinfo assert "test.com" in str(r.sslinfo.certchain[0].get_subject()) @@ -86,7 +87,8 @@ class TestSSLCN(tutils.DaemonTests): ) def test_connect(self): - r = self.pathoc([r"get:/p/202"])[0] + r, _ = self.pathoc([r"get:/p/202"]) + r = r[0] assert r.status_code == 202 assert r.sslinfo assert r.sslinfo.certchain[0].get_subject().CN == "foo.com" @@ -131,8 +133,8 @@ class CommonTests(tutils.DaemonTests): assert "too large" in l["response"]["msg"] def test_preline(self): - r = self.pathoc([r"get:'/p/200':i0,'\r\n'"])[0] - assert r.status_code == 200 + r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"]) + assert r[0].status_code == 200 def test_info(self): assert tuple(self.d.info()["version"]) == version.IVERSION @@ -199,31 +201,43 @@ class CommonTests(tutils.DaemonTests): assert "File access denied" in rsp.content def test_proxy(self): - r = self.pathoc([r"get:'http://foo.com/p/202':da"])[0] - assert r.status_code == 202 + r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"]) + assert r[0].status_code == 202 def test_websocket(self): - r = self.pathoc(["ws:/p/"], ws_read_limit=0)[0] - assert r.status_code == 101 + r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0) + assert r[0].status_code == 101 - r = self.pathoc(["ws:/p/ws"], ws_read_limit=0)[0] - assert r.status_code == 101 + r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0) + assert r[0].status_code == 101 def test_websocket_frame(self): - r = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1) + r, _ = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1) assert r[1].payload == "test" + def test_websocket_frame_reflect_error(self): + r, _ = self.pathoc( + ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"], + ws_read_limit = 1, + timeout = 1 + ) + assert "Parse error" in self.d.text_log() + + def test_websocket_frame_disconnect_error(self): + self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0) + assert self.d.last_log() + class TestDaemon(CommonTests): ssl = False def test_connect(self): - r = self.pathoc( + r, _ = self.pathoc( [r"get:'http://foo.com/p/202':da"], connect_to=("localhost", self.d.port), ssl=True - )[0] - assert r.status_code == 202 + ) + assert r[0].status_code == 202 def test_connect_err(self): tutils.raises( @@ -249,6 +263,6 @@ class TestDaemonSSL(CommonTests): assert "SSL" in l["msg"] def test_ssl_cipher(self): - r = self.pathoc([r"get:/p/202"])[0] - assert r.status_code == 202 + r, _ = self.pathoc([r"get:/p/202"]) + assert r[0].status_code == 202 assert self.d.last_log()["cipher"][1] > 0 diff --git a/test/tutils.py b/test/tutils.py index 1e33dfe7..5988e846 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -75,13 +75,17 @@ class DaemonTests(object): ssl=None, ws_read_limit=None ): + """ + Returns a (messages, text log) tuple. + """ if ssl is None: ssl = self.ssl + logfp = cStringIO.StringIO() c = pathoc.Pathoc( ("localhost", self.d.port), ssl=ssl, ws_read_limit=ws_read_limit, - fp = None + fp = logfp ) c.connect(connect_to) if timeout: @@ -93,7 +97,7 @@ class DaemonTests(object): ret.append(resp) for frm in c.wait(): ret.append(frm) - return ret + return ret, logfp.getvalue() @contextmanager def tmpdir(*args, **kwargs): |