aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_rparse.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2012-04-28 12:42:03 +1200
committerAldo Cortesi <aldo@nullcube.com>2012-04-28 12:42:03 +1200
commitb4105be21e967f79d819749c44eff6ed4311f65d (patch)
tree723857cc38b59c5ebd35ab6c5b32d72e3a05c9a4 /test/test_rparse.py
downloadmitmproxy-b4105be21e967f79d819749c44eff6ed4311f65d.tar.gz
mitmproxy-b4105be21e967f79d819749c44eff6ed4311f65d.tar.bz2
mitmproxy-b4105be21e967f79d819749c44eff6ed4311f65d.zip
Initial checkin.
Diffstat (limited to 'test/test_rparse.py')
-rw-r--r--test/test_rparse.py262
1 files changed, 262 insertions, 0 deletions
diff --git a/test/test_rparse.py b/test/test_rparse.py
new file mode 100644
index 00000000..b872b1f3
--- /dev/null
+++ b/test/test_rparse.py
@@ -0,0 +1,262 @@
+import StringIO, sys
+import libpry
+from libpathod import rparse
+
+rparse.TESTING = True
+
+class uMisc(libpry.AutoTree):
+ def test_generators(self):
+ v = rparse.Value.parseString("val")[0]
+ g = v.get_generator({})
+ assert g[:] == "val"
+
+ def test_randomgenerator(self):
+ g = rparse.RandomGenerator("one", 100)
+ assert len(g[:10]) == 10
+ assert len(g[1:10]) == 9
+ assert len(g[:1000]) == 100
+ assert len(g[1000:1001]) == 0
+ assert g[0]
+
+ def test_literalgenerator(self):
+ g = rparse.LiteralGenerator("one")
+ assert g == "one"
+ assert g[:] == "one"
+ assert g[1] == "n"
+
+ def test_valueliteral(self):
+ v = rparse.ValueLiteral("foo")
+ assert v.expr()
+ assert str(v)
+
+ def test_generated_value(self):
+ v = rparse.Value.parseString("!10b")[0]
+ assert v.usize == 10
+ assert v.unit == "b"
+ assert v.bytes() == 10
+ v = rparse.Value.parseString("!10")[0]
+ assert v.unit == "b"
+ v = rparse.Value.parseString("!10k")[0]
+ assert v.bytes() == 10240
+ v = rparse.Value.parseString("!10g")[0]
+ assert v.bytes() == 1024**3 * 10
+
+ v = rparse.Value.parseString("!10g:digits")[0]
+ assert v.datatype == "digits"
+ g = v.get_generator({})
+ assert g[:100]
+
+ v = rparse.Value.parseString("!10:digits")[0]
+ assert v.unit == "b"
+ assert v.datatype == "digits"
+
+ def test_value(self):
+ assert rparse.Value.parseString("val")[0].val == "val"
+ assert rparse.Value.parseString("'val'")[0].val == "val"
+ assert rparse.Value.parseString('"val"')[0].val == "val"
+ assert rparse.Value.parseString('"\'val\'"')[0].val == "'val'"
+
+ v = rparse.Value.parseString("<path")[0]
+ assert v.path == "path"
+ v = rparse.Value.parseString("<'one two'")[0]
+ assert v.path == "one two"
+
+ def test_body(self):
+ e = rparse.Body.expr()
+ v = e.parseString("b:foo")[0]
+ assert v.value.val == "foo"
+
+ v = e.parseString("b:!100")[0]
+ assert str(v.value) == "!100b:bytes"
+
+ v = e.parseString("b:!100g:digits", parseAll=True)[0]
+ assert v.value.datatype == "digits"
+ assert str(v.value) == "!100g:digits"
+
+ def test_header(self):
+ e = rparse.Header.expr()
+ v = e.parseString("h:foo:bar")[0]
+ assert v.key.val == "foo"
+ assert v.value.val == "bar"
+
+ def test_code(self):
+ e = rparse.Code.expr()
+ v = e.parseString("200")[0]
+ assert v.code == 200
+
+ v = e.parseString("404:msg")[0]
+ assert v.code == 404
+ assert v.msg.val == "msg"
+
+ r = e.parseString("200:'foo'")[0]
+ assert r.msg.val == "foo"
+
+ r = e.parseString("200:'\"foo\"'")[0]
+ assert r.msg.val == "\"foo\""
+
+ r = e.parseString('200:"foo"')[0]
+ assert r.msg.val == "foo"
+
+ r = e.parseString('404')[0]
+ assert r.msg.val == "Not Found"
+
+ r = e.parseString('10')[0]
+ assert r.msg.val == "Unknown code"
+
+ def test_stub_response(self):
+ s = rparse.StubResponse(400, "foo")
+
+
+class uDisconnects(libpry.AutoTree):
+ def test_before(self):
+ e = rparse.DisconnectBefore.expr()
+ v = e.parseString("db")[0]
+ assert isinstance(v, rparse.DisconnectBefore)
+
+ v = e.parseString("db")[0]
+ assert isinstance(v, rparse.DisconnectBefore)
+
+ def test_random(self):
+ e = rparse.DisconnectRandom.expr()
+ v = e.parseString("dr")[0]
+ assert isinstance(v, rparse.DisconnectRandom)
+
+ v = e.parseString("dr")[0]
+ assert isinstance(v, rparse.DisconnectRandom)
+
+
+class uPauses(libpry.AutoTree):
+ def test_before(self):
+ e = rparse.PauseBefore.expr()
+ v = e.parseString("pb:10")[0]
+ assert v.value == 10
+
+ v = e.parseString("pb:forever")[0]
+ assert v.value == "forever"
+
+ def test_after(self):
+ e = rparse.PauseAfter.expr()
+ v = e.parseString("pa:10")[0]
+ assert v.value == 10
+
+ def test_random(self):
+ e = rparse.PauseRandom.expr()
+ v = e.parseString("pr:10")[0]
+ assert v.value == 10
+
+
+class uparse(libpry.AutoTree):
+ def test_parse_err(self):
+ libpry.raises(rparse.ParseException, rparse.parse, {}, "400:msg,b:")
+
+ def test_parse_header(self):
+ r = rparse.parse({}, "400,h:foo:bar")
+ assert r.get_header("foo") == "bar"
+
+ def test_parse_pause_before(self):
+ r = rparse.parse({}, "400,pb:10")
+ assert (0, 10) in r.pauses
+
+ def test_parse_pause_after(self):
+ r = rparse.parse({}, "400,pa:10")
+ assert (sys.maxint, 10) in r.pauses
+
+ def test_parse_pause_random(self):
+ r = rparse.parse({}, "400,pr:10")
+ assert ("random", 10) in r.pauses
+
+
+class DummyRequest(StringIO.StringIO):
+ def write(self, d, callback=None):
+ StringIO.StringIO.write(self, d)
+ if callback:
+ callback()
+
+ def finish(self):
+ return
+
+
+class uResponse(libpry.AutoTree):
+ def dummy_response(self):
+ return rparse.parse({}, "400:msg")
+
+ def test_response(self):
+ r = rparse.parse({}, "400:msg")
+ assert r.code == 400
+ assert r.msg == "msg"
+
+ r = rparse.parse({}, "400:msg,b:!100b")
+ assert r.msg == "msg"
+ assert r.body[:]
+ assert str(r)
+
+ def test_ready_randoms(self):
+ r = rparse.parse({}, "400:msg")
+
+ x = [(0, 5)]
+ assert r.ready_randoms(100, x) == x
+
+ x = [("random", 5)]
+ ret = r.ready_randoms(100, x)
+ assert 0 <= ret[0][0] < 100
+
+ x = [(1, 5), (0, 5)]
+ assert r.ready_randoms(100, x) == sorted(x)
+
+ def test_write_values_disconnects(self):
+ r = self.dummy_response()
+ s = DummyRequest()
+ tst = "foo"*100
+ r.write_values(s, [tst], [], "before", blocksize=5)
+ assert not s.getvalue()
+
+ def test_write_values(self):
+ tst = "foo"*1025
+ r = rparse.parse({}, "400:msg")
+
+ s = DummyRequest()
+ r.write_values(s, [tst], [], None)
+ assert s.getvalue() == tst
+
+ def test_write_values_pauses(self):
+ tst = "".join(str(i) for i in range(10))
+ r = rparse.parse({}, "400:msg")
+
+ for i in range(2, 10):
+ s = DummyRequest()
+ r.write_values(s, [tst], [(2, 0), (1, 0)], None, blocksize=i)
+ assert s.getvalue() == tst
+
+ for i in range(2, 10):
+ s = DummyRequest()
+ r.write_values(s, [tst], [(1, 0)], None, blocksize=i)
+ assert s.getvalue() == tst
+
+ tst = ["".join(str(i) for i in range(10))]*5
+ for i in range(2, 10):
+ s = DummyRequest()
+ r.write_values(s, tst[:], [(1, 0)], None, blocksize=i)
+ assert s.getvalue() == "".join(tst)
+
+ def test_render(self):
+ s = DummyRequest()
+ r = rparse.parse({}, "400:msg")
+ r.render(s)
+
+ def test_length(self):
+ def testlen(x):
+ s = DummyRequest()
+ x.render(s)
+ assert x.length() == len(s.getvalue())
+ testlen(rparse.parse({}, "400:msg"))
+ testlen(rparse.parse({}, "400:msg,h:foo:bar"))
+ testlen(rparse.parse({}, "400:msg,h:foo:bar,b:!100b"))
+
+
+tests = [
+ uResponse(),
+ uPauses(),
+ uDisconnects(),
+ uMisc(),
+ uparse()
+]