aboutsummaryrefslogtreecommitdiffstats
path: root/test/pathod/test_pathod.py
blob: ed4ef49fbc42351d7d1ebc0ab99e1ea91c77fe52 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129pre { line-height: 125%; margin: 0; }
td.linenos pre { color: #000000; background-color: #f0f0f0; padding: 0 5px 0 5px; }
span.linenos { color: #000000; background-color: #f0f0f0; padding: 0 5px 0 5px; }
td.linenos pre.special { color: #000000; background-color: #ffffc0; padding: 0 5px 0 5px; }
span.linenos.special { color: #000000; background-color: #ffffc0; padding: 0 5px 0 5px; }
.highlight .hll { background-color: #ffffcc }
.highlight { background: #ffffff; }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */
.highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */
.highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */
.highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */
.highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */
.highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */
.highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */
.highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */
.highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */
.highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */
.highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */
.highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */
.highlight
from six.moves import cStringIO as StringIO

from pathod import pathod
from netlib import tcp
from netlib.exceptions import HttpException, TlsException
import tutils


class TestPathod(object):
    """Tests for the log bookkeeping methods exposed by ``pathod.Pathod``."""

    def test_logging(self):
        """Check add/lookup/clear on the log and the ``LOGBUF`` upper bound."""
        log_stream = StringIO()
        server = pathod.Pathod(("127.0.0.1", 0), logfp=log_stream)

        # Nothing has been logged yet.
        assert len(server.get_log()) == 0

        # A single entry can be added and then found again by its id.
        entry_id = server.add_log({"s": "foo"})
        assert server.log_by_id(entry_id)
        assert len(server.get_log()) == 1

        # Clearing leaves the log empty again.
        server.clear_log()
        assert len(server.get_log()) == 0

        # Add one entry more than LOGBUF; the log must not grow past LOGBUF.
        # A fresh dict is built per call, exactly one entry per iteration.
        for _ in range(server.LOGBUF + 1):
            server.add_log({"s": "foo"})
        assert len(server.get_log()) <= server.LOGBUF


class TestTimeout(tutils.DaemonTests):
    """Daemon test case configured with ``timeout = 0.01``."""
    timeout = 0.01

    def test_timeout(self):
        """The request must raise and the last log entry must be a timeout."""
        # FIXME: Add float values to spec language, reduce test timeout to
        # increase test performance
        # This is a bodge - platform differences cause different exception
        # types to be raised here, hence the broad ``Exception``.
        spec = ["get:/:p1,1"]
        tutils.raises(Exception, self.pathoc, spec)

        last_entry = self.d.last_log()
        assert last_entry["type"] == "timeout"


class TestNotAfterConnect(tutils.DaemonTests):
    """Daemon test case with ``ssl`` off and ``not_after_connect`` enabled."""
    ssl = False
    ssloptions = {"not_after_connect": True}

    def test_connect(self):
        """A request sent via ``connect_to`` must come back with status 202."""
        daemon_addr = ("localhost", self.d.port)
        responses, _ = self.pathoc(
            [r"get:'http://foo.com/p/202':da"],
            connect_to=daemon_addr,
        )
        first = responses[0]
        assert first.status_code == 202


class TestCustomCert(tutils.DaemonTests):
    """Daemon test case using ``data/testkey.pem`` as the ``*`` certificate."""
    ssl = True
    ssloptions = {
        "certs": [("*", tutils.test_data.path("data/testkey.pem"))],
    }

    def test_connect(self):
        """The response carries SSL info whose first cert names test.com."""
        responses, _ = self.pathoc([r"get:/p/202"])
        response = responses[0]
        assert response.status_code == 202
        assert response.sslinfo

        first_cert = response.sslinfo.certchain[0]
        assert "test.com" in str(first_cert.get_subject())


class TestSSLCN(tutils.DaemonTests):
    """Daemon test case with the ``cn`` SSL option set to ``foo.com``."""
    ssl = True
    ssloptions = {"cn": "foo.com"}

    def test_connect(self):
        """The first certificate in the chain must have CN ``foo.com``."""
        responses, _ = self.pathoc([r"get:/p/202"])
        response = responses[0]
        assert response.status_code == 202
        assert response.sslinfo

        subject = response.sslinfo.certchain[0].get_subject()
        assert subject.CN == "foo.com"


class TestNohang(tutils.DaemonTests):
    """Daemon test case run with ``nohang`` enabled."""
    nohang = True

    def test_nohang(self):
        """A spec containing a pause yields status 800 and a logged message."""
        response = self.get("200:p0,0")
        assert response.status_code == 800

        last_entry = self.d.last_log()
        message = last_entry["response"]["msg"]
        assert "Pauses have been disabled" in message


class TestHexdump(tutils.DaemonTests):
    """Daemon test case run with ``hexdump`` enabled."""
    hexdump = True

    def test_hexdump(self):
        """A response with a non-ASCII body byte is still returned (truthy)."""
        response = self.get(r"200:b'\xf0'")
        assert response


class TestNocraft(tutils.DaemonTests):
    """Daemon test case run with ``nocraft`` enabled."""
    nocraft = True

    def test_nocraft(self):
        """A crafted response spec is refused: status 800 plus a notice."""
        response = self.get(r"200:b'\xf0'")
        assert response.status_code == 800
        assert "Crafting disabled" in response.content


class CommonTests(tutils.DaemonTests):
    """Tests shared by the daemon test cases that subclass this class."""

    def test_binarydata(self):
        """A binary body is served and a log entry is recorded for it."""
        response = self.get(r"200:b'\xf0'")
        assert response
        assert self.d.last_log()
        # FIXME: Other binary data elements

    def test_sizelimit(self):
        """A ``b@1g`` body is rejected with status 800 and "too large"."""
        response = self.get("200:b@1g")
        assert response.status_code == 800

        last_entry = self.d.last_log()
        assert "too large" in last_entry["response"]["msg"]

    def test_preline(self):
        """A request with ``'\\r\\n'`` injected at offset 0 still gets a 200."""
        responses, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"])
        first = responses[0]
        assert first.status_code == 200

    def test_logs(self):
        """One request yields one expected log entry; clearing empties it."""
        self.d.clear_log()
        response = self.get("202:da")
        assert response
        assert self.d.expect_log(1)

        self.d.clear_log()
        assert len(self.d.log()) == 0

    def test_disconnect(self):
        """The ``202:b@100k:d200`` spec must raise "unexpected eof"."""
        spec = "202:b@100k:d200"
        tutils.raises("unexpected eof", self.get, spec)

    def test_parserr(self):
        """The spec ``400:msg,b:`` results in status 800."""
        response = self.get("400:msg,b:")
        assert response.status_code == 800

    def test_static(self):
        """``b<file`` serves content that strips down to ``testfile``."""
        response = self.get("200:b<file")
        assert response.status_code == 200
        body = response.content.strip()
        assert body == "testfile"

    def test_anchor(self):
        """The path ``/anchor/foo`` answers with status 202."""
        response = self.getpath("/anchor/foo")
        assert response.status_code == 202

    def test_invalid_first_line(self):
        """A garbage first line is logged as an error mentioning ``foo``."""
        client = tcp.TCPClient(("localhost", self.d.port))
        with client.connect():
            # Upgrade the raw socket first when the test case runs with ssl.
            if self.ssl:
                client.convert_to_ssl()
            client.wfile.write("foo\n\n\n")
            client.wfile.flush()

            # The log is read while the connection is still open.
            last_entry = self.d.last_log()
            assert last_entry["type"] == "error"
            assert "foo" in last_entry["msg"]

    def test_invalid_content_length(self):
        """A non-numeric content-length raises and is logged as an error."""
        spec = ["get:/:h'content-length'='foo'"]
        tutils.raises(HttpException, self.pathoc, spec)

        last_entry = self.d.last_log()
        assert last_entry["type"] == "error"
        assert "Unparseable Content Length" in last_entry["msg"]

    def test_invalid_headers(self):
        """A header whose name is a tab raises and is logged as an error."""
        spec = ["get:/:h'\t'='foo'"]
        tutils.raises(HttpException, self.pathoc, spec)

        last_entry = self.d.last_log()
        assert last_entry["type"] == "error"
        assert "Invalid headers" in last_entry["msg"]

    def test_access_denied(self):
        """The spec ``=nonexistent`` results in status 800."""
        response = self.get("=nonexistent")
        assert response.status_code == 800

    def test_source_access_denied(self):
        """``b</foo`` is refused with status 800 and "File access denied"."""
        response = self.get("200:b</foo")
        assert response.status_code == 800
        assert "File access denied" in response.content

    def test_proxy(self):
        """A request using an absolute URL is answered with status 202."""
        responses, _ = self.pathoc([r"get:'http://foo.com/p/202':da"])
        first = responses[0]
        assert first.status_code == 202

    def test_websocket(self):
        """Both websocket specs are answered with status 101."""
        for spec in ("ws:/p/", "ws:/p/ws"):
            responses, _ = self.pathoc([spec], ws_read_limit=0)
            assert responses[0].status_code == 101

    def test_websocket_frame(self):
        """The second result of the exchange has the payload ``test``."""
        specs = ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"]
        responses, _ = self.pathoc(specs, ws_read_limit=1)
        frame = responses[1]
        assert frame.payload == "test"

    def test_websocket_frame_reflect_error(self):
        """The websocket frame spec sent here must log a "Parse error"."""
        specs = ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"]
        responses, _ = self.pathoc(specs, ws_read_limit=1, timeout=1)
        # FIXME: Race Condition?
        log_text = self.d.text_log()
        assert "Parse error" in log_text

    def test_websocket_frame_disconnect_error(self):
        """The ``wf:b@10:d3`` frame spec still leaves a log entry behind."""
        specs = ["ws:/p/", "wf:b@10:d3"]
        self.pathoc(specs, ws_read_limit=0)
        assert self.d.last_log()


class TestDaemon(CommonTests):
    """Runs the common tests with ``ssl`` off, plus ``connect_to`` checks."""
    ssl = False

    def test_connect(self):
        """With ``connect_to`` and ``ssl=True`` the request returns 202."""
        daemon_addr = ("localhost", self.d.port)
        responses, _ = self.pathoc(
            [r"get:'http://foo.com/p/202':da"],
            connect_to=daemon_addr,
            ssl=True,
        )
        first = responses[0]
        assert first.status_code == 202

    def test_connect_err(self):
        """The same request without ``ssl=True`` raises ``HttpException``."""
        spec = [r"get:'http://foo.com/p/202':da"]
        daemon_addr = ("localhost", self.d.port)
        tutils.raises(
            HttpException,
            self.pathoc,
            spec,
            connect_to=daemon_addr,
        )


class TestDaemonSSL(CommonTests):
    """Runs the common tests with ``ssl`` on, plus SSL-specific checks."""
    ssl = True

    def test_ssl_conn_failure(self):
        """NUL bytes sent before the handshake make it fail and get logged."""
        client = tcp.TCPClient(("localhost", self.d.port))
        client.rbufsize = 0
        client.wbufsize = 0
        with client.connect():
            # Write four NUL bytes before attempting the SSL handshake.
            client.wfile.write("\0\0\0\0")
            tutils.raises(TlsException, client.convert_to_ssl)

            last_entry = self.d.last_log()
            assert last_entry["type"] == "error"
            assert "SSL" in last_entry["msg"]

    def test_ssl_cipher(self):
        """The logged ``cipher`` entry has a positive value at index 1."""
        responses, _ = self.pathoc([r"get:/p/202"])
        assert responses[0].status_code == 202

        cipher = self.d.last_log()["cipher"]
        assert cipher[1] > 0


class TestHTTP2(tutils.DaemonTests):
    """Daemon test case with ``ssl`` and ``nohang`` on, for HTTP/2 requests."""
    ssl = True
    nohang = True

    # The test method is only defined when tcp.HAS_ALPN is truthy.
    if tcp.HAS_ALPN:

        def test_http2(self):
            """A ``use_http2`` request over SSL comes back with status 800."""
            responses, _ = self.pathoc(
                ["GET:/"],
                ssl=True,
                use_http2=True,
            )
            first = responses[0]
            assert first.status_code == 800