aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--netlib/encoding.py82
-rw-r--r--netlib/http/exceptions.py13
-rw-r--r--netlib/http/http1/protocol.py39
-rw-r--r--netlib/http/semantics.py366
-rw-r--r--netlib/tutils.py (renamed from test/tutils.py)59
-rw-r--r--netlib/utils.py100
-rw-r--r--test/http/http1/test_protocol.py10
-rw-r--r--test/http/test_exceptions.py6
-rw-r--r--test/http/test_semantics.py295
-rw-r--r--test/test_utils.py77
10 files changed, 885 insertions, 162 deletions
diff --git a/netlib/encoding.py b/netlib/encoding.py
new file mode 100644
index 00000000..f107eb5f
--- /dev/null
+++ b/netlib/encoding.py
@@ -0,0 +1,82 @@
+"""
+ Utility functions for decoding response bodies.
+"""
+from __future__ import absolute_import
+import cStringIO
+import gzip
+import zlib
+
+__ALL__ = ["ENCODINGS"]
+
+ENCODINGS = set(["identity", "gzip", "deflate"])
+
+
+def decode(e, content):
+ encoding_map = {
+ "identity": identity,
+ "gzip": decode_gzip,
+ "deflate": decode_deflate,
+ }
+ if e not in encoding_map:
+ return None
+ return encoding_map[e](content)
+
+
+def encode(e, content):
+ encoding_map = {
+ "identity": identity,
+ "gzip": encode_gzip,
+ "deflate": encode_deflate,
+ }
+ if e not in encoding_map:
+ return None
+ return encoding_map[e](content)
+
+
+def identity(content):
+ """
+ Returns content unchanged. Identity is the default value of
+ Accept-Encoding headers.
+ """
+ return content
+
+
+def decode_gzip(content):
+ gfile = gzip.GzipFile(fileobj=cStringIO.StringIO(content))
+ try:
+ return gfile.read()
+ except (IOError, EOFError):
+ return None
+
+
+def encode_gzip(content):
+ s = cStringIO.StringIO()
+ gf = gzip.GzipFile(fileobj=s, mode='wb')
+ gf.write(content)
+ gf.close()
+ return s.getvalue()
+
+
+def decode_deflate(content):
+ """
+ Returns decompressed data for DEFLATE. Some servers may respond with
+ compressed data without a zlib header or checksum. An undocumented
+ feature of zlib permits the lenient decompression of data missing both
+ values.
+
+ http://bugs.python.org/issue5784
+ """
+ try:
+ try:
+ return zlib.decompress(content)
+ except zlib.error:
+ return zlib.decompress(content, -15)
+ except zlib.error:
+ return None
+
+
+def encode_deflate(content):
+ """
+ Returns compressed content, always including zlib header and checksum.
+ """
+ return zlib.compress(content)
diff --git a/netlib/http/exceptions.py b/netlib/http/exceptions.py
index 8a2bbebc..45bd2dce 100644
--- a/netlib/http/exceptions.py
+++ b/netlib/http/exceptions.py
@@ -7,3 +7,16 @@ class HttpError(Exception):
class HttpErrorConnClosed(HttpError):
pass
+
+
+
+class HttpAuthenticationError(Exception):
+ def __init__(self, auth_headers=None):
+ super(HttpAuthenticationError, self).__init__(
+ "Proxy Authentication Required"
+ )
+ self.headers = auth_headers
+ self.code = 407
+
+ def __repr__(self):
+ return "Proxy Authentication Required"
diff --git a/netlib/http/http1/protocol.py b/netlib/http/http1/protocol.py
index b098110a..a189bffc 100644
--- a/netlib/http/http1/protocol.py
+++ b/netlib/http/http1/protocol.py
@@ -375,7 +375,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
@classmethod
def has_chunked_encoding(self, headers):
return "chunked" in [
- i.lower() for i in http.get_header_tokens(headers, "transfer-encoding")
+ i.lower() for i in utils.get_header_tokens(headers, "transfer-encoding")
]
@@ -482,9 +482,9 @@ class HTTP1Protocol(semantics.ProtocolMixin):
port = int(port)
except ValueError:
return None
- if not http.is_valid_port(port):
+ if not utils.is_valid_port(port):
return None
- if not http.is_valid_host(host):
+ if not utils.is_valid_host(host):
return None
return host, port, httpversion
@@ -496,7 +496,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
return None
method, url, httpversion = v
- parts = http.parse_url(url)
+ parts = utils.parse_url(url)
if not parts:
return None
scheme, host, port, path = parts
@@ -528,7 +528,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
"""
# At first, check if we have an explicit Connection header.
if "connection" in headers:
- toks = http.get_header_tokens(headers, "connection")
+ toks = utils.get_header_tokens(headers, "connection")
if "close" in toks:
return True
elif "keep-alive" in toks:
@@ -556,34 +556,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
@classmethod
def _assemble_request_first_line(self, request):
- if request.form_in == "relative":
- request_line = '%s %s HTTP/%s.%s' % (
- request.method,
- request.path,
- request.httpversion[0],
- request.httpversion[1],
- )
- elif request.form_in == "authority":
- request_line = '%s %s:%s HTTP/%s.%s' % (
- request.method,
- request.host,
- request.port,
- request.httpversion[0],
- request.httpversion[1],
- )
- elif request.form_in == "absolute":
- request_line = '%s %s://%s:%s%s HTTP/%s.%s' % (
- request.method,
- request.scheme,
- request.host,
- request.port,
- request.path,
- request.httpversion[0],
- request.httpversion[1],
- )
- else:
- raise http.HttpError(400, "Invalid request form")
- return request_line
+ return request.legacy_first_line()
def _assemble_request_headers(self, request):
headers = request.headers.copy()
diff --git a/netlib/http/semantics.py b/netlib/http/semantics.py
index 54bf83d2..e7ae2b5f 100644
--- a/netlib/http/semantics.py
+++ b/netlib/http/semantics.py
@@ -3,9 +3,15 @@ import binascii
import collections
import string
import sys
+import urllib
import urlparse
from .. import utils, odict
+from . import cookies
+from netlib import utils, encoding
+
+HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
+HDR_FORM_MULTIPART = "multipart/form-data"
CONTENT_MISSING = 0
@@ -75,7 +81,240 @@ class Request(object):
return False
def __repr__(self):
- return "Request(%s - %s, %s)" % (self.method, self.host, self.path)
+ # return "Request(%s - %s, %s)" % (self.method, self.host, self.path)
+
+ return "<HTTPRequest: {0}>".format(
+ self.legacy_first_line()[:-9]
+ )
+
+ def legacy_first_line(self):
+ if self.form_in == "relative":
+ return '%s %s HTTP/%s.%s' % (
+ self.method,
+ self.path,
+ self.httpversion[0],
+ self.httpversion[1],
+ )
+ elif self.form_in == "authority":
+ return '%s %s:%s HTTP/%s.%s' % (
+ self.method,
+ self.host,
+ self.port,
+ self.httpversion[0],
+ self.httpversion[1],
+ )
+ elif self.form_in == "absolute":
+ return '%s %s://%s:%s%s HTTP/%s.%s' % (
+ self.method,
+ self.scheme,
+ self.host,
+ self.port,
+ self.path,
+ self.httpversion[0],
+ self.httpversion[1],
+ )
+ else:
+ raise http.HttpError(400, "Invalid request form")
+
+ def anticache(self):
+ """
+ Modifies this request to remove headers that might produce a cached
+ response. That is, we remove ETags and If-Modified-Since headers.
+ """
+ delheaders = [
+ "if-modified-since",
+ "if-none-match",
+ ]
+ for i in delheaders:
+ del self.headers[i]
+
+ def anticomp(self):
+ """
+ Modifies this request to remove headers that will compress the
+ resource's data.
+ """
+ self.headers["accept-encoding"] = ["identity"]
+
+ def constrain_encoding(self):
+ """
+ Limits the permissible Accept-Encoding values, based on what we can
+ decode appropriately.
+ """
+ if self.headers["accept-encoding"]:
+ self.headers["accept-encoding"] = [
+ ', '.join(
+ e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0])]
+
+ def update_host_header(self):
+ """
+ Update the host header to reflect the current target.
+ """
+ self.headers["Host"] = [self.host]
+
+ def get_form(self):
+ """
+ Retrieves the URL-encoded or multipart form data, returning an ODict object.
+ Returns an empty ODict if there is no data or the content-type
+ indicates non-form data.
+ """
+ if self.body:
+ if self.headers.in_any("content-type", HDR_FORM_URLENCODED, True):
+ return self.get_form_urlencoded()
+ elif self.headers.in_any("content-type", HDR_FORM_MULTIPART, True):
+ return self.get_form_multipart()
+ return odict.ODict([])
+
+ def get_form_urlencoded(self):
+ """
+ Retrieves the URL-encoded form data, returning an ODict object.
+ Returns an empty ODict if there is no data or the content-type
+ indicates non-form data.
+ """
+ if self.body and self.headers.in_any(
+ "content-type",
+ HDR_FORM_URLENCODED,
+ True):
+ return odict.ODict(utils.urldecode(self.body))
+ return odict.ODict([])
+
+ def get_form_multipart(self):
+ if self.body and self.headers.in_any(
+ "content-type",
+ HDR_FORM_MULTIPART,
+ True):
+ return odict.ODict(
+ utils.multipartdecode(
+ self.headers,
+ self.body))
+ return odict.ODict([])
+
+ def set_form_urlencoded(self, odict):
+ """
+ Sets the body to the URL-encoded form data, and adds the
+        appropriate content-type header. Note that this will destroy the
+ existing body if there is one.
+ """
+ # FIXME: If there's an existing content-type header indicating a
+ # url-encoded form, leave it alone.
+ self.headers["Content-Type"] = [HDR_FORM_URLENCODED]
+ self.body = utils.urlencode(odict.lst)
+
+ def get_path_components(self):
+ """
+ Returns the path components of the URL as a list of strings.
+
+ Components are unquoted.
+ """
+ _, _, path, _, _, _ = urlparse.urlparse(self.url)
+ return [urllib.unquote(i) for i in path.split("/") if i]
+
+ def set_path_components(self, lst):
+ """
+ Takes a list of strings, and sets the path component of the URL.
+
+ Components are quoted.
+ """
+ lst = [urllib.quote(i, safe="") for i in lst]
+ path = "/" + "/".join(lst)
+ scheme, netloc, _, params, query, fragment = urlparse.urlparse(self.url)
+ self.url = urlparse.urlunparse(
+ [scheme, netloc, path, params, query, fragment]
+ )
+
+ def get_query(self):
+ """
+ Gets the request query string. Returns an ODict object.
+ """
+ _, _, _, _, query, _ = urlparse.urlparse(self.url)
+ if query:
+ return odict.ODict(utils.urldecode(query))
+ return odict.ODict([])
+
+ def set_query(self, odict):
+ """
+ Takes an ODict object, and sets the request query string.
+ """
+ scheme, netloc, path, params, _, fragment = urlparse.urlparse(self.url)
+ query = utils.urlencode(odict.lst)
+ self.url = urlparse.urlunparse(
+ [scheme, netloc, path, params, query, fragment]
+ )
+
+ def pretty_host(self, hostheader):
+ """
+ Heuristic to get the host of the request.
+
+ Note that pretty_host() does not always return the TCP destination
+ of the request, e.g. if an upstream proxy is in place
+
+ If hostheader is set to True, the Host: header will be used as
+ additional (and preferred) data source. This is handy in
+        transparent mode, where only the IP of the destination is known,
+ but not the resolved name. This is disabled by default, as an
+ attacker may spoof the host header to confuse an analyst.
+ """
+ host = None
+ if hostheader:
+ host = self.headers.get_first("host")
+ if not host:
+ host = self.host
+ if host:
+ try:
+ return host.encode("idna")
+ except ValueError:
+ return host
+ else:
+ return None
+
+ def pretty_url(self, hostheader):
+ if self.form_out == "authority": # upstream proxy mode
+ return "%s:%s" % (self.pretty_host(hostheader), self.port)
+ return utils.unparse_url(self.scheme,
+ self.pretty_host(hostheader),
+ self.port,
+ self.path).encode('ascii')
+
+ def get_cookies(self):
+ """
+ Returns a possibly empty netlib.odict.ODict object.
+ """
+ ret = odict.ODict()
+ for i in self.headers["cookie"]:
+ ret.extend(cookies.parse_cookie_header(i))
+ return ret
+
+ def set_cookies(self, odict):
+ """
+ Takes an netlib.odict.ODict object. Over-writes any existing Cookie
+ headers.
+ """
+ v = cookies.format_cookie_header(odict)
+ self.headers["Cookie"] = [v]
+
+ @property
+ def url(self):
+ """
+ Returns a URL string, constructed from the Request's URL components.
+ """
+ return utils.unparse_url(
+ self.scheme,
+ self.host,
+ self.port,
+ self.path
+ ).encode('ascii')
+
+ @url.setter
+ def url(self, url):
+ """
+ Parses a URL specification, and updates the Request's information
+ accordingly.
+
+ Returns False if the URL was invalid, True if the request succeeded.
+ """
+ parts = utils.parse_url(url)
+ if not parts:
+ raise ValueError("Invalid URL: %s" % url)
+ self.scheme, self.host, self.port, self.path = parts
@property
def content(self):
@@ -139,7 +378,56 @@ class Response(object):
return False
def __repr__(self):
- return "Response(%s - %s)" % (self.status_code, self.msg)
+ # return "Response(%s - %s)" % (self.status_code, self.msg)
+
+ if self.body:
+ size = utils.pretty_size(len(self.body))
+ else:
+ size = "content missing"
+ return "<HTTPResponse: {status_code} {msg} ({contenttype}, {size})>".format(
+ status_code=self.status_code,
+ msg=self.msg,
+ contenttype=self.headers.get_first(
+ "content-type", "unknown content type"
+ ),
+ size=size
+ )
+
+
+ def get_cookies(self):
+ """
+ Get the contents of all Set-Cookie headers.
+
+ Returns a possibly empty ODict, where keys are cookie name strings,
+ and values are [value, attr] lists. Value is a string, and attr is
+ an ODictCaseless containing cookie attributes. Within attrs, unary
+ attributes (e.g. HTTPOnly) are indicated by a Null value.
+ """
+ ret = []
+ for header in self.headers["set-cookie"]:
+ v = cookies.parse_set_cookie_header(header)
+ if v:
+ name, value, attrs = v
+ ret.append([name, [value, attrs]])
+ return odict.ODict(ret)
+
+ def set_cookies(self, odict):
+ """
+ Set the Set-Cookie headers on this response, over-writing existing
+ headers.
+
+ Accepts an ODict of the same format as that returned by get_cookies.
+ """
+ values = []
+ for i in odict.lst:
+ values.append(
+ cookies.format_set_cookie_header(
+ i[0],
+ i[1][0],
+ i[1][1]
+ )
+ )
+ self.headers["Set-Cookie"] = values
@property
def content(self):
@@ -160,77 +448,3 @@ class Response(object):
def code(self, code):
# TODO: remove deprecated setter
self.status_code = code
-
-
-
-def is_valid_port(port):
- if not 0 <= port <= 65535:
- return False
- return True
-
-
-def is_valid_host(host):
- try:
- host.decode("idna")
- except ValueError:
- return False
- if "\0" in host:
- return None
- return True
-
-
-def parse_url(url):
- """
- Returns a (scheme, host, port, path) tuple, or None on error.
-
- Checks that:
- port is an integer 0-65535
- host is a valid IDNA-encoded hostname with no null-bytes
- path is valid ASCII
- """
- try:
- scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
- except ValueError:
- return None
- if not scheme:
- return None
- if '@' in netloc:
- # FIXME: Consider what to do with the discarded credentials here Most
- # probably we should extend the signature to return these as a separate
- # value.
- _, netloc = string.rsplit(netloc, '@', maxsplit=1)
- if ':' in netloc:
- host, port = string.rsplit(netloc, ':', maxsplit=1)
- try:
- port = int(port)
- except ValueError:
- return None
- else:
- host = netloc
- if scheme == "https":
- port = 443
- else:
- port = 80
- path = urlparse.urlunparse(('', '', path, params, query, fragment))
- if not path.startswith("/"):
- path = "/" + path
- if not is_valid_host(host):
- return None
- if not utils.isascii(path):
- return None
- if not is_valid_port(port):
- return None
- return scheme, host, port, path
-
-
-def get_header_tokens(headers, key):
- """
- Retrieve all tokens for a header key. A number of different headers
- follow a pattern where each header line can containe comma-separated
- tokens, and headers can be set multiple times.
- """
- toks = []
- for i in headers[key]:
- for j in i.split(","):
- toks.append(j.strip())
- return toks
diff --git a/test/tutils.py b/netlib/tutils.py
index 94139f6f..5018b9e8 100644
--- a/test/tutils.py
+++ b/netlib/tutils.py
@@ -1,10 +1,11 @@
import cStringIO
import tempfile
import os
+import time
import shutil
from contextlib import contextmanager
-from netlib import tcp, utils
+from netlib import tcp, utils, odict, http
def treader(bytes):
@@ -66,3 +67,59 @@ def raises(exc, obj, *args, **kwargs):
raise AssertionError("No exception raised. Return value: {}".format(ret))
test_data = utils.Data(__name__)
+
+
+
+
+def treq(content="content", scheme="http", host="address", port=22):
+ """
+    @return: netlib.http.Request
+ """
+ headers = odict.ODictCaseless()
+ headers["header"] = ["qvalue"]
+ req = http.Request(
+ "relative",
+ "GET",
+ scheme,
+ host,
+ port,
+ "/path",
+ (1, 1),
+ headers,
+ content,
+ None,
+ None,
+ )
+ return req
+
+
+def treq_absolute(content="content"):
+ """
+ @return: libmproxy.protocol.http.HTTPRequest
+ """
+ r = treq(content)
+ r.form_in = r.form_out = "absolute"
+ r.host = "address"
+ r.port = 22
+ r.scheme = "http"
+ return r
+
+
+def tresp(content="message"):
+ """
+    @return: netlib.http.semantics.Response
+ """
+
+ headers = odict.ODictCaseless()
+ headers["header_response"] = ["svalue"]
+
+ resp = http.semantics.Response(
+ (1, 1),
+ 200,
+ "OK",
+ headers,
+ content,
+ time.time(),
+ time.time(),
+ )
+ return resp
diff --git a/netlib/utils.py b/netlib/utils.py
index 86e33f33..39354605 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -1,5 +1,10 @@
from __future__ import (absolute_import, print_function, division)
import os.path
+import cgi
+import urllib
+import urlparse
+import string
+
def isascii(s):
try:
@@ -131,6 +136,81 @@ class Data(object):
return fullpath
+
+
+def is_valid_port(port):
+ if not 0 <= port <= 65535:
+ return False
+ return True
+
+
+def is_valid_host(host):
+ try:
+ host.decode("idna")
+ except ValueError:
+ return False
+ if "\0" in host:
+ return None
+ return True
+
+
+def parse_url(url):
+ """
+ Returns a (scheme, host, port, path) tuple, or None on error.
+
+ Checks that:
+ port is an integer 0-65535
+ host is a valid IDNA-encoded hostname with no null-bytes
+ path is valid ASCII
+ """
+ try:
+ scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
+ except ValueError:
+ return None
+ if not scheme:
+ return None
+ if '@' in netloc:
+        # FIXME: Consider what to do with the discarded credentials here. Most
+ # probably we should extend the signature to return these as a separate
+ # value.
+ _, netloc = string.rsplit(netloc, '@', maxsplit=1)
+ if ':' in netloc:
+ host, port = string.rsplit(netloc, ':', maxsplit=1)
+ try:
+ port = int(port)
+ except ValueError:
+ return None
+ else:
+ host = netloc
+ if scheme == "https":
+ port = 443
+ else:
+ port = 80
+ path = urlparse.urlunparse(('', '', path, params, query, fragment))
+ if not path.startswith("/"):
+ path = "/" + path
+ if not is_valid_host(host):
+ return None
+ if not isascii(path):
+ return None
+ if not is_valid_port(port):
+ return None
+ return scheme, host, port, path
+
+
+def get_header_tokens(headers, key):
+ """
+ Retrieve all tokens for a header key. A number of different headers
+    follow a pattern where each header line can contain comma-separated
+ tokens, and headers can be set multiple times.
+ """
+ toks = []
+ for i in headers[key]:
+ for j in i.split(","):
+ toks.append(j.strip())
+ return toks
+
+
def hostport(scheme, host, port):
"""
Returns the host component, with a port specifcation if needed.
@@ -139,3 +219,23 @@ def hostport(scheme, host, port):
return host
else:
return "%s:%s" % (host, port)
+
+def unparse_url(scheme, host, port, path=""):
+ """
+    Returns a URL string, constructed from the specified components.
+ """
+ return "%s://%s%s" % (scheme, hostport(scheme, host, port), path)
+
+
+def urlencode(s):
+ """
+ Takes a list of (key, value) tuples and returns a urlencoded string.
+ """
+ s = [tuple(i) for i in s]
+ return urllib.urlencode(s, False)
+
+def urldecode(s):
+ """
+ Takes a urlencoded string and returns a list of (key, value) tuples.
+ """
+ return cgi.parse_qsl(s, keep_blank_values=True)
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
index b196b7a3..05bad1af 100644
--- a/test/http/http1/test_protocol.py
+++ b/test/http/http1/test_protocol.py
@@ -75,16 +75,6 @@ def test_connection_close():
assert HTTP1Protocol.connection_close((1, 1), h)
-def test_get_header_tokens():
- h = odict.ODictCaseless()
- assert http.get_header_tokens(h, "foo") == []
- h["foo"] = ["bar"]
- assert http.get_header_tokens(h, "foo") == ["bar"]
- h["foo"] = ["bar, voing"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
- h["foo"] = ["bar, voing", "oink"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
-
def test_read_http_body_request():
h = odict.ODictCaseless()
diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py
new file mode 100644
index 00000000..aa57f831
--- /dev/null
+++ b/test/http/test_exceptions.py
@@ -0,0 +1,6 @@
+from netlib.http.exceptions import *
+
+def test_HttpAuthenticationError():
+ x = HttpAuthenticationError({"foo": "bar"})
+ assert str(x)
+ assert "foo" in x.headers
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
index c4605302..986afc39 100644
--- a/test/http/test_semantics.py
+++ b/test/http/test_semantics.py
@@ -1,54 +1,267 @@
import cStringIO
import textwrap
import binascii
+from mock import MagicMock
from netlib import http, odict, tcp
from netlib.http import http1
+from netlib.http.semantics import CONTENT_MISSING
from .. import tutils, tservers
def test_httperror():
e = http.exceptions.HttpError(404, "Not found")
assert str(e)
+class TestRequest:
+ # def test_asterisk_form_in(self):
+ # f = tutils.tflow(req=None)
+ # protocol = mock_protocol("OPTIONS * HTTP/1.1")
+ # f.request = HTTPRequest.from_protocol(protocol)
+ #
+ # assert f.request.form_in == "relative"
+ # f.request.host = f.server_conn.address.host
+ # f.request.port = f.server_conn.address.port
+ # f.request.scheme = "http"
+ # assert protocol.assemble(f.request) == (
+ # "OPTIONS * HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_relative_form_in(self):
+ # protocol = mock_protocol("GET /foo\xff HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert r.headers["Upgrade"] == ["h2c"]
+ #
+ # def test_expect_header(self):
+ # protocol = mock_protocol(
+ # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
+ # assert r.content == "foo"
+ # assert protocol.tcp_handler.rfile.read(3) == "bar"
+ #
+ # def test_authority_form_in(self):
+ # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.scheme, r.host, r.port = "http", "address", 22
+ # assert protocol.assemble(r) == (
+ # "CONNECT address:22 HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ # assert r.pretty_url(False) == "address:22"
+ #
+ # def test_absolute_form_in(self):
+ # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
+ # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
+ #
+ # protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # assert protocol.assemble(r) == (
+ # "GET http://address:22/ HTTP/1.1\r\n"
+ # "Host: address:22\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_http_options_relative_form_in(self):
+ # """
+ # Exercises fix for Issue #392.
+ # """
+ # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.host = 'address'
+ # r.port = 80
+ # r.scheme = "http"
+ # assert protocol.assemble(r) == (
+ # "OPTIONS /secret/resource HTTP/1.1\r\n"
+ # "Host: address\r\n"
+ # "Content-Length: 0\r\n\r\n")
+ #
+ # def test_http_options_absolute_form_in(self):
+ # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
+ # r = HTTPRequest.from_protocol(protocol)
+ # r.host = 'address'
+ # r.port = 80
+ # r.scheme = "http"
+ # assert protocol.assemble(r) == (
+ # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
+ # "Host: address\r\n"
+ # "Content-Length: 0\r\n\r\n")
-def test_parse_url():
- assert not http.parse_url("")
-
- u = "http://foo.com:8888/test"
- s, h, po, pa = http.parse_url(u)
- assert s == "http"
- assert h == "foo.com"
- assert po == 8888
- assert pa == "/test"
-
- s, h, po, pa = http.parse_url("http://foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://foo")
- assert pa == "/"
-
- s, h, po, pa = http.parse_url("https://foo")
- assert po == 443
-
- assert not http.parse_url("https://foo:bar")
- assert not http.parse_url("https://foo:")
-
- # Invalid IDNA
- assert not http.parse_url("http://\xfafoo")
- # Invalid PATH
- assert not http.parse_url("http:/\xc6/localhost:56121")
- # Null byte in host
- assert not http.parse_url("http://foo\0")
- # Port out of range
- assert not http.parse_url("http://foo:999999")
- # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- assert not http.parse_url('http://lo[calhost')
+ def test_set_url(self):
+ r = tutils.treq_absolute()
+ r.url = "https://otheraddress:42/ORLY"
+ assert r.scheme == "https"
+ assert r.host == "otheraddress"
+ assert r.port == 42
+ assert r.path == "/ORLY"
+
+ def test_repr(self):
+ r = tutils.treq()
+ assert repr(r)
+
+ def test_pretty_host(self):
+ r = tutils.treq()
+ assert r.pretty_host(True) == "address"
+ assert r.pretty_host(False) == "address"
+ r.headers["host"] = ["other"]
+ assert r.pretty_host(True) == "other"
+ assert r.pretty_host(False) == "address"
+ r.host = None
+ assert r.pretty_host(True) == "other"
+ assert r.pretty_host(False) is None
+ del r.headers["host"]
+ assert r.pretty_host(True) is None
+ assert r.pretty_host(False) is None
+
+ # Invalid IDNA
+ r.headers["host"] = [".disqus.com"]
+ assert r.pretty_host(True) == ".disqus.com"
+
+ def test_get_form_for_urlencoded(self):
+ r = tutils.treq()
+ r.headers.add("content-type", "application/x-www-form-urlencoded")
+ r.get_form_urlencoded = MagicMock()
+
+ r.get_form()
+
+ assert r.get_form_urlencoded.called
+
+ def test_get_form_for_multipart(self):
+ r = tutils.treq()
+ r.headers.add("content-type", "multipart/form-data")
+ r.get_form_multipart = MagicMock()
+
+ r.get_form()
+
+ assert r.get_form_multipart.called
+
+ def test_get_cookies_none(self):
+ h = odict.ODictCaseless()
+ r = tutils.treq()
+ r.headers = h
+ assert len(r.get_cookies()) == 0
+
+ def test_get_cookies_single(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue"]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 1
+ assert result['cookiename'] == ['cookievalue']
+
+ def test_get_cookies_double(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = [
+ "cookiename=cookievalue;othercookiename=othercookievalue"
+ ]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 2
+ assert result['cookiename'] == ['cookievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_get_cookies_withequalsign(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = [
+ "cookiename=coo=kievalue;othercookiename=othercookievalue"
+ ]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ assert len(result) == 2
+ assert result['cookiename'] == ['coo=kievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_set_cookies(self):
+ h = odict.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue"]
+ r = tutils.treq()
+ r.headers = h
+ result = r.get_cookies()
+ result["cookiename"] = ["foo"]
+ r.set_cookies(result)
+ assert r.get_cookies()["cookiename"] == ["foo"]
+
+
+class TestResponse(object):
+ def test_repr(self):
+ r = tutils.tresp()
+ assert "unknown content type" in repr(r)
+ r.headers["content-type"] = ["foo"]
+ assert "foo" in repr(r)
+ assert repr(tutils.tresp(content=CONTENT_MISSING))
+
+ def test_get_cookies_none(self):
+ h = odict.ODictCaseless()
+ resp = tutils.tresp()
+ resp.headers = h
+ assert not resp.get_cookies()
+
+ def test_get_cookies_simple(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
+
+ def test_get_cookies_with_parameters(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = [
+ "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == "cookievalue"
+ attrs = result["cookiename"][0][1]
+ assert len(attrs) == 4
+ assert attrs["domain"] == ["example.com"]
+ assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
+ assert attrs["path"] == ["/"]
+ assert attrs["httponly"] == [None]
+
+ def test_get_cookies_no_value(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = [
+ "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
+ ]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 1
+ assert "cookiename" in result
+ assert result["cookiename"][0][0] == ""
+ assert len(result["cookiename"][0][1]) == 2
+
+ def test_get_cookies_twocookies(self):
+ h = odict.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
+ resp = tutils.tresp()
+ resp.headers = h
+ result = resp.get_cookies()
+ assert len(result) == 2
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
+ assert "othercookie" in result
+ assert result["othercookie"][0] == ["othervalue", odict.ODict()]
+
+ def test_set_cookies(self):
+ resp = tutils.tresp()
+ v = resp.get_cookies()
+ v.add("foo", ["bar", odict.ODictCaseless()])
+ resp.set_cookies(v)
+
+ v = resp.get_cookies()
+ assert len(v) == 1
+ assert v["foo"] == [["bar", odict.ODictCaseless()]]
diff --git a/test/test_utils.py b/test/test_utils.py
index 8e66bce4..0153030c 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -1,4 +1,6 @@
-from netlib import utils
+import urlparse
+
+from netlib import utils, odict
import tutils
@@ -27,3 +29,76 @@ def test_pretty_size():
assert utils.pretty_size(1024) == "1kB"
assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB"
assert utils.pretty_size(1024 * 1024) == "1MB"
+
+
+
+
+def test_parse_url():
+ assert not utils.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = utils.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = utils.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = utils.parse_url("http://user:pass@foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = utils.parse_url("http://foo")
+ assert pa == "/"
+
+ s, h, po, pa = utils.parse_url("https://foo")
+ assert po == 443
+
+ assert not utils.parse_url("https://foo:bar")
+ assert not utils.parse_url("https://foo:")
+
+ # Invalid IDNA
+ assert not utils.parse_url("http://\xfafoo")
+ # Invalid PATH
+ assert not utils.parse_url("http:/\xc6/localhost:56121")
+ # Null byte in host
+ assert not utils.parse_url("http://foo\0")
+ # Port out of range
+ assert not utils.parse_url("http://foo:999999")
+ # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
+ assert not utils.parse_url('http://lo[calhost')
+
+
+def test_unparse_url():
+ assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
+ assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com"
+ assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80"
+ assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com"
+
+
+def test_urlencode():
+ assert utils.urlencode([('foo', 'bar')])
+
+
+
+def test_urldecode():
+ s = "one=two&three=four"
+ assert len(utils.urldecode(s)) == 2
+
+
+def test_get_header_tokens():
+ h = odict.ODictCaseless()
+ assert utils.get_header_tokens(h, "foo") == []
+ h["foo"] = ["bar"]
+ assert utils.get_header_tokens(h, "foo") == ["bar"]
+ h["foo"] = ["bar, voing"]
+ assert utils.get_header_tokens(h, "foo") == ["bar", "voing"]
+ h["foo"] = ["bar, voing", "oink"]
+ assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]