diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/http1/test_protocol.py | 10 | ||||
-rw-r--r-- | test/http/test_exceptions.py | 6 | ||||
-rw-r--r-- | test/http/test_semantics.py | 295 | ||||
-rw-r--r-- | test/test_utils.py | 77 | ||||
-rw-r--r-- | test/tutils.py | 68 |
5 files changed, 336 insertions, 120 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index b196b7a3..05bad1af 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -75,16 +75,6 @@ def test_connection_close(): assert HTTP1Protocol.connection_close((1, 1), h) -def test_get_header_tokens(): - h = odict.ODictCaseless() - assert http.get_header_tokens(h, "foo") == [] - h["foo"] = ["bar"] - assert http.get_header_tokens(h, "foo") == ["bar"] - h["foo"] = ["bar, voing"] - assert http.get_header_tokens(h, "foo") == ["bar", "voing"] - h["foo"] = ["bar, voing", "oink"] - assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"] - def test_read_http_body_request(): h = odict.ODictCaseless() diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py new file mode 100644 index 00000000..aa57f831 --- /dev/null +++ b/test/http/test_exceptions.py @@ -0,0 +1,6 @@ +from netlib.http.exceptions import * + +def test_HttpAuthenticationError(): + x = HttpAuthenticationError({"foo": "bar"}) + assert str(x) + assert "foo" in x.headers diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py index c4605302..986afc39 100644 --- a/test/http/test_semantics.py +++ b/test/http/test_semantics.py @@ -1,54 +1,267 @@ import cStringIO import textwrap import binascii +from mock import MagicMock from netlib import http, odict, tcp from netlib.http import http1 +from netlib.http.semantics import CONTENT_MISSING from .. import tutils, tservers def test_httperror(): e = http.exceptions.HttpError(404, "Not found") assert str(e) +class TestRequest: + # def test_asterisk_form_in(self): + # f = tutils.tflow(req=None) + # protocol = mock_protocol("OPTIONS * HTTP/1.1") + # f.request = HTTPRequest.from_protocol(protocol) + # + # assert f.request.form_in == "relative" + # f.request.host = f.server_conn.address.host + # f.request.port = f.server_conn.address.port + # f.request.scheme = "http" + # assert protocol.assemble(f.request) == ( + # "OPTIONS * HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_relative_form_in(self): + # protocol = mock_protocol("GET /foo\xff HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") + # r = HTTPRequest.from_protocol(protocol) + # assert r.headers["Upgrade"] == ["h2c"] + # + # def test_expect_header(self): + # protocol = mock_protocol( + # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + # assert r.content == "foo" + # assert protocol.tcp_handler.rfile.read(3) == "bar" + # + # def test_authority_form_in(self): + # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.scheme, r.host, r.port = "http", "address", 22 + # assert protocol.assemble(r) == ( + # "CONNECT address:22 HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # assert r.pretty_url(False) == "address:22" + # + # def test_absolute_form_in(self): + # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.assemble(r) == ( + # "GET http://address:22/ HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_relative_form_in(self): + # """ + # Exercises fix for Issue #392. + # """ + # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS /secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_absolute_form_in(self): + # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") -def test_parse_url(): - assert not http.parse_url("") - - u = "http://foo.com:8888/test" - s, h, po, pa = http.parse_url(u) - assert s == "http" - assert h == "foo.com" - assert po == 8888 - assert pa == "/test" - - s, h, po, pa = http.parse_url("http://foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://user:pass@foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://foo") - assert pa == "/" - - s, h, po, pa = http.parse_url("https://foo") - assert po == 443 - - assert not http.parse_url("https://foo:bar") - assert not http.parse_url("https://foo:") - - # Invalid IDNA - assert not http.parse_url("http://\xfafoo") - # Invalid PATH - assert not http.parse_url("http:/\xc6/localhost:56121") - # Null byte in host - assert not http.parse_url("http://foo\0") - # Port out of range - assert not http.parse_url("http://foo:999999") - # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt - assert not http.parse_url('http://lo[calhost') + def test_set_url(self): + r = tutils.treq_absolute() + r.url = "https://otheraddress:42/ORLY" + assert r.scheme == "https" + assert r.host == "otheraddress" + assert r.port == 42 + assert r.path == "/ORLY" + + def test_repr(self): + r = tutils.treq() + assert repr(r) + + def test_pretty_host(self): + r = tutils.treq() + assert r.pretty_host(True) == "address" + assert r.pretty_host(False) == "address" + r.headers["host"] = ["other"] + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) == "address" + r.host = None + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) is None + del r.headers["host"] + assert r.pretty_host(True) is None + assert r.pretty_host(False) is None + + # Invalid IDNA + r.headers["host"] = [".disqus.com"] + assert r.pretty_host(True) == ".disqus.com" + + def test_get_form_for_urlencoded(self): + r = tutils.treq() + r.headers.add("content-type", "application/x-www-form-urlencoded") + r.get_form_urlencoded = MagicMock() + + r.get_form() + + assert r.get_form_urlencoded.called + + def test_get_form_for_multipart(self): + r = tutils.treq() + r.headers.add("content-type", "multipart/form-data") + r.get_form_multipart = MagicMock() + + r.get_form() + + assert r.get_form_multipart.called + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + r = tutils.treq() + r.headers = h + assert len(r.get_cookies()) == 0 + + def test_get_cookies_single(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=cookievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=coo=kievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + result["cookiename"] = ["foo"] + r.set_cookies(result) + assert r.get_cookies()["cookiename"] == ["foo"] + + +class TestResponse(object): + def test_repr(self): + r = tutils.tresp() + assert "unknown content type" in repr(r) + r.headers["content-type"] = ["foo"] + assert "foo" in repr(r) + assert repr(tutils.tresp(content=CONTENT_MISSING)) + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + resp = tutils.tresp() + resp.headers = h + assert not resp.get_cookies() + + def test_get_cookies_simple(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + + def test_get_cookies_with_parameters(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/" + ] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", odict.ODict()] + + def test_set_cookies(self): + resp = tutils.tresp() + v = resp.get_cookies() + v.add("foo", ["bar", odict.ODictCaseless()]) + resp.set_cookies(v) + + v = resp.get_cookies() + assert len(v) == 1 + assert v["foo"] == [["bar", odict.ODictCaseless()]] diff --git a/test/test_utils.py b/test/test_utils.py index 8e66bce4..0153030c 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,4 +1,6 @@ -from netlib import utils +import urlparse + +from netlib import utils, odict import tutils @@ -27,3 +29,76 @@ def test_pretty_size(): assert utils.pretty_size(1024) == "1kB" assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB" assert utils.pretty_size(1024 * 1024) == "1MB" + + + + +def test_parse_url(): + assert not utils.parse_url("") + + u = "http://foo.com:8888/test" + s, h, po, pa = utils.parse_url(u) + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + + s, h, po, pa = utils.parse_url("http://foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = utils.parse_url("http://user:pass@foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = utils.parse_url("http://foo") + assert pa == "/" + + s, h, po, pa = utils.parse_url("https://foo") + assert po == 443 + + assert not utils.parse_url("https://foo:bar") + assert not utils.parse_url("https://foo:") + + # Invalid IDNA + assert not utils.parse_url("http://\xfafoo") + # Invalid PATH + assert not utils.parse_url("http:/\xc6/localhost:56121") + # Null byte in host + assert not utils.parse_url("http://foo\0") + # Port out of range + assert not utils.parse_url("http://foo:999999") + # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt + assert not utils.parse_url('http://lo[calhost') + + +def test_unparse_url(): + assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99" + assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com" + assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80" + assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com" + + +def test_urlencode(): + assert utils.urlencode([('foo', 'bar')]) + + + +def test_urldecode(): + s = "one=two&three=four" + assert len(utils.urldecode(s)) == 2 + + +def test_get_header_tokens(): + h = odict.ODictCaseless() + assert utils.get_header_tokens(h, "foo") == [] + h["foo"] = ["bar"] + assert utils.get_header_tokens(h, "foo") == ["bar"] + h["foo"] = ["bar, voing"] + assert utils.get_header_tokens(h, "foo") == ["bar", "voing"] + h["foo"] = ["bar, voing", "oink"] + assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"] diff --git a/test/tutils.py b/test/tutils.py deleted file mode 100644 index 94139f6f..00000000 --- a/test/tutils.py +++ /dev/null @@ -1,68 +0,0 @@ -import cStringIO -import tempfile -import os -import shutil -from contextlib import contextmanager - -from netlib import tcp, utils - - -def treader(bytes): - """ - Construct a tcp.Read object from bytes. - """ - fp = cStringIO.StringIO(bytes) - return tcp.Reader(fp) - - -@contextmanager -def tmpdir(*args, **kwargs): - orig_workdir = os.getcwd() - temp_workdir = tempfile.mkdtemp(*args, **kwargs) - os.chdir(temp_workdir) - - yield temp_workdir - - os.chdir(orig_workdir) - shutil.rmtree(temp_workdir) - - -def raises(exc, obj, *args, **kwargs): - """ - Assert that a callable raises a specified exception. - - :exc An exception class or a string. If a class, assert that an - exception of this type is raised. If a string, assert that the string - occurs in the string representation of the exception, based on a - case-insenstivie match. - - :obj A callable object. - - :args Arguments to be passsed to the callable. - - :kwargs Arguments to be passed to the callable. - """ - try: - ret = obj(*args, **kwargs) - except Exception as v: - if isinstance(exc, basestring): - if exc.lower() in str(v).lower(): - return - else: - raise AssertionError( - "Expected %s, but caught %s" % ( - repr(str(exc)), v - ) - ) - else: - if isinstance(v, exc): - return - else: - raise AssertionError( - "Expected %s, but caught %s %s" % ( - exc.__name__, v.__class__.__name__, str(v) - ) - ) - raise AssertionError("No exception raised. Return value: {}".format(ret)) - -test_data = utils.Data(__name__) |