aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/http1/test_protocol.py127
-rw-r--r--test/http/http2/test_protocol.py22
-rw-r--r--test/http/test_authentication.py44
-rw-r--r--test/http/test_exceptions.py20
-rw-r--r--test/http/test_semantics.py238
-rw-r--r--test/test_utils.py25
-rw-r--r--test/test_wsgi.py8
-rw-r--r--test/websockets/test_websockets.py12
8 files changed, 300 insertions, 196 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
index 6704647f..f7c615bd 100644
--- a/test/http/http1/test_protocol.py
+++ b/test/http/http1/test_protocol.py
@@ -2,7 +2,7 @@ import cStringIO
import textwrap
from netlib import http, odict, tcp, tutils
-from netlib.http import semantics
+from netlib.http import semantics, Headers
from netlib.http.http1 import HTTP1Protocol
from ... import tservers
@@ -29,164 +29,161 @@ def test_stripped_chunked_encoding_no_content():
"""
r = tutils.treq(content="")
- r.headers["Transfer-Encoding"] = ["chunked"]
+ r.headers["Transfer-Encoding"] = "chunked"
assert "Content-Length" in mock_protocol()._assemble_request_headers(r)
r = tutils.tresp(content="")
- r.headers["Transfer-Encoding"] = ["chunked"]
+ r.headers["Transfer-Encoding"] = "chunked"
assert "Content-Length" in mock_protocol()._assemble_response_headers(r)
def test_has_chunked_encoding():
- h = odict.ODictCaseless()
- assert not HTTP1Protocol.has_chunked_encoding(h)
- h["transfer-encoding"] = ["chunked"]
- assert HTTP1Protocol.has_chunked_encoding(h)
+ headers = http.Headers()
+ assert not HTTP1Protocol.has_chunked_encoding(headers)
+ headers["transfer-encoding"] = "chunked"
+ assert HTTP1Protocol.has_chunked_encoding(headers)
def test_read_chunked():
- h = odict.ODictCaseless()
- h["transfer-encoding"] = ["chunked"]
+ headers = http.Headers()
+ headers["transfer-encoding"] = "chunked"
data = "1\r\na\r\n0\r\n"
tutils.raises(
"malformed chunked body",
mock_protocol(data).read_http_body,
- h, None, "GET", None, True
+ headers, None, "GET", None, True
)
data = "1\r\na\r\n0\r\n\r\n"
- assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
+ assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a"
data = "\r\n\r\n1\r\na\r\n0\r\n\r\n"
- assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
+ assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a"
data = "\r\n"
tutils.raises(
"closed prematurely",
mock_protocol(data).read_http_body,
- h, None, "GET", None, True
+ headers, None, "GET", None, True
)
data = "1\r\nfoo"
tutils.raises(
"malformed chunked body",
mock_protocol(data).read_http_body,
- h, None, "GET", None, True
+ headers, None, "GET", None, True
)
data = "foo\r\nfoo"
tutils.raises(
http.HttpError,
mock_protocol(data).read_http_body,
- h, None, "GET", None, True
+ headers, None, "GET", None, True
)
data = "5\r\naaaaa\r\n0\r\n\r\n"
- tutils.raises("too large", mock_protocol(data).read_http_body, h, 2, "GET", None, True)
+ tutils.raises("too large", mock_protocol(data).read_http_body, headers, 2, "GET", None, True)
def test_connection_close():
- h = odict.ODictCaseless()
- assert HTTP1Protocol.connection_close((1, 0), h)
- assert not HTTP1Protocol.connection_close((1, 1), h)
+ headers = Headers()
+ assert HTTP1Protocol.connection_close((1, 0), headers)
+ assert not HTTP1Protocol.connection_close((1, 1), headers)
- h["connection"] = ["keep-alive"]
- assert not HTTP1Protocol.connection_close((1, 1), h)
+ headers["connection"] = "keep-alive"
+ assert not HTTP1Protocol.connection_close((1, 1), headers)
- h["connection"] = ["close"]
- assert HTTP1Protocol.connection_close((1, 1), h)
+ headers["connection"] = "close"
+ assert HTTP1Protocol.connection_close((1, 1), headers)
def test_read_http_body_request():
- h = odict.ODictCaseless()
+ headers = Headers()
data = "testing"
- assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == ""
+ assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == ""
def test_read_http_body_response():
- h = odict.ODictCaseless()
+ headers = Headers()
data = "testing"
- assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing"
+ assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing"
def test_read_http_body():
# test default case
- h = odict.ODictCaseless()
- h["content-length"] = [7]
+ headers = Headers()
+ headers["content-length"] = "7"
data = "testing"
- assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing"
+ assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing"
# test content length: invalid header
- h["content-length"] = ["foo"]
+ headers["content-length"] = "foo"
data = "testing"
tutils.raises(
http.HttpError,
mock_protocol(data).read_http_body,
- h, None, "GET", 200, False
+ headers, None, "GET", 200, False
)
# test content length: invalid header #2
- h["content-length"] = [-1]
+ headers["content-length"] = "-1"
data = "testing"
tutils.raises(
http.HttpError,
mock_protocol(data).read_http_body,
- h, None, "GET", 200, False
+ headers, None, "GET", 200, False
)
# test content length: content length > actual content
- h["content-length"] = [5]
+ headers["content-length"] = "5"
data = "testing"
tutils.raises(
http.HttpError,
mock_protocol(data).read_http_body,
- h, 4, "GET", 200, False
+ headers, 4, "GET", 200, False
)
# test content length: content length < actual content
data = "testing"
- assert len(mock_protocol(data).read_http_body(h, None, "GET", 200, False)) == 5
+ assert len(mock_protocol(data).read_http_body(headers, None, "GET", 200, False)) == 5
# test no content length: limit > actual content
- h = odict.ODictCaseless()
+ headers = Headers()
data = "testing"
- assert len(mock_protocol(data).read_http_body(h, 100, "GET", 200, False)) == 7
+ assert len(mock_protocol(data).read_http_body(headers, 100, "GET", 200, False)) == 7
# test no content length: limit < actual content
data = "testing"
tutils.raises(
http.HttpError,
mock_protocol(data).read_http_body,
- h, 4, "GET", 200, False
+ headers, 4, "GET", 200, False
)
# test chunked
- h = odict.ODictCaseless()
- h["transfer-encoding"] = ["chunked"]
+ headers = Headers()
+ headers["transfer-encoding"] = "chunked"
data = "5\r\naaaaa\r\n0\r\n\r\n"
- assert mock_protocol(data).read_http_body(h, 100, "GET", 200, False) == "aaaaa"
+ assert mock_protocol(data).read_http_body(headers, 100, "GET", 200, False) == "aaaaa"
def test_expected_http_body_size():
# gibber in the content-length field
- h = odict.ODictCaseless()
- h["content-length"] = ["foo"]
- assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
+ headers = Headers(content_length="foo")
+ assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None
# negative number in the content-length field
- h = odict.ODictCaseless()
- h["content-length"] = ["-7"]
- assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
+ headers = Headers(content_length="-7")
+ assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None
# explicit length
- h = odict.ODictCaseless()
- h["content-length"] = ["5"]
- assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == 5
+ headers = Headers(content_length="5")
+ assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == 5
# no length
- h = odict.ODictCaseless()
- assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == -1
+ headers = Headers()
+ assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == -1
# no length request
- h = odict.ODictCaseless()
- assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0
+ headers = Headers()
+ assert HTTP1Protocol.expected_http_body_size(headers, True, "GET", None) == 0
def test_get_request_line():
@@ -265,8 +262,8 @@ class TestReadHeaders:
Header2: two
\r\n
"""
- h = self._read(data)
- assert h.lst == [["Header", "one"], ["Header2", "two"]]
+ headers = self._read(data)
+ assert headers.fields == [["Header", "one"], ["Header2", "two"]]
def test_read_multi(self):
data = """
@@ -274,8 +271,8 @@ class TestReadHeaders:
Header: two
\r\n
"""
- h = self._read(data)
- assert h.lst == [["Header", "one"], ["Header", "two"]]
+ headers = self._read(data)
+ assert headers.fields == [["Header", "one"], ["Header", "two"]]
def test_read_continued(self):
data = """
@@ -284,8 +281,8 @@ class TestReadHeaders:
Header2: three
\r\n
"""
- h = self._read(data)
- assert h.lst == [["Header", "one\r\n two"], ["Header2", "three"]]
+ headers = self._read(data)
+ assert headers.fields == [["Header", "one\r\n two"], ["Header2", "three"]]
def test_read_continued_err(self):
data = "\tfoo: bar\r\n"
@@ -389,7 +386,7 @@ class TestReadResponse(object):
HTTP/1.1 200
"""
assert self.tst(data, "GET", None) == http.Response(
- (1, 1), 200, '', odict.ODictCaseless(), ''
+ (1, 1), 200, '', Headers(), ''
)
def test_simple_message(self):
@@ -397,7 +394,7 @@ class TestReadResponse(object):
HTTP/1.1 200 OK
"""
assert self.tst(data, "GET", None) == http.Response(
- (1, 1), 200, 'OK', odict.ODictCaseless(), ''
+ (1, 1), 200, 'OK', Headers(), ''
)
def test_invalid_http_version(self):
@@ -419,7 +416,7 @@ class TestReadResponse(object):
HTTP/1.1 200 OK
"""
assert self.tst(data, "GET", None) == http.Response(
- (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
+ (1, 1), 100, 'CONTINUE', Headers(), ''
)
def test_simple_body(self):
diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py
index 8810894f..2b7d7958 100644
--- a/test/http/http2/test_protocol.py
+++ b/test/http/http2/test_protocol.py
@@ -1,8 +1,8 @@
import OpenSSL
import mock
-from netlib import tcp, odict, http, tutils
-from netlib.http import http2
+from netlib import tcp, http, tutils
+from netlib.http import http2, Headers
from netlib.http.http2 import HTTP2Protocol
from netlib.http.http2.frame import *
from ... import tservers
@@ -229,11 +229,11 @@ class TestCreateHeaders():
c = tcp.TCPClient(("127.0.0.1", 0))
def test_create_headers(self):
- headers = [
+ headers = http.Headers([
(b':method', b'GET'),
(b':path', b'index.html'),
(b':scheme', b'https'),
- (b'foo', b'bar')]
+ (b'foo', b'bar')])
bytes = HTTP2Protocol(self.c)._create_headers(
headers, 1, end_stream=True)
@@ -248,12 +248,12 @@ class TestCreateHeaders():
.decode('hex')
def test_create_headers_multiple_frames(self):
- headers = [
+ headers = http.Headers([
(b':method', b'GET'),
(b':path', b'/'),
(b':scheme', b'https'),
(b'foo', b'bar'),
- (b'server', b'version')]
+ (b'server', b'version')])
protocol = HTTP2Protocol(self.c)
protocol.http2_settings[SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] = 8
@@ -309,7 +309,7 @@ class TestReadRequest(tservers.ServerTestBase):
req = protocol.read_request()
assert req.stream_id
- assert req.headers.lst == [[u':method', u'GET'], [u':path', u'/'], [u':scheme', u'https']]
+ assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']]
assert req.body == b'foobar'
@@ -415,7 +415,7 @@ class TestReadResponse(tservers.ServerTestBase):
assert resp.httpversion == (2, 0)
assert resp.status_code == 200
assert resp.msg == ""
- assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']]
+ assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']]
assert resp.body == b'foobar'
assert resp.timestamp_end
@@ -442,7 +442,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase):
assert resp.httpversion == (2, 0)
assert resp.status_code == 200
assert resp.msg == ""
- assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']]
+ assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']]
assert resp.body == b''
@@ -490,7 +490,7 @@ class TestAssembleRequest(object):
'',
'/',
(2, 0),
- odict.ODictCaseless([('foo', 'bar')]),
+ http.Headers([('foo', 'bar')]),
'foobar',
))
assert len(bytes) == 2
@@ -528,7 +528,7 @@ class TestAssembleResponse(object):
(2, 0),
200,
'',
- odict.ODictCaseless([('foo', 'bar')]),
+ Headers(foo="bar"),
'foobar'
))
assert len(bytes) == 2
diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py
index 5261e029..17c91fe5 100644
--- a/test/http/test_authentication.py
+++ b/test/http/test_authentication.py
@@ -1,18 +1,18 @@
import binascii
-from netlib import odict, http, tutils
-from netlib.http import authentication
+from netlib import tutils
+from netlib.http import authentication, Headers
def test_parse_http_basic_auth():
vals = ("basic", "foo", "bar")
- assert http.authentication.parse_http_basic_auth(
- http.authentication.assemble_http_basic_auth(*vals)
+ assert authentication.parse_http_basic_auth(
+ authentication.assemble_http_basic_auth(*vals)
) == vals
- assert not http.authentication.parse_http_basic_auth("")
- assert not http.authentication.parse_http_basic_auth("foo bar")
+ assert not authentication.parse_http_basic_auth("")
+ assert not authentication.parse_http_basic_auth("foo bar")
v = "basic " + binascii.b2a_base64("foo")
- assert not http.authentication.parse_http_basic_auth(v)
+ assert not authentication.parse_http_basic_auth(v)
class TestPassManNonAnon:
@@ -65,35 +65,35 @@ class TestBasicProxyAuth:
def test_simple(self):
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
- h = odict.ODictCaseless()
+ headers = Headers()
assert ba.auth_challenge_headers()
- assert not ba.authenticate(h)
+ assert not ba.authenticate(headers)
def test_authenticate_clean(self):
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
- hdrs = odict.ODictCaseless()
+ headers = Headers()
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
- assert ba.authenticate(hdrs)
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert ba.authenticate(headers)
- ba.clean(hdrs)
- assert not ba.AUTH_HEADER in hdrs
+ ba.clean(headers)
+ assert not ba.AUTH_HEADER in headers
- hdrs[ba.AUTH_HEADER] = [""]
- assert not ba.authenticate(hdrs)
+ headers[ba.AUTH_HEADER] = ""
+ assert not ba.authenticate(headers)
- hdrs[ba.AUTH_HEADER] = ["foo"]
- assert not ba.authenticate(hdrs)
+ headers[ba.AUTH_HEADER] = "foo"
+ assert not ba.authenticate(headers)
vals = ("foo", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
- assert not ba.authenticate(hdrs)
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert not ba.authenticate(headers)
ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
- assert not ba.authenticate(hdrs)
+ headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
+ assert not ba.authenticate(headers)
class Bunch:
diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py
index d7c438f7..49588d0a 100644
--- a/test/http/test_exceptions.py
+++ b/test/http/test_exceptions.py
@@ -1,26 +1,6 @@
from netlib.http.exceptions import *
-from netlib import odict
class TestHttpError:
def test_simple(self):
e = HttpError(404, "Not found")
assert str(e)
-
-class TestHttpAuthenticationError:
- def test_init(self):
- headers = odict.ODictCaseless([("foo", "bar")])
- x = HttpAuthenticationError(headers)
- assert str(x)
- assert isinstance(x.headers, odict.ODictCaseless)
- assert x.code == 407
- assert x.headers == headers
- assert "foo" in x.headers.keys()
-
- def test_header_conversion(self):
- headers = {"foo": "bar"}
- x = HttpAuthenticationError(headers)
- assert isinstance(x.headers, odict.ODictCaseless)
- assert x.headers.lst == headers.items()
-
- def test_repr(self):
- assert repr(HttpAuthenticationError()) == "Proxy Authentication Required"
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
index 2a799044..22fe992c 100644
--- a/test/http/test_semantics.py
+++ b/test/http/test_semantics.py
@@ -33,7 +33,7 @@ class TestRequest(object):
r = tutils.treq()
assert repr(r)
- def test_headers_odict(self):
+ def test_headers(self):
tutils.raises(AssertionError, semantics.Request,
'form_in',
'method',
@@ -54,7 +54,7 @@ class TestRequest(object):
'path',
(1, 1),
)
- assert isinstance(req.headers, odict.ODictCaseless)
+ assert isinstance(req.headers, http.Headers)
def test_equal(self):
a = tutils.treq()
@@ -76,30 +76,30 @@ class TestRequest(object):
def test_anticache(self):
req = tutils.treq()
- req.headers.add("If-Modified-Since", "foo")
- req.headers.add("If-None-Match", "bar")
+ req.headers["If-Modified-Since"] = "foo"
+ req.headers["If-None-Match"] = "bar"
req.anticache()
assert "If-Modified-Since" not in req.headers
assert "If-None-Match" not in req.headers
def test_anticomp(self):
req = tutils.treq()
- req.headers.add("Accept-Encoding", "foobar")
+ req.headers["Accept-Encoding"] = "foobar"
req.anticomp()
- assert req.headers["Accept-Encoding"] == ["identity"]
+ assert req.headers["Accept-Encoding"] == "identity"
def test_constrain_encoding(self):
req = tutils.treq()
- req.headers.add("Accept-Encoding", "identity, gzip, foo")
+ req.headers["Accept-Encoding"] = "identity, gzip, foo"
req.constrain_encoding()
- assert "foo" not in req.headers.get_first("Accept-Encoding")
+ assert "foo" not in req.headers["Accept-Encoding"]
def test_update_host(self):
req = tutils.treq()
- req.headers.add("Host", "")
+ req.headers["Host"] = ""
req.host = "foobar"
req.update_host_header()
- assert req.headers.get_first("Host") == "foobar"
+ assert req.headers["Host"] == "foobar"
def test_get_form(self):
req = tutils.treq()
@@ -113,7 +113,7 @@ class TestRequest(object):
req = tutils.treq()
req.body = "foobar"
- req.headers["Content-Type"] = [semantics.HDR_FORM_URLENCODED]
+ req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
req.get_form()
assert req.get_form_urlencoded.called
assert not req.get_form_multipart.called
@@ -123,7 +123,7 @@ class TestRequest(object):
def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart):
req = tutils.treq()
req.body = "foobar"
- req.headers["Content-Type"] = [semantics.HDR_FORM_MULTIPART]
+ req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
req.get_form()
assert not req.get_form_urlencoded.called
assert req.get_form_multipart.called
@@ -132,23 +132,25 @@ class TestRequest(object):
req = tutils.treq("foobar")
assert req.get_form_urlencoded() == odict.ODict()
- req.headers["Content-Type"] = [semantics.HDR_FORM_URLENCODED]
+ req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
assert req.get_form_urlencoded() == odict.ODict(utils.urldecode(req.body))
def test_get_form_multipart(self):
req = tutils.treq("foobar")
assert req.get_form_multipart() == odict.ODict()
- req.headers["Content-Type"] = [semantics.HDR_FORM_MULTIPART]
+ req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
assert req.get_form_multipart() == odict.ODict(
utils.multipartdecode(
req.headers,
- req.body))
+ req.body
+ )
+ )
def test_set_form_urlencoded(self):
req = tutils.treq()
req.set_form_urlencoded(odict.ODict([('foo', 'bar'), ('rab', 'oof')]))
- assert req.headers.get_first("Content-Type") == semantics.HDR_FORM_URLENCODED
+ assert req.headers["Content-Type"] == semantics.HDR_FORM_URLENCODED
assert req.body
def test_get_path_components(self):
@@ -176,7 +178,7 @@ class TestRequest(object):
r = tutils.treq()
assert r.pretty_host(True) == "address"
assert r.pretty_host(False) == "address"
- r.headers["host"] = ["other"]
+ r.headers["host"] = "other"
assert r.pretty_host(True) == "other"
assert r.pretty_host(False) == "address"
r.host = None
@@ -187,7 +189,7 @@ class TestRequest(object):
assert r.pretty_host(False) is None
# Invalid IDNA
- r.headers["host"] = [".disqus.com"]
+ r.headers["host"] = ".disqus.com"
assert r.pretty_host(True) == ".disqus.com"
def test_pretty_url(self):
@@ -201,49 +203,37 @@ class TestRequest(object):
assert req.pretty_url(False) == "http://address:22/path"
def test_get_cookies_none(self):
- h = odict.ODictCaseless()
+ headers = http.Headers()
r = tutils.treq()
- r.headers = h
+ r.headers = headers
assert len(r.get_cookies()) == 0
def test_get_cookies_single(self):
- h = odict.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
- r.headers = h
+ r.headers = http.Headers(cookie="cookiename=cookievalue")
result = r.get_cookies()
assert len(result) == 1
assert result['cookiename'] == ['cookievalue']
def test_get_cookies_double(self):
- h = odict.ODictCaseless()
- h["Cookie"] = [
- "cookiename=cookievalue;othercookiename=othercookievalue"
- ]
r = tutils.treq()
- r.headers = h
+ r.headers = http.Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['cookievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_get_cookies_withequalsign(self):
- h = odict.ODictCaseless()
- h["Cookie"] = [
- "cookiename=coo=kievalue;othercookiename=othercookievalue"
- ]
r = tutils.treq()
- r.headers = h
+ r.headers = http.Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['coo=kievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_set_cookies(self):
- h = odict.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
- r.headers = h
+ r.headers = http.Headers(cookie="cookiename=cookievalue")
result = r.get_cookies()
result["cookiename"] = ["foo"]
r.set_cookies(result)
@@ -348,7 +338,7 @@ class TestEmptyRequest(object):
assert req
class TestResponse(object):
- def test_headers_odict(self):
+ def test_headers(self):
tutils.raises(AssertionError, semantics.Response,
(1, 1),
200,
@@ -359,7 +349,7 @@ class TestResponse(object):
(1, 1),
200,
)
- assert isinstance(resp.headers, odict.ODictCaseless)
+ assert isinstance(resp.headers, http.Headers)
def test_equal(self):
a = tutils.tresp()
@@ -374,32 +364,26 @@ class TestResponse(object):
def test_repr(self):
r = tutils.tresp()
assert "unknown content type" in repr(r)
- r.headers["content-type"] = ["foo"]
+ r.headers["content-type"] = "foo"
assert "foo" in repr(r)
assert repr(tutils.tresp(content=CONTENT_MISSING))
def test_get_cookies_none(self):
- h = odict.ODictCaseless()
resp = tutils.tresp()
- resp.headers = h
+ resp.headers = http.Headers()
assert not resp.get_cookies()
def test_get_cookies_simple(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue"]
resp = tutils.tresp()
- resp.headers = h
+ resp.headers = http.Headers(set_cookie="cookiename=cookievalue")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
def test_get_cookies_with_parameters(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = [
- "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
resp = tutils.tresp()
- resp.headers = h
+ resp.headers = http.Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
@@ -412,12 +396,8 @@ class TestResponse(object):
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = [
- "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
- ]
resp = tutils.tresp()
- resp.headers = h
+ resp.headers = http.Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
@@ -425,10 +405,11 @@ class TestResponse(object):
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
resp = tutils.tresp()
- resp.headers = h
+ resp.headers = http.Headers([
+ ["Set-Cookie", "cookiename=cookievalue"],
+ ["Set-Cookie", "othercookie=othervalue"]
+ ])
result = resp.get_cookies()
assert len(result) == 2
assert "cookiename" in result
@@ -445,3 +426,148 @@ class TestResponse(object):
v = resp.get_cookies()
assert len(v) == 1
assert v["foo"] == [["bar", odict.ODictCaseless()]]
+
+
+class TestHeaders(object):
+ def _2host(self):
+ return semantics.Headers(
+ [
+ ["Host", "example.com"],
+ ["host", "example.org"]
+ ]
+ )
+
+ def test_init(self):
+ headers = semantics.Headers()
+ assert len(headers) == 0
+
+ headers = semantics.Headers([["Host", "example.com"]])
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = semantics.Headers(Host="example.com")
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = semantics.Headers(
+ [["Host", "invalid"]],
+ Host="example.com"
+ )
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = semantics.Headers(
+ [["Host", "invalid"], ["Accept", "text/plain"]],
+ Host="example.com"
+ )
+ assert len(headers) == 2
+ assert headers["Host"] == "example.com"
+ assert headers["Accept"] == "text/plain"
+
+ def test_getitem(self):
+ headers = semantics.Headers(Host="example.com")
+ assert headers["Host"] == "example.com"
+ assert headers["host"] == "example.com"
+ tutils.raises(KeyError, headers.__getitem__, "Accept")
+
+ headers = self._2host()
+ assert headers["Host"] == "example.com, example.org"
+
+ def test_str(self):
+ headers = semantics.Headers(Host="example.com")
+ assert str(headers) == "Host: example.com\r\n"
+
+ headers = semantics.Headers([
+ ["Host", "example.com"],
+ ["Accept", "text/plain"]
+ ])
+ assert str(headers) == "Host: example.com\r\nAccept: text/plain\r\n"
+
+ def test_setitem(self):
+ headers = semantics.Headers()
+ headers["Host"] = "example.com"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.com"
+
+ headers["host"] = "example.org"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.org"
+
+ headers["accept"] = "text/plain"
+ assert len(headers) == 2
+ assert "Accept" in headers
+ assert "Host" in headers
+
+ headers = self._2host()
+ assert len(headers.fields) == 2
+ headers["Host"] = "example.com"
+ assert len(headers.fields) == 1
+ assert "Host" in headers
+
+ def test_delitem(self):
+ headers = semantics.Headers(Host="example.com")
+ assert len(headers) == 1
+ del headers["host"]
+ assert len(headers) == 0
+ try:
+ del headers["host"]
+ except KeyError:
+ assert True
+ else:
+ assert False
+
+ headers = self._2host()
+ del headers["Host"]
+ assert len(headers) == 0
+
+ def test_keys(self):
+ headers = semantics.Headers(Host="example.com")
+ assert len(headers.keys()) == 1
+ assert headers.keys()[0] == "Host"
+
+ headers = self._2host()
+ assert len(headers.keys()) == 1
+ assert headers.keys()[0] == "Host"
+
+ def test_eq_ne(self):
+ headers1 = semantics.Headers(Host="example.com")
+ headers2 = semantics.Headers(host="example.com")
+ assert not (headers1 == headers2)
+ assert headers1 != headers2
+
+ headers1 = semantics.Headers(Host="example.com")
+ headers2 = semantics.Headers(Host="example.com")
+ assert headers1 == headers2
+ assert not (headers1 != headers2)
+
+ assert headers1 != 42
+
+ def test_get_all(self):
+ headers = self._2host()
+ assert headers.get_all("host") == ["example.com", "example.org"]
+ assert headers.get_all("accept", 42) is 42
+
+ def test_set_all(self):
+ headers = semantics.Headers(Host="example.com")
+ headers.set_all("Accept", ["text/plain"])
+ assert len(headers) == 2
+ assert "accept" in headers
+
+ headers = self._2host()
+ headers.set_all("Host", ["example.org"])
+ assert headers["host"] == "example.org"
+
+ headers.set_all("Host", ["example.org", "example.net"])
+ assert headers["host"] == "example.org, example.net"
+
+ def test_state(self):
+ headers = self._2host()
+ assert len(headers.get_state()) == 2
+ assert headers == semantics.Headers.from_state(headers.get_state())
+
+ headers2 = semantics.Headers()
+ assert headers != headers2
+ headers2.load_state(headers.get_state())
+ assert headers == headers2
diff --git a/test/test_utils.py b/test/test_utils.py
index fc7174d6..374d09ba 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -1,5 +1,5 @@
-from netlib import utils, odict, tutils
-
+from netlib import utils, tutils
+from netlib.http import Headers
def test_bidi():
b = utils.BiDi(a=1, b=2)
@@ -88,20 +88,21 @@ def test_urldecode():
def test_get_header_tokens():
- h = odict.ODictCaseless()
- assert utils.get_header_tokens(h, "foo") == []
- h["foo"] = ["bar"]
- assert utils.get_header_tokens(h, "foo") == ["bar"]
- h["foo"] = ["bar, voing"]
- assert utils.get_header_tokens(h, "foo") == ["bar", "voing"]
- h["foo"] = ["bar, voing", "oink"]
- assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
+ headers = Headers()
+ assert utils.get_header_tokens(headers, "foo") == []
+ headers["foo"] = "bar"
+ assert utils.get_header_tokens(headers, "foo") == ["bar"]
+ headers["foo"] = "bar, voing"
+ assert utils.get_header_tokens(headers, "foo") == ["bar", "voing"]
+ headers.set_all("foo", ["bar, voing", "oink"])
+ assert utils.get_header_tokens(headers, "foo") == ["bar", "voing", "oink"]
def test_multipartdecode():
boundary = 'somefancyboundary'
- headers = odict.ODict(
- [('content-type', ('multipart/form-data; boundary=%s' % boundary))])
+ headers = Headers(
+ content_type='multipart/form-data; boundary=%s' % boundary
+ )
content = "--{0}\n" \
"Content-Disposition: form-data; name=\"field1\"\n\n" \
"value1\n" \
diff --git a/test/test_wsgi.py b/test/test_wsgi.py
index 41572d49..e26e1413 100644
--- a/test/test_wsgi.py
+++ b/test/test_wsgi.py
@@ -1,12 +1,12 @@
import cStringIO
import sys
-from netlib import wsgi, odict
+from netlib import wsgi
+from netlib.http import Headers
def tflow():
- h = odict.ODictCaseless()
- h["test"] = ["value"]
- req = wsgi.Request("http", "GET", "/", h, "")
+ headers = Headers(test="value")
+ req = wsgi.Request("http", "GET", "/", headers, "")
return wsgi.Flow(("127.0.0.1", 8888), req)
diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py
index be87b20a..57cfd166 100644
--- a/test/websockets/test_websockets.py
+++ b/test/websockets/test_websockets.py
@@ -42,7 +42,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101)
self.wfile.write(preamble + "\r\n")
headers = self.protocol.server_handshake_headers(key)
- self.wfile.write(headers.format() + "\r\n")
+ self.wfile.write(str(headers) + "\r\n")
self.wfile.flush()
self.handshake_done = True
@@ -66,8 +66,8 @@ class WebSocketsClient(tcp.TCPClient):
preamble = 'GET / HTTP/1.1'
self.wfile.write(preamble + "\r\n")
headers = self.protocol.client_handshake_headers()
- self.client_nonce = headers.get_first("sec-websocket-key")
- self.wfile.write(headers.format() + "\r\n")
+ self.client_nonce = headers["sec-websocket-key"]
+ self.wfile.write(str(headers) + "\r\n")
self.wfile.flush()
resp = http1_protocol.read_response("GET", None)
@@ -145,13 +145,13 @@ class TestWebSockets(tservers.ServerTestBase):
def test_check_server_handshake(self):
headers = self.protocol.server_handshake_headers("key")
assert self.protocol.check_server_handshake(headers)
- headers["Upgrade"] = ["not_websocket"]
+ headers["Upgrade"] = "not_websocket"
assert not self.protocol.check_server_handshake(headers)
def test_check_client_handshake(self):
headers = self.protocol.client_handshake_headers("key")
assert self.protocol.check_client_handshake(headers) == "key"
- headers["Upgrade"] = ["not_websocket"]
+ headers["Upgrade"] = "not_websocket"
assert not self.protocol.check_client_handshake(headers)
@@ -166,7 +166,7 @@ class BadHandshakeHandler(WebSocketsEchoHandler):
preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101)
self.wfile.write(preamble + "\r\n")
headers = self.protocol.server_handshake_headers("malformed key")
- self.wfile.write(headers.format() + "\r\n")
+ self.wfile.write(str(headers) + "\r\n")
self.wfile.flush()
self.handshake_done = True