diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/http1/test_protocol.py | 127 | ||||
-rw-r--r-- | test/http/http2/test_protocol.py | 22 | ||||
-rw-r--r-- | test/http/test_authentication.py | 44 | ||||
-rw-r--r-- | test/http/test_exceptions.py | 20 | ||||
-rw-r--r-- | test/http/test_semantics.py | 238 | ||||
-rw-r--r-- | test/test_utils.py | 25 | ||||
-rw-r--r-- | test/test_wsgi.py | 8 | ||||
-rw-r--r-- | test/websockets/test_websockets.py | 12 |
8 files changed, 300 insertions, 196 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index 6704647f..f7c615bd 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -2,7 +2,7 @@ import cStringIO import textwrap from netlib import http, odict, tcp, tutils -from netlib.http import semantics +from netlib.http import semantics, Headers from netlib.http.http1 import HTTP1Protocol from ... import tservers @@ -29,164 +29,161 @@ def test_stripped_chunked_encoding_no_content(): """ r = tutils.treq(content="") - r.headers["Transfer-Encoding"] = ["chunked"] + r.headers["Transfer-Encoding"] = "chunked" assert "Content-Length" in mock_protocol()._assemble_request_headers(r) r = tutils.tresp(content="") - r.headers["Transfer-Encoding"] = ["chunked"] + r.headers["Transfer-Encoding"] = "chunked" assert "Content-Length" in mock_protocol()._assemble_response_headers(r) def test_has_chunked_encoding(): - h = odict.ODictCaseless() - assert not HTTP1Protocol.has_chunked_encoding(h) - h["transfer-encoding"] = ["chunked"] - assert HTTP1Protocol.has_chunked_encoding(h) + headers = http.Headers() + assert not HTTP1Protocol.has_chunked_encoding(headers) + headers["transfer-encoding"] = "chunked" + assert HTTP1Protocol.has_chunked_encoding(headers) def test_read_chunked(): - h = odict.ODictCaseless() - h["transfer-encoding"] = ["chunked"] + headers = http.Headers() + headers["transfer-encoding"] = "chunked" data = "1\r\na\r\n0\r\n" tutils.raises( "malformed chunked body", mock_protocol(data).read_http_body, - h, None, "GET", None, True + headers, None, "GET", None, True ) data = "1\r\na\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a" + assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a" data = "\r\n\r\n1\r\na\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a" + assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a" data = "\r\n" tutils.raises( "closed prematurely", mock_protocol(data).read_http_body, - h, None, "GET", None, True + headers, None, "GET", None, True ) data = "1\r\nfoo" tutils.raises( "malformed chunked body", mock_protocol(data).read_http_body, - h, None, "GET", None, True + headers, None, "GET", None, True ) data = "foo\r\nfoo" tutils.raises( http.HttpError, mock_protocol(data).read_http_body, - h, None, "GET", None, True + headers, None, "GET", None, True ) data = "5\r\naaaaa\r\n0\r\n\r\n" - tutils.raises("too large", mock_protocol(data).read_http_body, h, 2, "GET", None, True) + tutils.raises("too large", mock_protocol(data).read_http_body, headers, 2, "GET", None, True) def test_connection_close(): - h = odict.ODictCaseless() - assert HTTP1Protocol.connection_close((1, 0), h) - assert not HTTP1Protocol.connection_close((1, 1), h) + headers = Headers() + assert HTTP1Protocol.connection_close((1, 0), headers) + assert not HTTP1Protocol.connection_close((1, 1), headers) - h["connection"] = ["keep-alive"] - assert not HTTP1Protocol.connection_close((1, 1), h) + headers["connection"] = "keep-alive" + assert not HTTP1Protocol.connection_close((1, 1), headers) - h["connection"] = ["close"] - assert HTTP1Protocol.connection_close((1, 1), h) + headers["connection"] = "close" + assert HTTP1Protocol.connection_close((1, 1), headers) def test_read_http_body_request(): - h = odict.ODictCaseless() + headers = Headers() data = "testing" - assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "" + assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "" def test_read_http_body_response(): - h = odict.ODictCaseless() + headers = Headers() data = "testing" - assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing" + assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing" def test_read_http_body(): # test default case - h = odict.ODictCaseless() - h["content-length"] = [7] + headers = Headers() + headers["content-length"] = "7" data = "testing" - assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing" + assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing" # test content length: invalid header - h["content-length"] = ["foo"] + headers["content-length"] = "foo" data = "testing" tutils.raises( http.HttpError, mock_protocol(data).read_http_body, - h, None, "GET", 200, False + headers, None, "GET", 200, False ) # test content length: invalid header #2 - h["content-length"] = [-1] + headers["content-length"] = "-1" data = "testing" tutils.raises( http.HttpError, mock_protocol(data).read_http_body, - h, None, "GET", 200, False + headers, None, "GET", 200, False ) # test content length: content length > actual content - h["content-length"] = [5] + headers["content-length"] = "5" data = "testing" tutils.raises( http.HttpError, mock_protocol(data).read_http_body, - h, 4, "GET", 200, False + headers, 4, "GET", 200, False ) # test content length: content length < actual content data = "testing" - assert len(mock_protocol(data).read_http_body(h, None, "GET", 200, False)) == 5 + assert len(mock_protocol(data).read_http_body(headers, None, "GET", 200, False)) == 5 # test no content length: limit > actual content - h = odict.ODictCaseless() + headers = Headers() data = "testing" - assert len(mock_protocol(data).read_http_body(h, 100, "GET", 200, False)) == 7 + assert len(mock_protocol(data).read_http_body(headers, 100, "GET", 200, False)) == 7 # test no content length: limit < actual content data = "testing" tutils.raises( http.HttpError, mock_protocol(data).read_http_body, - h, 4, "GET", 200, False + headers, 4, "GET", 200, False ) # test chunked - h = odict.ODictCaseless() - h["transfer-encoding"] = ["chunked"] + headers = Headers() + headers["transfer-encoding"] = "chunked" data = "5\r\naaaaa\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(h, 100, "GET", 200, False) == "aaaaa" + assert mock_protocol(data).read_http_body(headers, 100, "GET", 200, False) == "aaaaa" def test_expected_http_body_size(): # gibber in the content-length field - h = odict.ODictCaseless() - h["content-length"] = ["foo"] - assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None + headers = Headers(content_length="foo") + assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None # negative number in the content-length field - h = odict.ODictCaseless() - h["content-length"] = ["-7"] - assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None + headers = Headers(content_length="-7") + assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None # explicit length - h = odict.ODictCaseless() - h["content-length"] = ["5"] - assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == 5 + headers = Headers(content_length="5") + assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == 5 # no length - h = odict.ODictCaseless() - assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == -1 + headers = Headers() + assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == -1 # no length request - h = odict.ODictCaseless() - assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0 + headers = Headers() + assert HTTP1Protocol.expected_http_body_size(headers, True, "GET", None) == 0 def test_get_request_line(): @@ -265,8 +262,8 @@ class TestReadHeaders: Header2: two \r\n """ - h = self._read(data) - assert h.lst == [["Header", "one"], ["Header2", "two"]] + headers = self._read(data) + assert headers.fields == [["Header", "one"], ["Header2", "two"]] def test_read_multi(self): data = """ @@ -274,8 +271,8 @@ class TestReadHeaders: Header: two \r\n """ - h = self._read(data) - assert h.lst == [["Header", "one"], ["Header", "two"]] + headers = self._read(data) + assert headers.fields == [["Header", "one"], ["Header", "two"]] def test_read_continued(self): data = """ @@ -284,8 +281,8 @@ class TestReadHeaders: Header2: three \r\n """ - h = self._read(data) - assert h.lst == [["Header", "one\r\n two"], ["Header2", "three"]] + headers = self._read(data) + assert headers.fields == [["Header", "one\r\n two"], ["Header2", "three"]] def test_read_continued_err(self): data = "\tfoo: bar\r\n" @@ -389,7 +386,7 @@ class TestReadResponse(object): HTTP/1.1 200 """ assert self.tst(data, "GET", None) == http.Response( - (1, 1), 200, '', odict.ODictCaseless(), '' + (1, 1), 200, '', Headers(), '' ) def test_simple_message(self): @@ -397,7 +394,7 @@ class TestReadResponse(object): HTTP/1.1 200 OK """ assert self.tst(data, "GET", None) == http.Response( - (1, 1), 200, 'OK', odict.ODictCaseless(), '' + (1, 1), 200, 'OK', Headers(), '' ) def test_invalid_http_version(self): @@ -419,7 +416,7 @@ class TestReadResponse(object): HTTP/1.1 200 OK """ assert self.tst(data, "GET", None) == http.Response( - (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '' + (1, 1), 100, 'CONTINUE', Headers(), '' ) def test_simple_body(self): diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py index 8810894f..2b7d7958 100644 --- a/test/http/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -1,8 +1,8 @@ import OpenSSL import mock -from netlib import tcp, odict, http, tutils -from netlib.http import http2 +from netlib import tcp, http, tutils +from netlib.http import http2, Headers from netlib.http.http2 import HTTP2Protocol from netlib.http.http2.frame import * from ... import tservers @@ -229,11 +229,11 @@ class TestCreateHeaders(): c = tcp.TCPClient(("127.0.0.1", 0)) def test_create_headers(self): - headers = [ + headers = http.Headers([ (b':method', b'GET'), (b':path', b'index.html'), (b':scheme', b'https'), - (b'foo', b'bar')] + (b'foo', b'bar')]) bytes = HTTP2Protocol(self.c)._create_headers( headers, 1, end_stream=True) @@ -248,12 +248,12 @@ class TestCreateHeaders(): .decode('hex') def test_create_headers_multiple_frames(self): - headers = [ + headers = http.Headers([ (b':method', b'GET'), (b':path', b'/'), (b':scheme', b'https'), (b'foo', b'bar'), - (b'server', b'version')] + (b'server', b'version')]) protocol = HTTP2Protocol(self.c) protocol.http2_settings[SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] = 8 @@ -309,7 +309,7 @@ class TestReadRequest(tservers.ServerTestBase): req = protocol.read_request() assert req.stream_id - assert req.headers.lst == [[u':method', u'GET'], [u':path', u'/'], [u':scheme', u'https']] + assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']] assert req.body == b'foobar' @@ -415,7 +415,7 @@ class TestReadResponse(tservers.ServerTestBase): assert resp.httpversion == (2, 0) assert resp.status_code == 200 assert resp.msg == "" - assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']] + assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] assert resp.body == b'foobar' assert resp.timestamp_end @@ -442,7 +442,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase): assert resp.httpversion == (2, 0) assert resp.status_code == 200 assert resp.msg == "" - assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']] + assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] assert resp.body == b'' @@ -490,7 +490,7 @@ class TestAssembleRequest(object): '', '/', (2, 0), - odict.ODictCaseless([('foo', 'bar')]), + http.Headers([('foo', 'bar')]), 'foobar', )) assert len(bytes) == 2 @@ -528,7 +528,7 @@ class TestAssembleResponse(object): (2, 0), 200, '', - odict.ODictCaseless([('foo', 'bar')]), + Headers(foo="bar"), 'foobar' )) assert len(bytes) == 2 diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py index 5261e029..17c91fe5 100644 --- a/test/http/test_authentication.py +++ b/test/http/test_authentication.py @@ -1,18 +1,18 @@ import binascii -from netlib import odict, http, tutils -from netlib.http import authentication +from netlib import tutils +from netlib.http import authentication, Headers def test_parse_http_basic_auth(): vals = ("basic", "foo", "bar") - assert http.authentication.parse_http_basic_auth( - http.authentication.assemble_http_basic_auth(*vals) + assert authentication.parse_http_basic_auth( + authentication.assemble_http_basic_auth(*vals) ) == vals - assert not http.authentication.parse_http_basic_auth("") - assert not http.authentication.parse_http_basic_auth("foo bar") + assert not authentication.parse_http_basic_auth("") + assert not authentication.parse_http_basic_auth("foo bar") v = "basic " + binascii.b2a_base64("foo") - assert not http.authentication.parse_http_basic_auth(v) + assert not authentication.parse_http_basic_auth(v) class TestPassManNonAnon: @@ -65,35 +65,35 @@ class TestBasicProxyAuth: def test_simple(self): ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - h = odict.ODictCaseless() + headers = Headers() assert ba.auth_challenge_headers() - assert not ba.authenticate(h) + assert not ba.authenticate(headers) def test_authenticate_clean(self): ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") - hdrs = odict.ODictCaseless() + headers = Headers() vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] - assert ba.authenticate(hdrs) + headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) + assert ba.authenticate(headers) - ba.clean(hdrs) - assert not ba.AUTH_HEADER in hdrs + ba.clean(headers) + assert not ba.AUTH_HEADER in headers - hdrs[ba.AUTH_HEADER] = [""] - assert not ba.authenticate(hdrs) + headers[ba.AUTH_HEADER] = "" + assert not ba.authenticate(headers) - hdrs[ba.AUTH_HEADER] = ["foo"] - assert not ba.authenticate(hdrs) + headers[ba.AUTH_HEADER] = "foo" + assert not ba.authenticate(headers) vals = ("foo", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] - assert not ba.authenticate(hdrs) + headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) + assert not ba.authenticate(headers) ba = authentication.BasicProxyAuth(authentication.PassMan(), "test") vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] - assert not ba.authenticate(hdrs) + headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) + assert not ba.authenticate(headers) class Bunch: diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py index d7c438f7..49588d0a 100644 --- a/test/http/test_exceptions.py +++ b/test/http/test_exceptions.py @@ -1,26 +1,6 @@ from netlib.http.exceptions import * -from netlib import odict class TestHttpError: def test_simple(self): e = HttpError(404, "Not found") assert str(e) - -class TestHttpAuthenticationError: - def test_init(self): - headers = odict.ODictCaseless([("foo", "bar")]) - x = HttpAuthenticationError(headers) - assert str(x) - assert isinstance(x.headers, odict.ODictCaseless) - assert x.code == 407 - assert x.headers == headers - assert "foo" in x.headers.keys() - - def test_header_conversion(self): - headers = {"foo": "bar"} - x = HttpAuthenticationError(headers) - assert isinstance(x.headers, odict.ODictCaseless) - assert x.headers.lst == headers.items() - - def test_repr(self): - assert repr(HttpAuthenticationError()) == "Proxy Authentication Required" diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py index 2a799044..22fe992c 100644 --- a/test/http/test_semantics.py +++ b/test/http/test_semantics.py @@ -33,7 +33,7 @@ class TestRequest(object): r = tutils.treq() assert repr(r) - def test_headers_odict(self): + def test_headers(self): tutils.raises(AssertionError, semantics.Request, 'form_in', 'method', @@ -54,7 +54,7 @@ class TestRequest(object): 'path', (1, 1), ) - assert isinstance(req.headers, odict.ODictCaseless) + assert isinstance(req.headers, http.Headers) def test_equal(self): a = tutils.treq() @@ -76,30 +76,30 @@ class TestRequest(object): def test_anticache(self): req = tutils.treq() - req.headers.add("If-Modified-Since", "foo") - req.headers.add("If-None-Match", "bar") + req.headers["If-Modified-Since"] = "foo" + req.headers["If-None-Match"] = "bar" req.anticache() assert "If-Modified-Since" not in req.headers assert "If-None-Match" not in req.headers def test_anticomp(self): req = tutils.treq() - req.headers.add("Accept-Encoding", "foobar") + req.headers["Accept-Encoding"] = "foobar" req.anticomp() - assert req.headers["Accept-Encoding"] == ["identity"] + assert req.headers["Accept-Encoding"] == "identity" def test_constrain_encoding(self): req = tutils.treq() - req.headers.add("Accept-Encoding", "identity, gzip, foo") + req.headers["Accept-Encoding"] = "identity, gzip, foo" req.constrain_encoding() - assert "foo" not in req.headers.get_first("Accept-Encoding") + assert "foo" not in req.headers["Accept-Encoding"] def test_update_host(self): req = tutils.treq() - req.headers.add("Host", "") + req.headers["Host"] = "" req.host = "foobar" req.update_host_header() - assert req.headers.get_first("Host") == "foobar" + assert req.headers["Host"] == "foobar" def test_get_form(self): req = tutils.treq() @@ -113,7 +113,7 @@ class TestRequest(object): req = tutils.treq() req.body = "foobar" - req.headers["Content-Type"] = [semantics.HDR_FORM_URLENCODED] + req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED req.get_form() assert req.get_form_urlencoded.called assert not req.get_form_multipart.called @@ -123,7 +123,7 @@ class TestRequest(object): def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart): req = tutils.treq() req.body = "foobar" - req.headers["Content-Type"] = [semantics.HDR_FORM_MULTIPART] + req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART req.get_form() assert not req.get_form_urlencoded.called assert req.get_form_multipart.called @@ -132,23 +132,25 @@ class TestRequest(object): req = tutils.treq("foobar") assert req.get_form_urlencoded() == odict.ODict() - req.headers["Content-Type"] = [semantics.HDR_FORM_URLENCODED] + req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED assert req.get_form_urlencoded() == odict.ODict(utils.urldecode(req.body)) def test_get_form_multipart(self): req = tutils.treq("foobar") assert req.get_form_multipart() == odict.ODict() - req.headers["Content-Type"] = [semantics.HDR_FORM_MULTIPART] + req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART assert req.get_form_multipart() == odict.ODict( utils.multipartdecode( req.headers, - req.body)) + req.body + ) + ) def test_set_form_urlencoded(self): req = tutils.treq() req.set_form_urlencoded(odict.ODict([('foo', 'bar'), ('rab', 'oof')])) - assert req.headers.get_first("Content-Type") == semantics.HDR_FORM_URLENCODED + assert req.headers["Content-Type"] == semantics.HDR_FORM_URLENCODED assert req.body def test_get_path_components(self): @@ -176,7 +178,7 @@ class TestRequest(object): r = tutils.treq() assert r.pretty_host(True) == "address" assert r.pretty_host(False) == "address" - r.headers["host"] = ["other"] + r.headers["host"] = "other" assert r.pretty_host(True) == "other" assert r.pretty_host(False) == "address" r.host = None @@ -187,7 +189,7 @@ class TestRequest(object): assert r.pretty_host(False) is None # Invalid IDNA - r.headers["host"] = [".disqus.com"] + r.headers["host"] = ".disqus.com" assert r.pretty_host(True) == ".disqus.com" def test_pretty_url(self): @@ -201,49 +203,37 @@ class TestRequest(object): assert req.pretty_url(False) == "http://address:22/path" def test_get_cookies_none(self): - h = odict.ODictCaseless() + headers = http.Headers() r = tutils.treq() - r.headers = h + r.headers = headers assert len(r.get_cookies()) == 0 def test_get_cookies_single(self): - h = odict.ODictCaseless() - h["Cookie"] = ["cookiename=cookievalue"] r = tutils.treq() - r.headers = h + r.headers = http.Headers(cookie="cookiename=cookievalue") result = r.get_cookies() assert len(result) == 1 assert result['cookiename'] == ['cookievalue'] def test_get_cookies_double(self): - h = odict.ODictCaseless() - h["Cookie"] = [ - "cookiename=cookievalue;othercookiename=othercookievalue" - ] r = tutils.treq() - r.headers = h + r.headers = http.Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") result = r.get_cookies() assert len(result) == 2 assert result['cookiename'] == ['cookievalue'] assert result['othercookiename'] == ['othercookievalue'] def test_get_cookies_withequalsign(self): - h = odict.ODictCaseless() - h["Cookie"] = [ - "cookiename=coo=kievalue;othercookiename=othercookievalue" - ] r = tutils.treq() - r.headers = h + r.headers = http.Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") result = r.get_cookies() assert len(result) == 2 assert result['cookiename'] == ['coo=kievalue'] assert result['othercookiename'] == ['othercookievalue'] def test_set_cookies(self): - h = odict.ODictCaseless() - h["Cookie"] = ["cookiename=cookievalue"] r = tutils.treq() - r.headers = h + r.headers = http.Headers(cookie="cookiename=cookievalue") result = r.get_cookies() result["cookiename"] = ["foo"] r.set_cookies(result) @@ -348,7 +338,7 @@ class TestEmptyRequest(object): assert req class TestResponse(object): - def test_headers_odict(self): + def test_headers(self): tutils.raises(AssertionError, semantics.Response, (1, 1), 200, @@ -359,7 +349,7 @@ class TestResponse(object): (1, 1), 200, ) - assert isinstance(resp.headers, odict.ODictCaseless) + assert isinstance(resp.headers, http.Headers) def test_equal(self): a = tutils.tresp() @@ -374,32 +364,26 @@ class TestResponse(object): def test_repr(self): r = tutils.tresp() assert "unknown content type" in repr(r) - r.headers["content-type"] = ["foo"] + r.headers["content-type"] = "foo" assert "foo" in repr(r) assert repr(tutils.tresp(content=CONTENT_MISSING)) def test_get_cookies_none(self): - h = odict.ODictCaseless() resp = tutils.tresp() - resp.headers = h + resp.headers = http.Headers() assert not resp.get_cookies() def test_get_cookies_simple(self): - h = odict.ODictCaseless() - h["Set-Cookie"] = ["cookiename=cookievalue"] resp = tutils.tresp() - resp.headers = h + resp.headers = http.Headers(set_cookie="cookiename=cookievalue") result = resp.get_cookies() assert len(result) == 1 assert "cookiename" in result assert result["cookiename"][0] == ["cookievalue", odict.ODict()] def test_get_cookies_with_parameters(self): - h = odict.ODictCaseless() - h["Set-Cookie"] = [ - "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] resp = tutils.tresp() - resp.headers = h + resp.headers = http.Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") result = resp.get_cookies() assert len(result) == 1 assert "cookiename" in result @@ -412,12 +396,8 @@ class TestResponse(object): assert attrs["httponly"] == [None] def test_get_cookies_no_value(self): - h = odict.ODictCaseless() - h["Set-Cookie"] = [ - "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/" - ] resp = tutils.tresp() - resp.headers = h + resp.headers = http.Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") result = resp.get_cookies() assert len(result) == 1 assert "cookiename" in result @@ -425,10 +405,11 @@ class TestResponse(object): assert len(result["cookiename"][0][1]) == 2 def test_get_cookies_twocookies(self): - h = odict.ODictCaseless() - h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"] resp = tutils.tresp() - resp.headers = h + resp.headers = http.Headers([ + ["Set-Cookie", "cookiename=cookievalue"], + ["Set-Cookie", "othercookie=othervalue"] + ]) result = resp.get_cookies() assert len(result) == 2 assert "cookiename" in result @@ -445,3 +426,148 @@ class TestResponse(object): v = resp.get_cookies() assert len(v) == 1 assert v["foo"] == [["bar", odict.ODictCaseless()]] + + +class TestHeaders(object): + def _2host(self): + return semantics.Headers( + [ + ["Host", "example.com"], + ["host", "example.org"] + ] + ) + + def test_init(self): + headers = semantics.Headers() + assert len(headers) == 0 + + headers = semantics.Headers([["Host", "example.com"]]) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = semantics.Headers(Host="example.com") + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = semantics.Headers( + [["Host", "invalid"]], + Host="example.com" + ) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = semantics.Headers( + [["Host", "invalid"], ["Accept", "text/plain"]], + Host="example.com" + ) + assert len(headers) == 2 + assert headers["Host"] == "example.com" + assert headers["Accept"] == "text/plain" + + def test_getitem(self): + headers = semantics.Headers(Host="example.com") + assert headers["Host"] == "example.com" + assert headers["host"] == "example.com" + tutils.raises(KeyError, headers.__getitem__, "Accept") + + headers = self._2host() + assert headers["Host"] == "example.com, example.org" + + def test_str(self): + headers = semantics.Headers(Host="example.com") + assert str(headers) == "Host: example.com\r\n" + + headers = semantics.Headers([ + ["Host", "example.com"], + ["Accept", "text/plain"] + ]) + assert str(headers) == "Host: example.com\r\nAccept: text/plain\r\n" + + def test_setitem(self): + headers = semantics.Headers() + headers["Host"] = "example.com" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.com" + + headers["host"] = "example.org" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.org" + + headers["accept"] = "text/plain" + assert len(headers) == 2 + assert "Accept" in headers + assert "Host" in headers + + headers = self._2host() + assert len(headers.fields) == 2 + headers["Host"] = "example.com" + assert len(headers.fields) == 1 + assert "Host" in headers + + def test_delitem(self): + headers = semantics.Headers(Host="example.com") + assert len(headers) == 1 + del headers["host"] + assert len(headers) == 0 + try: + del headers["host"] + except KeyError: + assert True + else: + assert False + + headers = self._2host() + del headers["Host"] + assert len(headers) == 0 + + def test_keys(self): + headers = semantics.Headers(Host="example.com") + assert len(headers.keys()) == 1 + assert headers.keys()[0] == "Host" + + headers = self._2host() + assert len(headers.keys()) == 1 + assert headers.keys()[0] == "Host" + + def test_eq_ne(self): + headers1 = semantics.Headers(Host="example.com") + headers2 = semantics.Headers(host="example.com") + assert not (headers1 == headers2) + assert headers1 != headers2 + + headers1 = semantics.Headers(Host="example.com") + headers2 = semantics.Headers(Host="example.com") + assert headers1 == headers2 + assert not (headers1 != headers2) + + assert headers1 != 42 + + def test_get_all(self): + headers = self._2host() + assert headers.get_all("host") == ["example.com", "example.org"] + assert headers.get_all("accept", 42) is 42 + + def test_set_all(self): + headers = semantics.Headers(Host="example.com") + headers.set_all("Accept", ["text/plain"]) + assert len(headers) == 2 + assert "accept" in headers + + headers = self._2host() + headers.set_all("Host", ["example.org"]) + assert headers["host"] == "example.org" + + headers.set_all("Host", ["example.org", "example.net"]) + assert headers["host"] == "example.org, example.net" + + def test_state(self): + headers = self._2host() + assert len(headers.get_state()) == 2 + assert headers == semantics.Headers.from_state(headers.get_state()) + + headers2 = semantics.Headers() + assert headers != headers2 + headers2.load_state(headers.get_state()) + assert headers == headers2 diff --git a/test/test_utils.py b/test/test_utils.py index fc7174d6..374d09ba 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,5 +1,5 @@ -from netlib import utils, odict, tutils - +from netlib import utils, tutils +from netlib.http import Headers def test_bidi(): b = utils.BiDi(a=1, b=2) @@ -88,20 +88,21 @@ def test_urldecode(): def test_get_header_tokens(): - h = odict.ODictCaseless() - assert utils.get_header_tokens(h, "foo") == [] - h["foo"] = ["bar"] - assert utils.get_header_tokens(h, "foo") == ["bar"] - h["foo"] = ["bar, voing"] - assert utils.get_header_tokens(h, "foo") == ["bar", "voing"] - h["foo"] = ["bar, voing", "oink"] - assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"] + headers = Headers() + assert utils.get_header_tokens(headers, "foo") == [] + headers["foo"] = "bar" + assert utils.get_header_tokens(headers, "foo") == ["bar"] + headers["foo"] = "bar, voing" + assert utils.get_header_tokens(headers, "foo") == ["bar", "voing"] + headers.set_all("foo", ["bar, voing", "oink"]) + assert utils.get_header_tokens(headers, "foo") == ["bar", "voing", "oink"] def test_multipartdecode(): boundary = 'somefancyboundary' - headers = odict.ODict( - [('content-type', ('multipart/form-data; boundary=%s' % boundary))]) + headers = Headers( + content_type='multipart/form-data; boundary=%s' % boundary + ) content = "--{0}\n" \ "Content-Disposition: form-data; name=\"field1\"\n\n" \ "value1\n" \ diff --git a/test/test_wsgi.py b/test/test_wsgi.py index 41572d49..e26e1413 100644 --- a/test/test_wsgi.py +++ b/test/test_wsgi.py @@ -1,12 +1,12 @@ import cStringIO import sys -from netlib import wsgi, odict +from netlib import wsgi +from netlib.http import Headers def tflow(): - h = odict.ODictCaseless() - h["test"] = ["value"] - req = wsgi.Request("http", "GET", "/", h, "") + headers = Headers(test="value") + req = wsgi.Request("http", "GET", "/", headers, "") return wsgi.Flow(("127.0.0.1", 8888), req) diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index be87b20a..57cfd166 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -42,7 +42,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler): preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) self.wfile.write(preamble + "\r\n") headers = self.protocol.server_handshake_headers(key) - self.wfile.write(headers.format() + "\r\n") + self.wfile.write(str(headers) + "\r\n") self.wfile.flush() self.handshake_done = True @@ -66,8 +66,8 @@ class WebSocketsClient(tcp.TCPClient): preamble = 'GET / HTTP/1.1' self.wfile.write(preamble + "\r\n") headers = self.protocol.client_handshake_headers() - self.client_nonce = headers.get_first("sec-websocket-key") - self.wfile.write(headers.format() + "\r\n") + self.client_nonce = headers["sec-websocket-key"] + self.wfile.write(str(headers) + "\r\n") self.wfile.flush() resp = http1_protocol.read_response("GET", None) @@ -145,13 +145,13 @@ class TestWebSockets(tservers.ServerTestBase): def test_check_server_handshake(self): headers = self.protocol.server_handshake_headers("key") assert self.protocol.check_server_handshake(headers) - headers["Upgrade"] = ["not_websocket"] + headers["Upgrade"] = "not_websocket" assert not self.protocol.check_server_handshake(headers) def test_check_client_handshake(self): headers = self.protocol.client_handshake_headers("key") assert self.protocol.check_client_handshake(headers) == "key" - headers["Upgrade"] = ["not_websocket"] + headers["Upgrade"] = "not_websocket" assert not self.protocol.check_client_handshake(headers) @@ -166,7 +166,7 @@ class BadHandshakeHandler(WebSocketsEchoHandler): preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) self.wfile.write(preamble + "\r\n") headers = self.protocol.server_handshake_headers("malformed key") - self.wfile.write(headers.format() + "\r\n") + self.wfile.write(str(headers) + "\r\n") self.wfile.flush() self.handshake_done = True |