diff options
Diffstat (limited to 'pathod/test/test_pathoc.py')
-rw-r--r-- | pathod/test/test_pathoc.py | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/pathod/test/test_pathoc.py b/pathod/test/test_pathoc.py new file mode 100644 index 00000000..62696a64 --- /dev/null +++ b/pathod/test/test_pathoc.py @@ -0,0 +1,308 @@ +import json +import cStringIO +import re +import OpenSSL +from mock import Mock + +from netlib import tcp, http, socks +from netlib.exceptions import HttpException, TcpException, NetlibException +from netlib.http import http1, http2 + +from libpathod import pathoc, test, version, pathod, language +from netlib.tutils import raises +import tutils + + +def test_response(): + r = http.Response("HTTP/1.1", 200, "Message", {}, None, None) + assert repr(r) + + +class _TestDaemon: + ssloptions = pathod.SSLOptions() + + @classmethod + def setup_class(cls): + cls.d = test.Daemon( + ssl=cls.ssl, + ssloptions=cls.ssloptions, + staticdir=tutils.test_data.path("data"), + anchors=[ + (re.compile("/anchor/.*"), "202") + ] + ) + + @classmethod + def teardown_class(cls): + cls.d.shutdown() + + def setUp(self): + self.d.clear_log() + + def test_info(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=self.ssl, + fp=None + ) + c.connect() + resp = c.request("get:/api/info") + assert tuple(json.loads(resp.content)["version"]) == version.IVERSION + + def tval( + self, + requests, + showreq=False, + showresp=False, + explain=False, + showssl=False, + hexdump=False, + timeout=None, + ignorecodes=(), + ignoretimeout=None, + showsummary=True + ): + s = cStringIO.StringIO() + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=self.ssl, + showreq=showreq, + showresp=showresp, + explain=explain, + hexdump=hexdump, + ignorecodes=ignorecodes, + ignoretimeout=ignoretimeout, + showsummary=showsummary, + fp=s + ) + c.connect(showssl=showssl, fp=s) + if timeout: + c.settimeout(timeout) + for i in requests: + r = language.parse_pathoc(i).next() + if explain: + r = r.freeze(language.Settings()) + try: + c.request(r) + except NetlibException: + pass + return s.getvalue() + + +class TestDaemonSSL(_TestDaemon): + ssl = True + ssloptions = pathod.SSLOptions( + request_client_cert=True, + sans=["test1.com", "test2.com"], + alpn_select=b'h2', + ) + + def test_sni(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=True, + sni="foobar.com", + fp=None + ) + c.connect() + c.request("get:/p/200") + r = c.request("get:/api/log") + d = json.loads(r.content) + assert d["log"][0]["request"]["sni"] == "foobar.com" + + def test_showssl(self): + assert "certificate chain" in self.tval(["get:/p/200"], showssl=True) + + def test_clientcert(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=True, + clientcert=tutils.test_data.path("data/clientcert/client.pem"), + fp=None + ) + c.connect() + c.request("get:/p/200") + r = c.request("get:/api/log") + d = json.loads(r.content) + assert d["log"][0]["request"]["clientcert"]["keyinfo"] + + def test_http2_without_ssl(self): + fp = cStringIO.StringIO() + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + use_http2=True, + ssl=False, + fp = fp + ) + tutils.raises(NotImplementedError, c.connect) + + +class TestDaemon(_TestDaemon): + ssl = False + + def test_ssl_error(self): + c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None) + tutils.raises("ssl handshake", c.connect) + + def test_showssl(self): + assert not "certificate chain" in self.tval( + ["get:/p/200"], + showssl=True) + + def test_ignorecodes(self): + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200]) + assert "200" not in self.tval( + ["get:'/p/200:b@1'"], + ignorecodes=[ + 200, + 201]) + assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201]) + + def test_timeout(self): + assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01) + assert "HTTP" in self.tval( + ["get:'/p/200:p5,100'"], + showresp=True, + timeout=1 + ) + assert not "HTTP" in self.tval( + ["get:'/p/200:p3,100'"], + showresp=True, + timeout=1, + ignoretimeout=True + ) + + def test_showresp(self): + reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] + assert self.tval(reqs).count("200") == 2 + assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2 + assert self.tval( + reqs, showresp=True, hexdump=True + ).count("0000000000") == 2 + + def test_showresp_httperr(self): + v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True) + assert "Invalid headers" in v + assert "HTTP/" in v + + def test_explain(self): + reqs = ["get:/p/200:b@100"] + assert "b@100" not in self.tval(reqs, explain=True) + + def test_showreq(self): + reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] + assert self.tval(reqs, showreq=True).count("GET /api") == 2 + assert self.tval( + reqs, showreq=True, hexdump=True + ).count("0000000000") == 2 + + def test_conn_err(self): + assert "Invalid server response" in self.tval(["get:'/p/200:d2'"]) + + def test_websocket_shutdown(self): + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.connect() + c.request("ws:/") + c.stop() + + def test_wait_finish(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ws_read_limit=1 + ) + c.connect() + c.request("ws:/") + c.request("wf:f'wf:x100'") + [i for i in c.wait(timeout=0, finish=False)] + [i for i in c.wait(timeout=0)] + + def test_connect_fail(self): + to = ("foobar", 80) + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.rfile, c.wfile = cStringIO.StringIO(), cStringIO.StringIO() + with raises("connect failed"): + c.http_connect(to) + c.rfile = cStringIO.StringIO( + "HTTP/1.1 500 OK\r\n" + ) + with raises("connect failed"): + c.http_connect(to) + c.rfile = cStringIO.StringIO( + "HTTP/1.1 200 OK\r\n" + ) + c.http_connect(to) + + def test_socks_connect(self): + to = ("foobar", 80) + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.rfile, c.wfile = tutils.treader(""), cStringIO.StringIO() + tutils.raises(pathoc.PathocError, c.socks_connect, to) + + c.rfile = tutils.treader( + "\x05\xEE" + ) + tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD)) + + c.rfile = tutils.treader( + "\x05\x00" + + "\x05\xEE\x00\x03\x0bexample.com\xDE\xAD" + ) + tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD)) + + c.rfile = tutils.treader( + "\x05\x00" + + "\x05\x00\x00\x03\x0bexample.com\xDE\xAD" + ) + c.socks_connect(("example.com", 0xDEAD)) + + +class TestDaemonHTTP2(_TestDaemon): + ssl = True + + if OpenSSL._util.lib.Cryptography_HAS_ALPN: + + def test_http2(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + ) + assert isinstance(c.protocol, http2.HTTP2Protocol) + + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ) + assert c.protocol == http1 + + def test_http2_alpn(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + http2_skip_connection_preface=True, + ) + + tmp_convert_to_ssl = c.convert_to_ssl + c.convert_to_ssl = Mock() + c.convert_to_ssl.side_effect = tmp_convert_to_ssl + c.connect() + + _, kwargs = c.convert_to_ssl.call_args + assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2']) + + def test_request(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + ) + c.connect() + resp = c.request("get:/p/200") + assert resp.status_code == 200 |