diff options
-rw-r--r-- | netlib/http.py (renamed from netlib/protocol.py) | 12 | ||||
-rw-r--r-- | test/test_http.py (renamed from test/test_protocol.py) | 86 |
2 files changed, 49 insertions, 49 deletions
diff --git a/netlib/protocol.py b/netlib/http.py index 55bcf440..c676c25c 100644 --- a/netlib/protocol.py +++ b/netlib/http.py @@ -1,11 +1,11 @@ import string, urlparse -class ProtocolError(Exception): +class HttpError(Exception): def __init__(self, code, msg): self.code, self.msg = code, msg def __str__(self): - return "ProtocolError(%s, %s)"%(self.code, self.msg) + return "HttpError(%s, %s)"%(self.code, self.msg) def parse_url(url): @@ -71,14 +71,14 @@ def read_chunked(fp, limit): except ValueError: # FIXME: Not strictly correct - this could be from the server, in which # case we should send a 502. - raise ProtocolError(400, "Invalid chunked encoding length: %s"%line) + raise HttpError(400, "Invalid chunked encoding length: %s"%line) if not length: break total += length if limit is not None and total > limit: msg = "HTTP Body too large."\ " Limit is %s, chunked content length was at least %s"%(limit, total) - raise ProtocolError(509, msg) + raise HttpError(509, msg) content += fp.read(length) line = fp.readline(5) if line != '\r\n': @@ -109,9 +109,9 @@ def read_http_body(rfile, headers, all, limit): except ValueError: # FIXME: Not strictly correct - this could be from the server, in which # case we should send a 502. - raise ProtocolError(400, "Invalid content-length header: %s"%headers["content-length"]) + raise HttpError(400, "Invalid content-length header: %s"%headers["content-length"]) if limit is not None and l > limit: - raise ProtocolError(509, "HTTP Body too large. Limit is %s, content-length was %s"%(limit, l)) + raise HttpError(509, "HTTP Body too large. Limit is %s, content-length was %s"%(limit, l)) content = rfile.read(l) elif all: content = rfile.read(limit if limit else None) diff --git a/test/test_protocol.py b/test/test_http.py index 028faadd..d272f343 100644 --- a/test/test_protocol.py +++ b/test/test_http.py @@ -1,78 +1,78 @@ import cStringIO, textwrap -from netlib import protocol, odict +from netlib import http, odict import tutils def test_has_chunked_encoding(): h = odict.ODictCaseless() - assert not protocol.has_chunked_encoding(h) + assert not http.has_chunked_encoding(h) h["transfer-encoding"] = ["chunked"] - assert protocol.has_chunked_encoding(h) + assert http.has_chunked_encoding(h) def test_read_chunked(): s = cStringIO.StringIO("1\r\na\r\n0\r\n") - tutils.raises(IOError, protocol.read_chunked, s, None) + tutils.raises(IOError, http.read_chunked, s, None) s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") - assert protocol.read_chunked(s, None) == "a" + assert http.read_chunked(s, None) == "a" s = cStringIO.StringIO("\r\n") - tutils.raises(IOError, protocol.read_chunked, s, None) + tutils.raises(IOError, http.read_chunked, s, None) s = cStringIO.StringIO("1\r\nfoo") - tutils.raises(IOError, protocol.read_chunked, s, None) + tutils.raises(IOError, http.read_chunked, s, None) s = cStringIO.StringIO("foo\r\nfoo") - tutils.raises(protocol.ProtocolError, protocol.read_chunked, s, None) + tutils.raises(http.HttpError, http.read_chunked, s, None) def test_request_connection_close(): h = odict.ODictCaseless() - assert protocol.request_connection_close((1, 0), h) - assert not protocol.request_connection_close((1, 1), h) + assert http.request_connection_close((1, 0), h) + assert not http.request_connection_close((1, 1), h) h["connection"] = ["keep-alive"] - assert not protocol.request_connection_close((1, 1), h) + assert not http.request_connection_close((1, 1), h) def test_read_http_body(): h = odict.ODict() s = cStringIO.StringIO("testing") - assert protocol.read_http_body(s, h, False, None) == "" + assert http.read_http_body(s, h, False, None) == "" h["content-length"] = ["foo"] s = cStringIO.StringIO("testing") - tutils.raises(protocol.ProtocolError, protocol.read_http_body, s, h, False, None) + tutils.raises(http.HttpError, http.read_http_body, s, h, False, None) h["content-length"] = [5] s = cStringIO.StringIO("testing") - assert len(protocol.read_http_body(s, h, False, None)) == 5 + assert len(http.read_http_body(s, h, False, None)) == 5 s = cStringIO.StringIO("testing") - tutils.raises(protocol.ProtocolError, protocol.read_http_body, s, h, False, 4) + tutils.raises(http.HttpError, http.read_http_body, s, h, False, 4) h = odict.ODict() s = cStringIO.StringIO("testing") - assert len(protocol.read_http_body(s, h, True, 4)) == 4 + assert len(http.read_http_body(s, h, True, 4)) == 4 s = cStringIO.StringIO("testing") - assert len(protocol.read_http_body(s, h, True, 100)) == 7 + assert len(http.read_http_body(s, h, True, 100)) == 7 def test_parse_http_protocol(): - assert protocol.parse_http_protocol("HTTP/1.1") == (1, 1) - assert protocol.parse_http_protocol("HTTP/0.0") == (0, 0) - assert not protocol.parse_http_protocol("foo/0.0") + assert http.parse_http_protocol("HTTP/1.1") == (1, 1) + assert http.parse_http_protocol("HTTP/0.0") == (0, 0) + assert not http.parse_http_protocol("foo/0.0") def test_parse_init_connect(): - assert protocol.parse_init_connect("CONNECT host.com:443 HTTP/1.0") - assert not protocol.parse_init_connect("bogus") - assert not protocol.parse_init_connect("GET host.com:443 HTTP/1.0") - assert not protocol.parse_init_connect("CONNECT host.com443 HTTP/1.0") - assert not protocol.parse_init_connect("CONNECT host.com:443 foo/1.0") + assert http.parse_init_connect("CONNECT host.com:443 HTTP/1.0") + assert not http.parse_init_connect("bogus") + assert not http.parse_init_connect("GET host.com:443 HTTP/1.0") + assert not http.parse_init_connect("CONNECT host.com443 HTTP/1.0") + assert not http.parse_init_connect("CONNECT host.com:443 foo/1.0") def test_prase_init_proxy(): u = "GET http://foo.com:8888/test HTTP/1.1" - m, s, h, po, pa, httpversion = protocol.parse_init_proxy(u) + m, s, h, po, pa, httpversion = http.parse_init_proxy(u) assert m == "GET" assert s == "http" assert h == "foo.com" @@ -80,21 +80,21 @@ def test_prase_init_proxy(): assert pa == "/test" assert httpversion == (1, 1) - assert not protocol.parse_init_proxy("invalid") - assert not protocol.parse_init_proxy("GET invalid HTTP/1.1") - assert not protocol.parse_init_proxy("GET http://foo.com:8888/test foo/1.1") + assert not http.parse_init_proxy("invalid") + assert not http.parse_init_proxy("GET invalid HTTP/1.1") + assert not http.parse_init_proxy("GET http://foo.com:8888/test foo/1.1") def test_parse_init_http(): u = "GET /test HTTP/1.1" - m, u, httpversion= protocol.parse_init_http(u) + m, u, httpversion= http.parse_init_http(u) assert m == "GET" assert u == "/test" assert httpversion == (1, 1) - assert not protocol.parse_init_http("invalid") - assert not protocol.parse_init_http("GET invalid HTTP/1.1") - assert not protocol.parse_init_http("GET /test foo/1.1") + assert not http.parse_init_http("invalid") + assert not http.parse_init_http("GET invalid HTTP/1.1") + assert not http.parse_init_http("GET /test foo/1.1") class TestReadHeaders: @@ -107,7 +107,7 @@ class TestReadHeaders: data = textwrap.dedent(data) data = data.strip() s = cStringIO.StringIO(data) - h = protocol.read_headers(s) + h = http.read_headers(s) assert h == [["Header", "one"], ["Header2", "two"]] def test_read_multi(self): @@ -119,7 +119,7 @@ class TestReadHeaders: data = textwrap.dedent(data) data = data.strip() s = cStringIO.StringIO(data) - h = protocol.read_headers(s) + h = http.read_headers(s) assert h == [["Header", "one"], ["Header", "two"]] def test_read_continued(self): @@ -132,32 +132,32 @@ class TestReadHeaders: data = textwrap.dedent(data) data = data.strip() s = cStringIO.StringIO(data) - h = protocol.read_headers(s) + h = http.read_headers(s) assert h == [["Header", "one\r\n two"], ["Header2", "three"]] def test_parse_url(): - assert not protocol.parse_url("") + assert not http.parse_url("") u = "http://foo.com:8888/test" - s, h, po, pa = protocol.parse_url(u) + s, h, po, pa = http.parse_url(u) assert s == "http" assert h == "foo.com" assert po == 8888 assert pa == "/test" - s, h, po, pa = protocol.parse_url("http://foo/bar") + s, h, po, pa = http.parse_url("http://foo/bar") assert s == "http" assert h == "foo" assert po == 80 assert pa == "/bar" - s, h, po, pa = protocol.parse_url("http://foo") + s, h, po, pa = http.parse_url("http://foo") assert pa == "/" - s, h, po, pa = protocol.parse_url("https://foo") + s, h, po, pa = http.parse_url("https://foo") assert po == 443 - assert not protocol.parse_url("https://foo:bar") - assert not protocol.parse_url("https://foo:") + assert not http.parse_url("https://foo:bar") + assert not http.parse_url("https://foo:") |