aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/http1/test_protocol.py10
-rw-r--r--test/http/test_exceptions.py6
-rw-r--r--test/http/test_semantics.py295
-rw-r--r--test/test_utils.py77
-rw-r--r--test/tutils.py68
5 files changed, 336 insertions, 120 deletions
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
index b196b7a3..05bad1af 100644
--- a/test/http/http1/test_protocol.py
+++ b/test/http/http1/test_protocol.py
@@ -75,16 +75,6 @@ def test_connection_close():
assert HTTP1Protocol.connection_close((1, 1), h)
-def test_get_header_tokens():
- h = odict.ODictCaseless()
- assert http.get_header_tokens(h, "foo") == []
- h["foo"] = ["bar"]
- assert http.get_header_tokens(h, "foo") == ["bar"]
- h["foo"] = ["bar, voing"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
- h["foo"] = ["bar, voing", "oink"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
-
def test_read_http_body_request():
h = odict.ODictCaseless()
diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py
new file mode 100644
index 00000000..aa57f831
--- /dev/null
+++ b/test/http/test_exceptions.py
@@ -0,0 +1,6 @@
+from netlib.http.exceptions import *
+
+def test_HttpAuthenticationError():
+ x = HttpAuthenticationError({"foo": "bar"})
+ assert str(x)
+ assert "foo" in x.headers
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
index c4605302..986afc39 100644
--- a/test/http/test_semantics.py
+++ b/test/http/test_semantics.py
@@ -1,54 +1,267 @@
import cStringIO
import textwrap
import binascii
+from mock import MagicMock
from netlib import http, odict, tcp
from netlib.http import http1
+from netlib.http.semantics import CONTENT_MISSING
from .. import tutils, tservers
def test_httperror():
e = http.exceptions.HttpError(404, "Not found")
assert str(e)
+class TestRequest:
+ # def test_asterisk_form_in(self):
+ # f = tutils.tflow(req=None)
+ # protocol = mock_protocol("OPTIONS * HTTP/1.1")
+ # f.request = HTTPRequest.from_protocol(protocol)
+ #
+ # assert f.request.form_in == "relative"
+ # f.request.host = f.server_conn.address.host
+ # f.request.port = f.server_conn.address.port
+ # f.request.scheme = "http"
+ # assert protocol.assemble(f.request) == (
+ # "OPTIONS * HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_relative_form_in(self):
+ # protocol = mock_protocol("GET /foo\xff HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert r.headers["Upgrade"] == ["h2c"]
+ #
+ # def test_expect_header(self):
+ # protocol = mock_protocol(
+ # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
+ # assert r.content == "foo"
+ # assert protocol.tcp_handler.rfile.read(3) == "bar"
+ #
+ # def test_authority_form_in(self):
+ # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.scheme, r.host, r.port = "http", "address", 22
+ # assert protocol.assemble(r) == (
+ # "CONNECT address:22 HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ # assert r.pretty_url(False) == "address:22"
+ #
+ # def test_absolute_form_in(self):
+ # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert protocol.assemble(r) == (
+ # "GET http://address:22/ HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_http_options_relative_form_in(self):
+ # """
+ # Exercises fix for Issue #392.
+ # """
+ # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.host = 'address'
+ # r.port = 80
+ # r.scheme = "http"
+ # assert protocol.assemble(r) == (
+ # "OPTIONS /secret/resource HTTP/1.1\r\n"
+ # "Host: address\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_http_options_absolute_form_in(self):
+ # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.host = 'address'
+ # r.port = 80
+ # r.scheme = "http"
+ # assert protocol.assemble(r) == (
+ # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
+ # "Host: address\r\n"
+ # "Content-Length: 0\r\n\r\n")
-def test_parse_url():
- assert not http.parse_url("")
-
- u = "http://foo.com:8888/test"
- s, h, po, pa = http.parse_url(u)
- assert s == "http"
- assert h == "foo.com"
- assert po == 8888
- assert pa == "/test"
-
- s, h, po, pa = http.parse_url("http://foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://foo")
- assert pa == "/"
-
- s, h, po, pa = http.parse_url("https://foo")
- assert po == 443
-
- assert not http.parse_url("https://foo:bar")
- assert not http.parse_url("https://foo:")
-
- # Invalid IDNA
- assert not http.parse_url("http://\xfafoo")
- # Invalid PATH
- assert not http.parse_url("http:/\xc6/localhost:56121")
- # Null byte in host
- assert not http.parse_url("http://foo\0")
- # Port out of range
- assert not http.parse_url("http://foo:999999")
- # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- assert not http.parse_url('http://lo[calhost')
+ def test_set_url(self):
+ r = tutils.treq_absolute()
+ r.url = "https://otheraddress:42/ORLY"
+ assert r.scheme == "https"
+ assert r.host == "otheraddress"
+ assert r.port == 42
+ assert r.path == "/ORLY"
+
+ def test_repr(self):
+ r = tutils.treq()
+ assert repr(r)
+
+ def test_pretty_host(self):
+ r = tutils.treq()
+ assert r.pretty_host(True) == "address"
+ assert r.pretty_host(False) == "address"
+ r.headers["host"] = ["other"]
+ assert r.pretty_host(True) == "other"
+ assert r.pretty_host(False) == "address"
+ r.host = None
+ assert r.pretty_host(True) == "other"
+ assert r.pretty_host(False) is None
+ del r.headers["host"]
+ assert r.pretty_host(True) is None
+ assert r.pretty_host(False) is None
+
+ # Invalid IDNA
+ r.headers["host"] = [".disqus.com"]
+ assert r.pretty_host(True) == ".disqus.com"
+
+ def test_get_form_for_urlencoded(self):
+ r = tutils.treq()
+ r.headers.add("content-type", "application/x-www-form-urlencoded")
+ r.get_form_urlencoded = MagicMock()
+
+ r.get_form()
+
+ assert r.get_form_urlencoded.called
+
+ def test_get_form_for_multipart(self):
+ r = tutils.treq()
+ r.headers.add("content-type", "multipart/form-data")
+ r.get_form_multipart = MagicMock()
+
+ r.get_form()
+
+ assert r.get_form_multipart.called
+
+ def test_get_cookies_none(self):
+ h = odict.ODictCaseless()
+ r = tutils.treq()
+ r.headers = h
+ assert len(r.get_cookies()) == 0
+
+ def test_get_cookies_single(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue"]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 1
+ assert result['cookiename'] == ['cookievalue']
+
+ def test_get_cookies_double(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = [
+ "cookiename=cookievalue;othercookiename=othercookievalue"
+ ]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 2
+ assert result['cookiename'] == ['cookievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_get_cookies_withequalsign(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = [
+ "cookiename=coo=kievalue;othercookiename=othercookievalue"
+ ]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 2
+ assert result['cookiename'] == ['coo=kievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_set_cookies(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue"]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ result["cookiename"] = ["foo"]
+ r.set_cookies(result)
+ assert r.get_cookies()["cookiename"] == ["foo"]
+
+
+class TestResponse(object):
+ def test_repr(self):
+ r = tutils.tresp()
+ assert "unknown content type" in repr(r)
+ r.headers["content-type"] = ["foo"]
+ assert "foo" in repr(r)
+ assert repr(tutils.tresp(content=CONTENT_MISSING))
+
+ def test_get_cookies_none(self):
+ h = odict.ODictCaseless()
+ resp = tutils.tresp()
+ resp.headers = h
+ assert not resp.get_cookies()
+
+ def test_get_cookies_simple(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
+
+ def test_get_cookies_with_parameters(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = [
+ "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == "cookievalue"
+ attrs = result["cookiename"][0][1]
+ assert len(attrs) == 4
+ assert attrs["domain"] == ["example.com"]
+ assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
+ assert attrs["path"] == ["/"]
+ assert attrs["httponly"] == [None]
+
+ def test_get_cookies_no_value(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = [
+ "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
+ ]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == ""
+ assert len(result["cookiename"][0][1]) == 2
+
+ def test_get_cookies_twocookies(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 2
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
+ assert "othercookie" in result
+ assert result["othercookie"][0] == ["othervalue", odict.ODict()]
+
+ def test_set_cookies(self):
+ resp = tutils.tresp()
+ v = resp.get_cookies()
+ v.add("foo", ["bar", odict.ODictCaseless()])
+ resp.set_cookies(v)
+
+ v = resp.get_cookies()
+ assert len(v) == 1
+ assert v["foo"] == [["bar", odict.ODictCaseless()]]
diff --git a/test/test_utils.py b/test/test_utils.py
index 8e66bce4..0153030c 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -1,4 +1,6 @@
-from netlib import utils
+import urlparse
+
+from netlib import utils, odict
import tutils
@@ -27,3 +29,76 @@ def test_pretty_size():
assert utils.pretty_size(1024) == "1kB"
assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB"
assert utils.pretty_size(1024 * 1024) == "1MB"
+
+
+
+
+def test_parse_url():
+ assert not utils.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = utils.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = utils.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = utils.parse_url("http://user:pass@foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = utils.parse_url("http://foo")
+ assert pa == "/"
+
+ s, h, po, pa = utils.parse_url("https://foo")
+ assert po == 443
+
+ assert not utils.parse_url("https://foo:bar")
+ assert not utils.parse_url("https://foo:")
+
+ # Invalid IDNA
+ assert not utils.parse_url("http://\xfafoo")
+ # Invalid PATH
+ assert not utils.parse_url("http:/\xc6/localhost:56121")
+ # Null byte in host
+ assert not utils.parse_url("http://foo\0")
+ # Port out of range
+ assert not utils.parse_url("http://foo:999999")
+ # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
+ assert not utils.parse_url('http://lo[calhost')
+
+
+def test_unparse_url():
+ assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
+ assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com"
+ assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80"
+ assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com"
+
+
+def test_urlencode():
+ assert utils.urlencode([('foo', 'bar')])
+
+
+
+def test_urldecode():
+ s = "one=two&three=four"
+ assert len(utils.urldecode(s)) == 2
+
+
+def test_get_header_tokens():
+ h = odict.ODictCaseless()
+ assert utils.get_header_tokens(h, "foo") == []
+ h["foo"] = ["bar"]
+ assert utils.get_header_tokens(h, "foo") == ["bar"]
+ h["foo"] = ["bar, voing"]
+ assert utils.get_header_tokens(h, "foo") == ["bar", "voing"]
+ h["foo"] = ["bar, voing", "oink"]
+ assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
diff --git a/test/tutils.py b/test/tutils.py
deleted file mode 100644
index 94139f6f..00000000
--- a/test/tutils.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import cStringIO
-import tempfile
-import os
-import shutil
-from contextlib import contextmanager
-
-from netlib import tcp, utils
-
-
-def treader(bytes):
- """
- Construct a tcp.Read object from bytes.
- """
- fp = cStringIO.StringIO(bytes)
- return tcp.Reader(fp)
-
-
-@contextmanager
-def tmpdir(*args, **kwargs):
- orig_workdir = os.getcwd()
- temp_workdir = tempfile.mkdtemp(*args, **kwargs)
- os.chdir(temp_workdir)
-
- yield temp_workdir
-
- os.chdir(orig_workdir)
- shutil.rmtree(temp_workdir)
-
-
-def raises(exc, obj, *args, **kwargs):
- """
- Assert that a callable raises a specified exception.
-
- :exc An exception class or a string. If a class, assert that an
- exception of this type is raised. If a string, assert that the string
- occurs in the string representation of the exception, based on a
- case-insenstivie match.
-
- :obj A callable object.
-
- :args Arguments to be passsed to the callable.
-
- :kwargs Arguments to be passed to the callable.
- """
- try:
- ret = obj(*args, **kwargs)
- except Exception as v:
- if isinstance(exc, basestring):
- if exc.lower() in str(v).lower():
- return
- else:
- raise AssertionError(
- "Expected %s, but caught %s" % (
- repr(str(exc)), v
- )
- )
- else:
- if isinstance(v, exc):
- return
- else:
- raise AssertionError(
- "Expected %s, but caught %s %s" % (
- exc.__name__, v.__class__.__name__, str(v)
- )
- )
- raise AssertionError("No exception raised. Return value: {}".format(ret))
-
-test_data = utils.Data(__name__)