diff options
Diffstat (limited to 'test/pathod/language/test_http.py')
-rw-r--r-- | test/pathod/language/test_http.py | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/test/pathod/language/test_http.py b/test/pathod/language/test_http.py new file mode 100644 index 00000000..a5b35c05 --- /dev/null +++ b/test/pathod/language/test_http.py @@ -0,0 +1,355 @@ +import io +import pytest + +from pathod import language +from pathod.language import http, base + +from .. import tservers + + +def parse_request(s): + return next(language.parse_pathoc(s)) + + +def test_make_error_response(): + d = io.BytesIO() + s = http.make_error_response("foo") + language.serve(s, d, {}) + + +class TestRequest: + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + parse_request("get:\xf0") + + def test_err(self): + with pytest.raises(language.ParseException): + parse_request('GET') + + def test_simple(self): + r = parse_request('GET:"/foo"') + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" + r = parse_request('GET:/foo') + assert r.path.string() == b"/foo" + r = parse_request('GET:@1k') + assert len(r.path.string()) == 1024 + + def test_multiple(self): + r = list(language.parse_pathoc("GET:/ PUT:/")) + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + assert len(r) == 2 + + l = """ + GET + "/foo" + ir,@1 + + PUT + + "/foo + + + + bar" + + ir,@1 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + + l = """ + get:"http://localhost:9999/p/200":ir,@1 + get:"http://localhost:9999/p/200":ir,@2 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"GET" + + def test_nested_response(self): + l = "get:/p:s'200'" + r = list(language.parse_pathoc(l)) + assert len(r) == 1 + assert len(r[0].tokens) == 3 + assert isinstance(r[0].tokens[2], http.NestedResponse) + assert r[0].values({}) + + def test_render(self): + s = io.BytesIO() + r = parse_request("GET:'/foo'") + assert language.serve( + r, + s, + language.Settings(request_host="foo.com") + ) + + def test_multiline(self): + l = """ + GET + "/foo" + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" + assert r.actions + + l = """ + GET + + "/foo + + + + bar" + + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == b"GET" + assert r.path.string().endswith(b"bar") + assert r.actions + + def test_spec(self): + def rt(s): + s = parse_request(s).spec() + assert parse_request(s).spec() == s + rt("get:/foo") + rt("get:/foo:da") + + def test_freeze(self): + r = parse_request("GET:/:b@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_path_generator(self): + r = parse_request("GET:@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_websocket(self): + r = parse_request('ws:/path/') + res = r.resolve(language.Settings()) + assert res.method.string().lower() == b"get" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"get" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" + + r = parse_request('ws:put:/path/') + res = r.resolve(language.Settings()) + assert r.method.string().lower() == b"put" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"put" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" + + +class TestResponse: + + def dummy_response(self): + return next(language.parse_pathod("400'msg'")) + + def test_response(self): + r = next(language.parse_pathod("400:m'msg'")) + assert r.status_code.string() == b"400" + assert r.reason.string() == b"msg" + + r = next(language.parse_pathod("400:m'msg':b@100b")) + assert r.reason.string() == b"msg" + assert r.body.values({}) + assert str(r) + + r = next(language.parse_pathod("200")) + assert r.status_code.string() == b"200" + assert not r.reason + assert b"OK" in [i[:] for i in r.preamble({})] + + def test_render(self): + s = io.BytesIO() + r = next(language.parse_pathod("400:m'msg'")) + assert language.serve(r, s, {}) + + r = next(language.parse_pathod("400:p0,100:dr")) + assert "p0" in r.spec() + s = r.preview_safe() + assert "p0" not in s.spec() + + def test_raw(self): + s = io.BytesIO() + r = next(language.parse_pathod("400:b'foo'")) + language.serve(r, s, {}) + v = s.getvalue() + assert b"Content-Length" in v + + s = io.BytesIO() + r = next(language.parse_pathod("400:b'foo':r")) + language.serve(r, s, {}) + v = s.getvalue() + assert b"Content-Length" not in v + + def test_length(self): + def testlen(x): + s = io.BytesIO() + x = next(x) + language.serve(x, s, language.Settings()) + assert x.length(language.Settings()) == len(s.getvalue()) + testlen(language.parse_pathod("400:m'msg':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r")) + + def test_maximum_length(self): + def testlen(x): + x = next(x) + s = io.BytesIO() + m = x.maximum_length({}) + language.serve(x, s, {}) + assert m >= len(s.getvalue()) + + r = language.parse_pathod("400:m'msg':b@100:d0") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + def test_parse_err(self): + with pytest.raises(language.ParseException): + language.parse_pathod("400:msg,b:") + try: + language.parse_pathod("400'msg':b:") + except language.ParseException as v: + assert v.marked() + assert str(v) + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + language.parse_pathod("foo:b\xf0") + + def test_parse_header(self): + r = next(language.parse_pathod('400:h"foo"="bar"')) + assert http.get_header(b"foo", r.headers) + + def test_parse_pause_before(self): + r = next(language.parse_pathod("400:p0,10")) + assert r.actions[0].spec() == "p0,10" + + def test_parse_pause_after(self): + r = next(language.parse_pathod("400:pa,10")) + assert r.actions[0].spec() == "pa,10" + + def test_parse_pause_random(self): + r = next(language.parse_pathod("400:pr,10")) + assert r.actions[0].spec() == "pr,10" + + def test_parse_stress(self): + # While larger values are known to work on linux, len() technically + # returns an int and a python 2.7 int on windows has 32bit precision. + # Therefore, we should keep the body length < 2147483647 bytes in our + # tests. + r = next(language.parse_pathod("400:b@1g")) + assert r.length({}) + + def test_spec(self): + def rt(s): + s = next(language.parse_pathod(s)).spec() + assert next(language.parse_pathod(s)).spec() == s + rt("400:b@100g") + rt("400") + rt("400:da") + + def test_websockets(self): + r = next(language.parse_pathod("ws")) + with pytest.raises(Exception, match="No websocket key"): + r.resolve(language.Settings()) + res = r.resolve(language.Settings(websocket_key=b"foo")) + assert res.status_code.string() == b"101" + + +def test_ctype_shortcut(): + e = http.ShortcutContentType.expr() + v = e.parseString("c'foo'")[0] + assert v.key.val == b"Content-Type" + assert v.value.val == b"foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutContentType.expr() + v = e.parseString("c@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_location_shortcut(): + e = http.ShortcutLocation.expr() + v = e.parseString("l'foo'")[0] + assert v.key.val == b"Location" + assert v.value.val == b"foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutLocation.expr() + v = e.parseString("l@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_shortcuts(): + assert next(language.parse_pathod( + "400:c'foo'")).headers[0].key.val == b"Content-Type" + assert next(language.parse_pathod( + "400:l'foo'")).headers[0].key.val == b"Location" + + assert b"Android" in tservers.render(parse_request("get:/:ua")) + assert b"User-Agent" in tservers.render(parse_request("get:/:ua")) + + +def test_user_agent(): + e = http.ShortcutUserAgent.expr() + v = e.parseString("ua")[0] + assert b"Android" in v.string() + + e = http.ShortcutUserAgent.expr() + v = e.parseString("u'a'")[0] + assert b"Android" not in v.string() + + v = e.parseString("u@100'")[0] + assert len(str(v.freeze({}).value)) > 100 + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_nested_response(): + e = http.NestedResponse.expr() + v = e.parseString("s'200'")[0] + assert v.value.val == b"200" + with pytest.raises(language.ParseException): + e.parseString("s'foo'") + + v = e.parseString('s"200:b@1"')[0] + assert "@1" in v.spec() + f = v.freeze({}) + assert "@1" not in f.spec() + + +def test_nested_response_freeze(): + e = http.NestedResponse( + base.TokValueLiteral( + r"200:b\'foo\':i10,\'\\x27\'" + ) + ) + assert e.freeze({}) + assert e.values({}) + + +def test_unique_components(): + with pytest.raises(Exception, match="multiple body clauses"): + language.parse_pathod("400:b@1:b@1") |