aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libpathod/pathoc.py64
-rw-r--r--libpathod/pathod.py3
-rw-r--r--libpathod/test.py9
-rw-r--r--test/test_pathod.py48
-rw-r--r--test/tutils.py8
5 files changed, 87 insertions, 45 deletions
diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py
index ecf1d4d9..fcb254c0 100644
--- a/libpathod/pathoc.py
+++ b/libpathod/pathoc.py
@@ -1,3 +1,4 @@
+import contextlib
import sys
import os
import itertools
@@ -86,8 +87,11 @@ class WebsocketFrameReader(threading.Thread):
logfp,
showresp,
hexdump,
- ws_read_limit):
+ ws_read_limit,
+ timeout
+ ):
threading.Thread.__init__(self)
+ self.timeout = timeout
self.ws_read_limit = ws_read_limit
self.logfp = logfp
self.showresp = showresp
@@ -104,28 +108,33 @@ class WebsocketFrameReader(threading.Thread):
None
)
+ @contextlib.contextmanager
+ def terminator(self):
+ yield
+ self.frames_queue.put(None)
+
def run(self):
- while True:
- if self.ws_read_limit == 0:
- break
- r, _, _ = select.select([self.rfile], [], [], 0.05)
- try:
- self.terminate.get_nowait()
- break
- except Queue.Empty:
- pass
- for rfile in r:
- with self.log(rfile) as log:
- try:
+ starttime = time.time()
+ with self.terminator():
+ while True:
+ if self.ws_read_limit == 0:
+ return
+ r, _, x = select.select([self.rfile], [], [], 0.05)
+ if not r and time.time() - starttime > self.timeout:
+ return
+ try:
+ self.terminate.get_nowait()
+ return
+ except Queue.Empty:
+ pass
+ for rfile in r:
+ with self.log(rfile) as log:
frm = websockets.Frame.from_file(self.rfile)
- except tcp.NetLibError:
- self.ws_read_limit = 0
- break
- self.frames_queue.put(frm)
- log("<< %s" % frm.header.human_readable())
- if self.ws_read_limit is not None:
- self.ws_read_limit -= 1
- self.frames_queue.put(None)
+ self.frames_queue.put(frm)
+ log("<< %s" % frm.header.human_readable())
+ if self.ws_read_limit is not None:
+ self.ws_read_limit -= 1
+ starttime = time.time()
class Pathoc(tcp.TCPClient):
@@ -143,6 +152,9 @@ class Pathoc(tcp.TCPClient):
# Websockets
ws_read_limit = None,
+ # Network
+ timeout = None,
+
# Output control
showreq = False,
showresp = False,
@@ -178,6 +190,8 @@ class Pathoc(tcp.TCPClient):
self.ws_read_limit = ws_read_limit
+ self.timeout = timeout
+
self.showreq = showreq
self.showresp = showresp
self.explain = explain
@@ -219,6 +233,8 @@ class Pathoc(tcp.TCPClient):
an HTTP CONNECT request.
"""
tcp.TCPClient.connect(self)
+ if self.timeout:
+ self.settimeout(self.timeout)
if connect_to:
self.http_connect(connect_to)
self.sslinfo = None
@@ -313,7 +329,8 @@ class Pathoc(tcp.TCPClient):
self.fp,
self.showresp,
self.hexdump,
- self.ws_read_limit
+ self.ws_read_limit,
+ self.timeout
)
self.ws_framereader.start()
return resp
@@ -412,6 +429,7 @@ def main(args): # pragma: nocover
explain = args.explain,
hexdump = args.hexdump,
ignorecodes = args.ignorecodes,
+ timeout = args.timeout,
ignoretimeout = args.ignoretimeout,
showsummary = True
)
@@ -424,8 +442,6 @@ def main(args): # pragma: nocover
except PathocError as v:
print >> sys.stderr, str(v)
sys.exit(1)
- if args.timeout:
- p.settimeout(args.timeout)
for spec in playlist:
if args.explain or args.memo:
spec = spec.freeze(p.settings)
diff --git a/libpathod/pathod.py b/libpathod/pathod.py
index 3c42573d..13f602b4 100644
--- a/libpathod/pathod.py
+++ b/libpathod/pathod.py
@@ -133,7 +133,8 @@ class PathodHandler(tcp.BaseHandler):
try:
wf_gen = language.parse_websocket_frame(nest)
except language.exceptions.ParseException, v:
- lg(
+ log.write(
+ self.logfp,
"Parse error in reflected frame specifcation:"
" %s" % v.msg
)
diff --git a/libpathod/test.py b/libpathod/test.py
index 6a15182a..ebb3a49f 100644
--- a/libpathod/test.py
+++ b/libpathod/test.py
@@ -1,3 +1,4 @@
+import cStringIO
import threading
import Queue
@@ -13,6 +14,8 @@ class Daemon:
def __init__(self, ssl=None, **daemonargs):
self.q = Queue.Queue()
+ self.logfp = cStringIO.StringIO()
+ daemonargs["logfp"] = self.logfp
self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs)
self.thread.start()
self.port = self.q.get(True, 5)
@@ -26,6 +29,7 @@ class Daemon:
return self
def __exit__(self, type, value, traceback):
+ self.logfp.truncate(0)
self.shutdown()
return False
@@ -42,6 +46,9 @@ class Daemon:
resp = requests.get("%s/api/info" % self.urlbase, verify=False)
return resp.json()
+ def text_log(self):
+ return self.logfp.getvalue()
+
def last_log(self):
"""
Returns the last logged request, or None.
@@ -62,6 +69,7 @@ class Daemon:
"""
Clear the log.
"""
+ self.logfp.truncate(0)
resp = requests.get("%s/api/clear_log" % self.urlbase, verify=False)
return resp.ok
@@ -84,7 +92,6 @@ class _PaThread(threading.Thread):
self.server = pathod.Pathod(
(self.iface, 0),
ssl = self.ssl,
- logfp = None,
**self.daemonargs
)
self.name = "PathodThread (%s:%s)" % (
diff --git a/test/test_pathod.py b/test/test_pathod.py
index 55a5b32e..7d5f90b6 100644
--- a/test/test_pathod.py
+++ b/test/test_pathod.py
@@ -59,7 +59,7 @@ class TestNotAfterConnect(tutils.DaemonTests):
)
def test_connect(self):
- r = self.pathoc(
+ r, _ = self.pathoc(
[r"get:'http://foo.com/p/202':da"],
connect_to=("localhost", self.d.port)
)
@@ -73,7 +73,8 @@ class TestCustomCert(tutils.DaemonTests):
)
def test_connect(self):
- r = self.pathoc([r"get:/p/202"])[0]
+ r, _ = self.pathoc([r"get:/p/202"])
+ r = r[0]
assert r.status_code == 202
assert r.sslinfo
assert "test.com" in str(r.sslinfo.certchain[0].get_subject())
@@ -86,7 +87,8 @@ class TestSSLCN(tutils.DaemonTests):
)
def test_connect(self):
- r = self.pathoc([r"get:/p/202"])[0]
+ r, _ = self.pathoc([r"get:/p/202"])
+ r = r[0]
assert r.status_code == 202
assert r.sslinfo
assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
@@ -131,8 +133,8 @@ class CommonTests(tutils.DaemonTests):
assert "too large" in l["response"]["msg"]
def test_preline(self):
- r = self.pathoc([r"get:'/p/200':i0,'\r\n'"])[0]
- assert r.status_code == 200
+ r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"])
+ assert r[0].status_code == 200
def test_info(self):
assert tuple(self.d.info()["version"]) == version.IVERSION
@@ -199,31 +201,43 @@ class CommonTests(tutils.DaemonTests):
assert "File access denied" in rsp.content
def test_proxy(self):
- r = self.pathoc([r"get:'http://foo.com/p/202':da"])[0]
- assert r.status_code == 202
+ r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"])
+ assert r[0].status_code == 202
def test_websocket(self):
- r = self.pathoc(["ws:/p/"], ws_read_limit=0)[0]
- assert r.status_code == 101
+ r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0)
+ assert r[0].status_code == 101
- r = self.pathoc(["ws:/p/ws"], ws_read_limit=0)[0]
- assert r.status_code == 101
+ r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0)
+ assert r[0].status_code == 101
def test_websocket_frame(self):
- r = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1)
+ r, _ = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1)
assert r[1].payload == "test"
+ def test_websocket_frame_reflect_error(self):
+ r, _ = self.pathoc(
+ ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"],
+ ws_read_limit = 1,
+ timeout = 1
+ )
+ assert "Parse error" in self.d.text_log()
+
+ def test_websocket_frame_disconnect_error(self):
+ self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0)
+ assert self.d.last_log()
+
class TestDaemon(CommonTests):
ssl = False
def test_connect(self):
- r = self.pathoc(
+ r, _ = self.pathoc(
[r"get:'http://foo.com/p/202':da"],
connect_to=("localhost", self.d.port),
ssl=True
- )[0]
- assert r.status_code == 202
+ )
+ assert r[0].status_code == 202
def test_connect_err(self):
tutils.raises(
@@ -249,6 +263,6 @@ class TestDaemonSSL(CommonTests):
assert "SSL" in l["msg"]
def test_ssl_cipher(self):
- r = self.pathoc([r"get:/p/202"])[0]
- assert r.status_code == 202
+ r, _ = self.pathoc([r"get:/p/202"])
+ assert r[0].status_code == 202
assert self.d.last_log()["cipher"][1] > 0
diff --git a/test/tutils.py b/test/tutils.py
index 1e33dfe7..5988e846 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -75,13 +75,17 @@ class DaemonTests(object):
ssl=None,
ws_read_limit=None
):
+ """
+ Returns a (messages, text log) tuple.
+ """
if ssl is None:
ssl = self.ssl
+ logfp = cStringIO.StringIO()
c = pathoc.Pathoc(
("localhost", self.d.port),
ssl=ssl,
ws_read_limit=ws_read_limit,
- fp = None
+ fp = logfp
)
c.connect(connect_to)
if timeout:
@@ -93,7 +97,7 @@ class DaemonTests(object):
ret.append(resp)
for frm in c.wait():
ret.append(frm)
- return ret
+ return ret, logfp.getvalue()
@contextmanager
def tmpdir(*args, **kwargs):