aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_language.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2015-05-02 16:17:00 +1200
committerAldo Cortesi <aldo@nullcube.com>2015-05-02 16:17:00 +1200
commit9109b3cc8cca05f34d3ddee062cf7f8bc401af31 (patch)
tree2d1f2f4ed115a9bc7c26eb87e697b34fe860368f /test/test_language.py
parent601cdf70c7339a59537cc8402e4f2648f398b28d (diff)
downloadmitmproxy-9109b3cc8cca05f34d3ddee062cf7f8bc401af31.tar.gz
mitmproxy-9109b3cc8cca05f34d3ddee062cf7f8bc401af31.tar.bz2
mitmproxy-9109b3cc8cca05f34d3ddee062cf7f8bc401af31.zip
Massive refactoring to split up language implementation.
Diffstat (limited to 'test/test_language.py')
-rw-r--r--test/test_language.py219
1 files changed, 108 insertions, 111 deletions
diff --git a/test/test_language.py b/test/test_language.py
index c0eafcaa..514c2180 100644
--- a/test/test_language.py
+++ b/test/test_language.py
@@ -1,63 +1,60 @@
import os
import cStringIO
-from libpathod import language, utils
+from libpathod import language
+from libpathod.language import generators, base, http, websockets, writer, exceptions
import tutils
language.TESTING = True
-def test_quote():
- assert language.quote("'\\\\'")
-
-
def parse_request(s):
return language.parse_requests(s)[0]
class TestWS:
def test_expr(self):
- v = language.WS("foo")
+ v = base.WS("foo")
assert v.expr()
assert v.values(language.Settings())
class TestValueNakedLiteral:
def test_expr(self):
- v = language.ValueNakedLiteral("foo")
+ v = base.ValueNakedLiteral("foo")
assert v.expr()
def test_spec(self):
- v = language.ValueNakedLiteral("foo")
+ v = base.ValueNakedLiteral("foo")
assert v.spec() == repr(v) == "foo"
- v = language.ValueNakedLiteral("f\x00oo")
+ v = base.ValueNakedLiteral("f\x00oo")
assert v.spec() == repr(v) == r"f\x00oo"
class TestValueLiteral:
def test_espr(self):
- v = language.ValueLiteral("foo")
+ v = base.ValueLiteral("foo")
assert v.expr()
assert v.val == "foo"
- v = language.ValueLiteral("foo\n")
+ v = base.ValueLiteral("foo\n")
assert v.expr()
assert v.val == "foo\n"
assert repr(v)
def test_spec(self):
- v = language.ValueLiteral("foo")
+ v = base.ValueLiteral("foo")
assert v.spec() == r"'foo'"
- v = language.ValueLiteral("f\x00oo")
+ v = base.ValueLiteral("f\x00oo")
assert v.spec() == repr(v) == r"'f\x00oo'"
- v = language.ValueLiteral("\"")
+ v = base.ValueLiteral("\"")
assert v.spec() == repr(v) == '\'"\''
def roundtrip(self, spec):
- e = language.ValueLiteral.expr()
- v = language.ValueLiteral(spec)
+ e = base.ValueLiteral.expr()
+ v = base.ValueLiteral(spec)
v2 = e.parseString(v.spec())
assert v.val == v2[0].val
assert v.spec() == v2[0].spec()
@@ -73,56 +70,56 @@ class TestValueLiteral:
class TestValueGenerate:
def test_basic(self):
- v = language.Value.parseString("@10b")[0]
+ v = base.Value.parseString("@10b")[0]
assert v.usize == 10
assert v.unit == "b"
assert v.bytes() == 10
- v = language.Value.parseString("@10")[0]
+ v = base.Value.parseString("@10")[0]
assert v.unit == "b"
- v = language.Value.parseString("@10k")[0]
+ v = base.Value.parseString("@10k")[0]
assert v.bytes() == 10240
- v = language.Value.parseString("@10g")[0]
+ v = base.Value.parseString("@10g")[0]
assert v.bytes() == 1024**3 * 10
- v = language.Value.parseString("@10g,digits")[0]
+ v = base.Value.parseString("@10g,digits")[0]
assert v.datatype == "digits"
g = v.get_generator({})
assert g[:100]
- v = language.Value.parseString("@10,digits")[0]
+ v = base.Value.parseString("@10,digits")[0]
assert v.unit == "b"
assert v.datatype == "digits"
def test_spec(self):
- v = language.ValueGenerate(1, "b", "bytes")
+ v = base.ValueGenerate(1, "b", "bytes")
assert v.spec() == repr(v) == "@1"
- v = language.ValueGenerate(1, "k", "bytes")
+ v = base.ValueGenerate(1, "k", "bytes")
assert v.spec() == repr(v) == "@1k"
- v = language.ValueGenerate(1, "k", "ascii")
+ v = base.ValueGenerate(1, "k", "ascii")
assert v.spec() == repr(v) == "@1k,ascii"
- v = language.ValueGenerate(1, "b", "ascii")
+ v = base.ValueGenerate(1, "b", "ascii")
assert v.spec() == repr(v) == "@1,ascii"
def test_freeze(self):
- v = language.ValueGenerate(100, "b", "ascii")
+ v = base.ValueGenerate(100, "b", "ascii")
f = v.freeze(language.Settings())
assert len(f.val) == 100
class TestValueFile:
def test_file_value(self):
- v = language.Value.parseString("<'one two'")[0]
+ v = base.Value.parseString("<'one two'")[0]
assert str(v)
assert v.path == "one two"
- v = language.Value.parseString("<path")[0]
+ v = base.Value.parseString("<path")[0]
assert v.path == "path"
def test_access_control(self):
- v = language.Value.parseString("<path")[0]
+ v = base.Value.parseString("<path")[0]
with tutils.tmpdir() as t:
p = os.path.join(t, "path")
with open(p, "wb") as f:
@@ -130,9 +127,9 @@ class TestValueFile:
assert v.get_generator(language.Settings(staticdir=t))
- v = language.Value.parseString("<path2")[0]
+ v = base.Value.parseString("<path2")[0]
tutils.raises(
- language.FileAccessDenied,
+ exceptions.FileAccessDenied,
v.get_generator,
language.Settings(staticdir=t)
)
@@ -142,7 +139,7 @@ class TestValueFile:
language.Settings()
)
- v = language.Value.parseString("</outside")[0]
+ v = base.Value.parseString("</outside")[0]
tutils.raises(
"outside",
v.get_generator,
@@ -150,24 +147,24 @@ class TestValueFile:
)
def test_spec(self):
- v = language.Value.parseString("<'one two'")[0]
- v2 = language.Value.parseString(v.spec())[0]
+ v = base.Value.parseString("<'one two'")[0]
+ v2 = base.Value.parseString(v.spec())[0]
assert v2.path == "one two"
def test_freeze(self):
- v = language.Value.parseString("<'one two'")[0]
+ v = base.Value.parseString("<'one two'")[0]
v2 = v.freeze({})
assert v2.path == v.path
class TestMisc:
def test_generators(self):
- v = language.Value.parseString("'val'")[0]
+ v = base.Value.parseString("'val'")[0]
g = v.get_generator({})
assert g[:] == "val"
def test_randomgenerator(self):
- g = language.RandomGenerator("bytes", 100)
+ g = generators.RandomGenerator("bytes", 100)
assert repr(g)
assert len(g[:10]) == 10
assert len(g[1:10]) == 9
@@ -176,7 +173,7 @@ class TestMisc:
assert g[0]
def test_literalgenerator(self):
- g = language.LiteralGenerator("one")
+ g = generators.LiteralGenerator("one")
assert repr(g)
assert g[:] == "one"
assert g[1] == "n"
@@ -187,7 +184,7 @@ class TestMisc:
f = open(path, "wb")
f.write("x"*10000)
f.close()
- g = language.FileGenerator(path)
+ g = generators.FileGenerator(path)
assert len(g) == 10000
assert g[0] == "x"
assert g[-1] == "x"
@@ -196,15 +193,15 @@ class TestMisc:
del g # remove all references to FileGenerator instance to close the file handle.
def test_value(self):
- assert language.Value.parseString("'val'")[0].val == "val"
- assert language.Value.parseString('"val"')[0].val == "val"
- assert language.Value.parseString('"\'val\'"')[0].val == "'val'"
+ assert base.Value.parseString("'val'")[0].val == "val"
+ assert base.Value.parseString('"val"')[0].val == "val"
+ assert base.Value.parseString('"\'val\'"')[0].val == "'val'"
def test_path(self):
- e = language.Path.expr()
+ e = base.Path.expr()
assert e.parseString('"/foo"')[0].value.val == "/foo"
- v = language.Path("/foo")
+ v = base.Path("/foo")
assert v.value.val == "/foo"
v = e.parseString("@100")[0]
@@ -217,7 +214,7 @@ class TestMisc:
assert s == v.expr().parseString(s)[0].spec()
def test_method(self):
- e = language.Method.expr()
+ e = base.Method.expr()
assert e.parseString("get")[0].value.val == "GET"
assert e.parseString("'foo'")[0].value.val == "foo"
assert e.parseString("'get'")[0].value.val == "get"
@@ -237,13 +234,13 @@ class TestMisc:
assert v2.value.val == v3.value.val
def test_raw(self):
- e = language.Raw.expr().parseString("r")[0]
+ e = base.Raw.expr().parseString("r")[0]
assert e
assert e.spec() == "r"
assert e.freeze({}).spec() == "r"
def test_body(self):
- e = language.Body.expr()
+ e = base.Body.expr()
v = e.parseString("b'foo'")[0]
assert v.value.val == "foo"
@@ -261,7 +258,7 @@ class TestMisc:
assert s == e.parseString(s)[0].spec()
def test_pathodspec(self):
- e = language.PathodSpec.expr()
+ e = base.PathodSpec.expr()
v = e.parseString("s'200'")[0]
assert v.value.val == "200"
tutils.raises(
@@ -276,8 +273,8 @@ class TestMisc:
assert "@1" not in f.spec()
def test_pathodspec_freeze(self):
- e = language.PathodSpec(
- language.ValueLiteral(
+ e = base.PathodSpec(
+ base.ValueLiteral(
"200:b'foo':i10,'\\''".encode(
"string_escape"
)
@@ -287,7 +284,7 @@ class TestMisc:
assert e.values({})
def test_code(self):
- e = language.Code.expr()
+ e = base.Code.expr()
v = e.parseString("200")[0]
assert v.string() == "200"
assert v.spec() == "200"
@@ -295,7 +292,7 @@ class TestMisc:
assert v.freeze({}).code == v.code
def test_reason(self):
- e = language.Reason.expr()
+ e = base.Reason.expr()
v = e.parseString("m'msg'")[0]
assert v.value.val == "msg"
@@ -309,13 +306,13 @@ class TestMisc:
def test_internal_response(self):
d = cStringIO.StringIO()
- s = language.make_error_response("foo")
+ s = http.make_error_response("foo")
language.serve(s, d, {})
class TestHeaders:
def test_header(self):
- e = language.Header.expr()
+ e = base.Header.expr()
v = e.parseString("h'foo'='bar'")[0]
assert v.key.val == "foo"
assert v.value.val == "bar"
@@ -328,7 +325,7 @@ class TestHeaders:
assert s == e.parseString(s)[0].spec()
def test_header_freeze(self):
- e = language.Header.expr()
+ e = base.Header.expr()
v = e.parseString("h@10=@10'")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
@@ -336,7 +333,7 @@ class TestHeaders:
assert v2.value.val == v3.value.val
def test_ctype_shortcut(self):
- e = language.ShortcutContentType.expr()
+ e = base.ShortcutContentType.expr()
v = e.parseString("c'foo'")[0]
assert v.key.val == "Content-Type"
assert v.value.val == "foo"
@@ -344,14 +341,14 @@ class TestHeaders:
s = v.spec()
assert s == e.parseString(s)[0].spec()
- e = language.ShortcutContentType.expr()
+ e = base.ShortcutContentType.expr()
v = e.parseString("c@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.value.val == v3.value.val
def test_location_shortcut(self):
- e = language.ShortcutLocation.expr()
+ e = base.ShortcutLocation.expr()
v = e.parseString("l'foo'")[0]
assert v.key.val == "Location"
assert v.value.val == "foo"
@@ -359,7 +356,7 @@ class TestHeaders:
s = v.spec()
assert s == e.parseString(s)[0].spec()
- e = language.ShortcutLocation.expr()
+ e = base.ShortcutLocation.expr()
v = e.parseString("l@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
@@ -375,7 +372,7 @@ class TestHeaders:
class TestShortcutUserAgent:
def test_location_shortcut(self):
- e = language.ShortcutUserAgent.expr()
+ e = base.ShortcutUserAgent.expr()
v = e.parseString("ua")[0]
assert "Android" in str(v.value)
assert v.spec() == "ua"
@@ -394,9 +391,9 @@ class TestShortcutUserAgent:
class Test_Action:
def test_cmp(self):
- a = language.DisconnectAt(0)
- b = language.DisconnectAt(1)
- c = language.DisconnectAt(0)
+ a = base.DisconnectAt(0)
+ b = base.DisconnectAt(1)
+ c = base.DisconnectAt(0)
assert a < b
assert a == c
l = [b, a]
@@ -405,16 +402,16 @@ class Test_Action:
def test_resolve(self):
r = parse_request('GET:"/foo"')
- e = language.DisconnectAt("r")
+ e = base.DisconnectAt("r")
ret = e.resolve({}, r)
assert isinstance(ret.offset, int)
def test_repr(self):
- e = language.DisconnectAt("r")
+ e = base.DisconnectAt("r")
assert repr(e)
def test_freeze(self):
- l = language.DisconnectAt(5)
+ l = base.DisconnectAt(5)
assert l.freeze({}).spec() == l.spec()
@@ -426,21 +423,21 @@ class TestDisconnects:
assert a.spec() == "dr"
def test_at(self):
- e = language.DisconnectAt.expr()
+ e = base.DisconnectAt.expr()
v = e.parseString("d0")[0]
- assert isinstance(v, language.DisconnectAt)
+ assert isinstance(v, base.DisconnectAt)
assert v.offset == 0
v = e.parseString("d100")[0]
assert v.offset == 100
- e = language.DisconnectAt.expr()
+ e = base.DisconnectAt.expr()
v = e.parseString("dr")[0]
assert v.offset == "r"
def test_spec(self):
- assert language.DisconnectAt("r").spec() == "dr"
- assert language.DisconnectAt(10).spec() == "d10"
+ assert base.DisconnectAt("r").spec() == "dr"
+ assert base.DisconnectAt(10).spec() == "d10"
class TestInject:
@@ -454,11 +451,11 @@ class TestInject:
assert a.offset == "a"
def test_at(self):
- e = language.InjectAt.expr()
+ e = base.InjectAt.expr()
v = e.parseString("i0,'foo'")[0]
assert v.value.val == "foo"
assert v.offset == 0
- assert isinstance(v, language.InjectAt)
+ assert isinstance(v, base.InjectAt)
v = e.parseString("ir,'foo'")[0]
assert v.offset == "r"
@@ -469,12 +466,12 @@ class TestInject:
assert language.serve(r, s, {})
def test_spec(self):
- e = language.InjectAt.expr()
+ e = base.InjectAt.expr()
v = e.parseString("i0,'foo'")[0]
assert v.spec() == 'i0,"foo"'
def test_spec(self):
- e = language.InjectAt.expr()
+ e = base.InjectAt.expr()
v = e.parseString("i0,@100")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
@@ -483,7 +480,7 @@ class TestInject:
class TestPauses:
def test_parse_response(self):
- e = language.PauseAt.expr()
+ e = base.PauseAt.expr()
v = e.parseString("p10,10")[0]
assert v.seconds == 10
assert v.offset == 10
@@ -502,12 +499,12 @@ class TestPauses:
assert r.actions[0].spec() == "p10,10"
def test_spec(self):
- assert language.PauseAt("r", 5).spec() == "pr,5"
- assert language.PauseAt(0, 5).spec() == "p0,5"
- assert language.PauseAt(0, "f").spec() == "p0,f"
+ assert base.PauseAt("r", 5).spec() == "pr,5"
+ assert base.PauseAt(0, 5).spec() == "p0,5"
+ assert base.PauseAt(0, "f").spec() == "p0,f"
def test_freeze(self):
- l = language.PauseAt("r", 5)
+ l = base.PauseAt("r", 5)
assert l.freeze({}).spec() == l.spec()
@@ -567,7 +564,7 @@ class TestRequest:
r = language.parse_requests(l)
assert len(r) == 1
assert len(r[0].tokens) == 3
- assert isinstance(r[0].tokens[2], language.PathodSpec)
+ assert isinstance(r[0].tokens[2], base.PathodSpec)
assert r[0].values({})
def test_render(self):
@@ -625,21 +622,21 @@ class TestRequest:
r = parse_request('ws:/path/')
res = r.resolve(language.Settings())
assert res.method.string().lower() == "get"
- assert res.tok(language.Path).value.val == "/path/"
- assert res.tok(language.Method).value.val.lower() == "get"
- assert utils.get_header("Upgrade", res.headers).value.val == "websocket"
+ assert res.tok(base.Path).value.val == "/path/"
+ assert res.tok(base.Method).value.val.lower() == "get"
+ assert http.get_header("Upgrade", res.headers).value.val == "websocket"
r = parse_request('ws:put:/path/')
res = r.resolve(language.Settings())
assert r.method.string().lower() == "put"
- assert res.tok(language.Path).value.val == "/path/"
- assert res.tok(language.Method).value.val.lower() == "put"
- assert utils.get_header("Upgrade", res.headers).value.val == "websocket"
+ assert res.tok(base.Path).value.val == "/path/"
+ assert res.tok(base.Method).value.val.lower() == "put"
+ assert http.get_header("Upgrade", res.headers).value.val == "websocket"
class TestWebsocketFrame:
def test_spec(self):
- e = language.WebsocketFrame.expr()
+ e = websockets.WebsocketFrame.expr()
wf = e.parseString("wf:b'foo'")
assert wf
@@ -656,45 +653,45 @@ class TestWriteValues:
v = "foobarfoobar"
for bs in range(1, len(v) + 2):
s = cStringIO.StringIO()
- language.send_chunk(s, v, bs, 0, len(v))
+ writer.send_chunk(s, v, bs, 0, len(v))
assert s.getvalue() == v
for start in range(len(v)):
for end in range(len(v)):
s = cStringIO.StringIO()
- language.send_chunk(s, v, bs, start, end)
+ writer.send_chunk(s, v, bs, start, end)
assert s.getvalue() == v[start:end]
def test_write_values_inject(self):
tst = "foo"
s = cStringIO.StringIO()
- language.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
+ writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "aaafoo"
s = cStringIO.StringIO()
- language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
+ writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "faaaoo"
s = cStringIO.StringIO()
- language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
+ writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "faaaoo"
def test_write_values_disconnects(self):
s = cStringIO.StringIO()
tst = "foo" * 100
- language.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
+ writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
assert not s.getvalue()
def test_write_values(self):
tst = "foobarvoing"
s = cStringIO.StringIO()
- language.write_values(s, [tst], [])
+ writer.write_values(s, [tst], [])
assert s.getvalue() == tst
for bs in range(1, len(tst) + 2):
for off in range(len(tst)):
s = cStringIO.StringIO()
- language.write_values(
+ writer.write_values(
s, [tst], [(off, "disconnect")], blocksize=bs
)
assert s.getvalue() == tst[:off]
@@ -703,20 +700,20 @@ class TestWriteValues:
tst = "".join(str(i) for i in range(10))
for i in range(2, 10):
s = cStringIO.StringIO()
- language.write_values(
+ writer.write_values(
s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
)
assert s.getvalue() == tst
for i in range(2, 10):
s = cStringIO.StringIO()
- language.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
+ writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == tst
tst = ["".join(str(i) for i in range(10))] * 5
for i in range(2, 10):
s = cStringIO.StringIO()
- language.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
+ writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == "".join(tst)
def test_write_values_after(self):
@@ -816,7 +813,7 @@ class TestResponse:
def test_parse_header(self):
r = language.parse_response('400:h"foo"="bar"')
- assert utils.get_header("foo", r.headers)
+ assert http.get_header("foo", r.headers)
def test_parse_pause_before(self):
r = language.parse_response("400:p0,10")
@@ -854,28 +851,28 @@ class TestResponse:
def test_read_file():
- tutils.raises(language.FileAccessDenied, language.read_file, {}, "=/foo")
+ tutils.raises(exceptions.FileAccessDenied, base.read_file, {}, "=/foo")
p = tutils.test_data.path("data")
d = dict(staticdir=p)
- assert language.read_file(d, "+./file").strip() == "testfile"
- assert language.read_file(d, "+file").strip() == "testfile"
+ assert base.read_file(d, "+./file").strip() == "testfile"
+ assert base.read_file(d, "+file").strip() == "testfile"
tutils.raises(
- language.FileAccessDenied,
- language.read_file,
+ exceptions.FileAccessDenied,
+ base.read_file,
d,
"+./nonexistent"
)
tutils.raises(
- language.FileAccessDenied,
- language.read_file,
+ exceptions.FileAccessDenied,
+ base.read_file,
d,
"+/nonexistent"
)
tutils.raises(
- language.FileAccessDenied,
- language.read_file,
+ exceptions.FileAccessDenied,
+ base.read_file,
d,
"+../test_language.py"
)
d["unconstrained_file_access"] = True
- assert language.read_file(d, "+../test_language.py")
+ assert base.read_file(d, "+../test_language.py")