diff options
-rw-r--r-- | netlib/encoding.py | 82 | ||||
-rw-r--r-- | netlib/http/exceptions.py | 13 | ||||
-rw-r--r-- | netlib/http/http1/protocol.py | 39 | ||||
-rw-r--r-- | netlib/http/semantics.py | 366 | ||||
-rw-r--r-- | netlib/tutils.py (renamed from test/tutils.py) | 59 | ||||
-rw-r--r-- | netlib/utils.py | 100 | ||||
-rw-r--r-- | test/http/http1/test_protocol.py | 10 | ||||
-rw-r--r-- | test/http/test_exceptions.py | 6 | ||||
-rw-r--r-- | test/http/test_semantics.py | 295 | ||||
-rw-r--r-- | test/test_utils.py | 77 |
10 files changed, 885 insertions, 162 deletions
diff --git a/netlib/encoding.py b/netlib/encoding.py new file mode 100644 index 00000000..f107eb5f --- /dev/null +++ b/netlib/encoding.py @@ -0,0 +1,82 @@ +""" + Utility functions for decoding response bodies. +""" +from __future__ import absolute_import +import cStringIO +import gzip +import zlib + +__ALL__ = ["ENCODINGS"] + +ENCODINGS = set(["identity", "gzip", "deflate"]) + + +def decode(e, content): + encoding_map = { + "identity": identity, + "gzip": decode_gzip, + "deflate": decode_deflate, + } + if e not in encoding_map: + return None + return encoding_map[e](content) + + +def encode(e, content): + encoding_map = { + "identity": identity, + "gzip": encode_gzip, + "deflate": encode_deflate, + } + if e not in encoding_map: + return None + return encoding_map[e](content) + + +def identity(content): + """ + Returns content unchanged. Identity is the default value of + Accept-Encoding headers. + """ + return content + + +def decode_gzip(content): + gfile = gzip.GzipFile(fileobj=cStringIO.StringIO(content)) + try: + return gfile.read() + except (IOError, EOFError): + return None + + +def encode_gzip(content): + s = cStringIO.StringIO() + gf = gzip.GzipFile(fileobj=s, mode='wb') + gf.write(content) + gf.close() + return s.getvalue() + + +def decode_deflate(content): + """ + Returns decompressed data for DEFLATE. Some servers may respond with + compressed data without a zlib header or checksum. An undocumented + feature of zlib permits the lenient decompression of data missing both + values. + + http://bugs.python.org/issue5784 + """ + try: + try: + return zlib.decompress(content) + except zlib.error: + return zlib.decompress(content, -15) + except zlib.error: + return None + + +def encode_deflate(content): + """ + Returns compressed content, always including zlib header and checksum. + """ + return zlib.compress(content) diff --git a/netlib/http/exceptions.py b/netlib/http/exceptions.py index 8a2bbebc..45bd2dce 100644 --- a/netlib/http/exceptions.py +++ b/netlib/http/exceptions.py @@ -7,3 +7,16 @@ class HttpError(Exception): class HttpErrorConnClosed(HttpError): pass + + + +class HttpAuthenticationError(Exception): + def __init__(self, auth_headers=None): + super(HttpAuthenticationError, self).__init__( + "Proxy Authentication Required" + ) + self.headers = auth_headers + self.code = 407 + + def __repr__(self): + return "Proxy Authentication Required" diff --git a/netlib/http/http1/protocol.py b/netlib/http/http1/protocol.py index b098110a..a189bffc 100644 --- a/netlib/http/http1/protocol.py +++ b/netlib/http/http1/protocol.py @@ -375,7 +375,7 @@ class HTTP1Protocol(semantics.ProtocolMixin): @classmethod def has_chunked_encoding(self, headers): return "chunked" in [ - i.lower() for i in http.get_header_tokens(headers, "transfer-encoding") + i.lower() for i in utils.get_header_tokens(headers, "transfer-encoding") ] @@ -482,9 +482,9 @@ class HTTP1Protocol(semantics.ProtocolMixin): port = int(port) except ValueError: return None - if not http.is_valid_port(port): + if not utils.is_valid_port(port): return None - if not http.is_valid_host(host): + if not utils.is_valid_host(host): return None return host, port, httpversion @@ -496,7 +496,7 @@ class HTTP1Protocol(semantics.ProtocolMixin): return None method, url, httpversion = v - parts = http.parse_url(url) + parts = utils.parse_url(url) if not parts: return None scheme, host, port, path = parts @@ -528,7 +528,7 @@ class HTTP1Protocol(semantics.ProtocolMixin): """ # At first, check if we have an explicit Connection header. if "connection" in headers: - toks = http.get_header_tokens(headers, "connection") + toks = utils.get_header_tokens(headers, "connection") if "close" in toks: return True elif "keep-alive" in toks: @@ -556,34 +556,7 @@ class HTTP1Protocol(semantics.ProtocolMixin): @classmethod def _assemble_request_first_line(self, request): - if request.form_in == "relative": - request_line = '%s %s HTTP/%s.%s' % ( - request.method, - request.path, - request.httpversion[0], - request.httpversion[1], - ) - elif request.form_in == "authority": - request_line = '%s %s:%s HTTP/%s.%s' % ( - request.method, - request.host, - request.port, - request.httpversion[0], - request.httpversion[1], - ) - elif request.form_in == "absolute": - request_line = '%s %s://%s:%s%s HTTP/%s.%s' % ( - request.method, - request.scheme, - request.host, - request.port, - request.path, - request.httpversion[0], - request.httpversion[1], - ) - else: - raise http.HttpError(400, "Invalid request form") - return request_line + return request.legacy_first_line() def _assemble_request_headers(self, request): headers = request.headers.copy() diff --git a/netlib/http/semantics.py b/netlib/http/semantics.py index 54bf83d2..e7ae2b5f 100644 --- a/netlib/http/semantics.py +++ b/netlib/http/semantics.py @@ -3,9 +3,15 @@ import binascii import collections import string import sys +import urllib import urlparse from .. import utils, odict +from . import cookies +from netlib import utils, encoding + +HDR_FORM_URLENCODED = "application/x-www-form-urlencoded" +HDR_FORM_MULTIPART = "multipart/form-data" CONTENT_MISSING = 0 @@ -75,7 +81,240 @@ class Request(object): return False def __repr__(self): - return "Request(%s - %s, %s)" % (self.method, self.host, self.path) + # return "Request(%s - %s, %s)" % (self.method, self.host, self.path) + + return "<HTTPRequest: {0}>".format( + self.legacy_first_line()[:-9] + ) + + def legacy_first_line(self): + if self.form_in == "relative": + return '%s %s HTTP/%s.%s' % ( + self.method, + self.path, + self.httpversion[0], + self.httpversion[1], + ) + elif self.form_in == "authority": + return '%s %s:%s HTTP/%s.%s' % ( + self.method, + self.host, + self.port, + self.httpversion[0], + self.httpversion[1], + ) + elif self.form_in == "absolute": + return '%s %s://%s:%s%s HTTP/%s.%s' % ( + self.method, + self.scheme, + self.host, + self.port, + self.path, + self.httpversion[0], + self.httpversion[1], + ) + else: + raise http.HttpError(400, "Invalid request form") + + def anticache(self): + """ + Modifies this request to remove headers that might produce a cached + response. That is, we remove ETags and If-Modified-Since headers. + """ + delheaders = [ + "if-modified-since", + "if-none-match", + ] + for i in delheaders: + del self.headers[i] + + def anticomp(self): + """ + Modifies this request to remove headers that will compress the + resource's data. + """ + self.headers["accept-encoding"] = ["identity"] + + def constrain_encoding(self): + """ + Limits the permissible Accept-Encoding values, based on what we can + decode appropriately. + """ + if self.headers["accept-encoding"]: + self.headers["accept-encoding"] = [ + ', '.join( + e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0])] + + def update_host_header(self): + """ + Update the host header to reflect the current target. + """ + self.headers["Host"] = [self.host] + + def get_form(self): + """ + Retrieves the URL-encoded or multipart form data, returning an ODict object. + Returns an empty ODict if there is no data or the content-type + indicates non-form data. + """ + if self.body: + if self.headers.in_any("content-type", HDR_FORM_URLENCODED, True): + return self.get_form_urlencoded() + elif self.headers.in_any("content-type", HDR_FORM_MULTIPART, True): + return self.get_form_multipart() + return odict.ODict([]) + + def get_form_urlencoded(self): + """ + Retrieves the URL-encoded form data, returning an ODict object. + Returns an empty ODict if there is no data or the content-type + indicates non-form data. + """ + if self.body and self.headers.in_any( + "content-type", + HDR_FORM_URLENCODED, + True): + return odict.ODict(utils.urldecode(self.body)) + return odict.ODict([]) + + def get_form_multipart(self): + if self.body and self.headers.in_any( + "content-type", + HDR_FORM_MULTIPART, + True): + return odict.ODict( + utils.multipartdecode( + self.headers, + self.body)) + return odict.ODict([]) + + def set_form_urlencoded(self, odict): + """ + Sets the body to the URL-encoded form data, and adds the + appropriate content-type header. Note that this will destory the + existing body if there is one. + """ + # FIXME: If there's an existing content-type header indicating a + # url-encoded form, leave it alone. + self.headers["Content-Type"] = [HDR_FORM_URLENCODED] + self.body = utils.urlencode(odict.lst) + + def get_path_components(self): + """ + Returns the path components of the URL as a list of strings. + + Components are unquoted. + """ + _, _, path, _, _, _ = urlparse.urlparse(self.url) + return [urllib.unquote(i) for i in path.split("/") if i] + + def set_path_components(self, lst): + """ + Takes a list of strings, and sets the path component of the URL. + + Components are quoted. + """ + lst = [urllib.quote(i, safe="") for i in lst] + path = "/" + "/".join(lst) + scheme, netloc, _, params, query, fragment = urlparse.urlparse(self.url) + self.url = urlparse.urlunparse( + [scheme, netloc, path, params, query, fragment] + ) + + def get_query(self): + """ + Gets the request query string. Returns an ODict object. + """ + _, _, _, _, query, _ = urlparse.urlparse(self.url) + if query: + return odict.ODict(utils.urldecode(query)) + return odict.ODict([]) + + def set_query(self, odict): + """ + Takes an ODict object, and sets the request query string. + """ + scheme, netloc, path, params, _, fragment = urlparse.urlparse(self.url) + query = utils.urlencode(odict.lst) + self.url = urlparse.urlunparse( + [scheme, netloc, path, params, query, fragment] + ) + + def pretty_host(self, hostheader): + """ + Heuristic to get the host of the request. + + Note that pretty_host() does not always return the TCP destination + of the request, e.g. if an upstream proxy is in place + + If hostheader is set to True, the Host: header will be used as + additional (and preferred) data source. This is handy in + transparent mode, where only the IO of the destination is known, + but not the resolved name. This is disabled by default, as an + attacker may spoof the host header to confuse an analyst. + """ + host = None + if hostheader: + host = self.headers.get_first("host") + if not host: + host = self.host + if host: + try: + return host.encode("idna") + except ValueError: + return host + else: + return None + + def pretty_url(self, hostheader): + if self.form_out == "authority": # upstream proxy mode + return "%s:%s" % (self.pretty_host(hostheader), self.port) + return utils.unparse_url(self.scheme, + self.pretty_host(hostheader), + self.port, + self.path).encode('ascii') + + def get_cookies(self): + """ + Returns a possibly empty netlib.odict.ODict object. + """ + ret = odict.ODict() + for i in self.headers["cookie"]: + ret.extend(cookies.parse_cookie_header(i)) + return ret + + def set_cookies(self, odict): + """ + Takes an netlib.odict.ODict object. Over-writes any existing Cookie + headers. + """ + v = cookies.format_cookie_header(odict) + self.headers["Cookie"] = [v] + + @property + def url(self): + """ + Returns a URL string, constructed from the Request's URL components. + """ + return utils.unparse_url( + self.scheme, + self.host, + self.port, + self.path + ).encode('ascii') + + @url.setter + def url(self, url): + """ + Parses a URL specification, and updates the Request's information + accordingly. + + Returns False if the URL was invalid, True if the request succeeded. + """ + parts = utils.parse_url(url) + if not parts: + raise ValueError("Invalid URL: %s" % url) + self.scheme, self.host, self.port, self.path = parts @property def content(self): @@ -139,7 +378,56 @@ class Response(object): return False def __repr__(self): - return "Response(%s - %s)" % (self.status_code, self.msg) + # return "Response(%s - %s)" % (self.status_code, self.msg) + + if self.body: + size = utils.pretty_size(len(self.body)) + else: + size = "content missing" + return "<HTTPResponse: {status_code} {msg} ({contenttype}, {size})>".format( + status_code=self.status_code, + msg=self.msg, + contenttype=self.headers.get_first( + "content-type", "unknown content type" + ), + size=size + ) + + + def get_cookies(self): + """ + Get the contents of all Set-Cookie headers. + + Returns a possibly empty ODict, where keys are cookie name strings, + and values are [value, attr] lists. Value is a string, and attr is + an ODictCaseless containing cookie attributes. Within attrs, unary + attributes (e.g. HTTPOnly) are indicated by a Null value. + """ + ret = [] + for header in self.headers["set-cookie"]: + v = cookies.parse_set_cookie_header(header) + if v: + name, value, attrs = v + ret.append([name, [value, attrs]]) + return odict.ODict(ret) + + def set_cookies(self, odict): + """ + Set the Set-Cookie headers on this response, over-writing existing + headers. + + Accepts an ODict of the same format as that returned by get_cookies. + """ + values = [] + for i in odict.lst: + values.append( + cookies.format_set_cookie_header( + i[0], + i[1][0], + i[1][1] + ) + ) + self.headers["Set-Cookie"] = values @property def content(self): @@ -160,77 +448,3 @@ class Response(object): def code(self, code): # TODO: remove deprecated setter self.status_code = code - - - -def is_valid_port(port): - if not 0 <= port <= 65535: - return False - return True - - -def is_valid_host(host): - try: - host.decode("idna") - except ValueError: - return False - if "\0" in host: - return None - return True - - -def parse_url(url): - """ - Returns a (scheme, host, port, path) tuple, or None on error. - - Checks that: - port is an integer 0-65535 - host is a valid IDNA-encoded hostname with no null-bytes - path is valid ASCII - """ - try: - scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) - except ValueError: - return None - if not scheme: - return None - if '@' in netloc: - # FIXME: Consider what to do with the discarded credentials here Most - # probably we should extend the signature to return these as a separate - # value. - _, netloc = string.rsplit(netloc, '@', maxsplit=1) - if ':' in netloc: - host, port = string.rsplit(netloc, ':', maxsplit=1) - try: - port = int(port) - except ValueError: - return None - else: - host = netloc - if scheme == "https": - port = 443 - else: - port = 80 - path = urlparse.urlunparse(('', '', path, params, query, fragment)) - if not path.startswith("/"): - path = "/" + path - if not is_valid_host(host): - return None - if not utils.isascii(path): - return None - if not is_valid_port(port): - return None - return scheme, host, port, path - - -def get_header_tokens(headers, key): - """ - Retrieve all tokens for a header key. A number of different headers - follow a pattern where each header line can containe comma-separated - tokens, and headers can be set multiple times. - """ - toks = [] - for i in headers[key]: - for j in i.split(","): - toks.append(j.strip()) - return toks diff --git a/test/tutils.py b/netlib/tutils.py index 94139f6f..5018b9e8 100644 --- a/test/tutils.py +++ b/netlib/tutils.py @@ -1,10 +1,11 @@ import cStringIO import tempfile import os +import time import shutil from contextlib import contextmanager -from netlib import tcp, utils +from netlib import tcp, utils, odict, http def treader(bytes): @@ -66,3 +67,59 @@ def raises(exc, obj, *args, **kwargs): raise AssertionError("No exception raised. Return value: {}".format(ret)) test_data = utils.Data(__name__) + + + + +def treq(content="content", scheme="http", host="address", port=22): + """ + @return: libmproxy.protocol.http.HTTPRequest + """ + headers = odict.ODictCaseless() + headers["header"] = ["qvalue"] + req = http.Request( + "relative", + "GET", + scheme, + host, + port, + "/path", + (1, 1), + headers, + content, + None, + None, + ) + return req + + +def treq_absolute(content="content"): + """ + @return: libmproxy.protocol.http.HTTPRequest + """ + r = treq(content) + r.form_in = r.form_out = "absolute" + r.host = "address" + r.port = 22 + r.scheme = "http" + return r + + +def tresp(content="message"): + """ + @return: libmproxy.protocol.http.HTTPResponse + """ + + headers = odict.ODictCaseless() + headers["header_response"] = ["svalue"] + + resp = http.semantics.Response( + (1, 1), + 200, + "OK", + headers, + content, + time.time(), + time.time(), + ) + return resp diff --git a/netlib/utils.py b/netlib/utils.py index 86e33f33..39354605 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -1,5 +1,10 @@ from __future__ import (absolute_import, print_function, division) import os.path +import cgi +import urllib +import urlparse +import string + def isascii(s): try: @@ -131,6 +136,81 @@ class Data(object): return fullpath + + +def is_valid_port(port): + if not 0 <= port <= 65535: + return False + return True + + +def is_valid_host(host): + try: + host.decode("idna") + except ValueError: + return False + if "\0" in host: + return None + return True + + +def parse_url(url): + """ + Returns a (scheme, host, port, path) tuple, or None on error. + + Checks that: + port is an integer 0-65535 + host is a valid IDNA-encoded hostname with no null-bytes + path is valid ASCII + """ + try: + scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) + except ValueError: + return None + if not scheme: + return None + if '@' in netloc: + # FIXME: Consider what to do with the discarded credentials here Most + # probably we should extend the signature to return these as a separate + # value. + _, netloc = string.rsplit(netloc, '@', maxsplit=1) + if ':' in netloc: + host, port = string.rsplit(netloc, ':', maxsplit=1) + try: + port = int(port) + except ValueError: + return None + else: + host = netloc + if scheme == "https": + port = 443 + else: + port = 80 + path = urlparse.urlunparse(('', '', path, params, query, fragment)) + if not path.startswith("/"): + path = "/" + path + if not is_valid_host(host): + return None + if not isascii(path): + return None + if not is_valid_port(port): + return None + return scheme, host, port, path + + +def get_header_tokens(headers, key): + """ + Retrieve all tokens for a header key. A number of different headers + follow a pattern where each header line can containe comma-separated + tokens, and headers can be set multiple times. + """ + toks = [] + for i in headers[key]: + for j in i.split(","): + toks.append(j.strip()) + return toks + + def hostport(scheme, host, port): """ Returns the host component, with a port specifcation if needed. @@ -139,3 +219,23 @@ def hostport(scheme, host, port): return host else: return "%s:%s" % (host, port) + +def unparse_url(scheme, host, port, path=""): + """ + Returns a URL string, constructed from the specified compnents. + """ + return "%s://%s%s" % (scheme, hostport(scheme, host, port), path) + + +def urlencode(s): + """ + Takes a list of (key, value) tuples and returns a urlencoded string. + """ + s = [tuple(i) for i in s] + return urllib.urlencode(s, False) + +def urldecode(s): + """ + Takes a urlencoded string and returns a list of (key, value) tuples. + """ + return cgi.parse_qsl(s, keep_blank_values=True) diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index b196b7a3..05bad1af 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -75,16 +75,6 @@ def test_connection_close(): assert HTTP1Protocol.connection_close((1, 1), h) -def test_get_header_tokens(): - h = odict.ODictCaseless() - assert http.get_header_tokens(h, "foo") == [] - h["foo"] = ["bar"] - assert http.get_header_tokens(h, "foo") == ["bar"] - h["foo"] = ["bar, voing"] - assert http.get_header_tokens(h, "foo") == ["bar", "voing"] - h["foo"] = ["bar, voing", "oink"] - assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"] - def test_read_http_body_request(): h = odict.ODictCaseless() diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py new file mode 100644 index 00000000..aa57f831 --- /dev/null +++ b/test/http/test_exceptions.py @@ -0,0 +1,6 @@ +from netlib.http.exceptions import * + +def test_HttpAuthenticationError(): + x = HttpAuthenticationError({"foo": "bar"}) + assert str(x) + assert "foo" in x.headers diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py index c4605302..986afc39 100644 --- a/test/http/test_semantics.py +++ b/test/http/test_semantics.py @@ -1,54 +1,267 @@ import cStringIO import textwrap import binascii +from mock import MagicMock from netlib import http, odict, tcp from netlib.http import http1 +from netlib.http.semantics import CONTENT_MISSING from .. import tutils, tservers def test_httperror(): e = http.exceptions.HttpError(404, "Not found") assert str(e) +class TestRequest: + # def test_asterisk_form_in(self): + # f = tutils.tflow(req=None) + # protocol = mock_protocol("OPTIONS * HTTP/1.1") + # f.request = HTTPRequest.from_protocol(protocol) + # + # assert f.request.form_in == "relative" + # f.request.host = f.server_conn.address.host + # f.request.port = f.server_conn.address.port + # f.request.scheme = "http" + # assert protocol.assemble(f.request) == ( + # "OPTIONS * HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_relative_form_in(self): + # protocol = mock_protocol("GET /foo\xff HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") + # r = HTTPRequest.from_protocol(protocol) + # assert r.headers["Upgrade"] == ["h2c"] + # + # def test_expect_header(self): + # protocol = mock_protocol( + # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + # assert r.content == "foo" + # assert protocol.tcp_handler.rfile.read(3) == "bar" + # + # def test_authority_form_in(self): + # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.scheme, r.host, r.port = "http", "address", 22 + # assert protocol.assemble(r) == ( + # "CONNECT address:22 HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # assert r.pretty_url(False) == "address:22" + # + # def test_absolute_form_in(self): + # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.assemble(r) == ( + # "GET http://address:22/ HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_relative_form_in(self): + # """ + # Exercises fix for Issue #392. + # """ + # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS /secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_absolute_form_in(self): + # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") -def test_parse_url(): - assert not http.parse_url("") - - u = "http://foo.com:8888/test" - s, h, po, pa = http.parse_url(u) - assert s == "http" - assert h == "foo.com" - assert po == 8888 - assert pa == "/test" - - s, h, po, pa = http.parse_url("http://foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://user:pass@foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://foo") - assert pa == "/" - - s, h, po, pa = http.parse_url("https://foo") - assert po == 443 - - assert not http.parse_url("https://foo:bar") - assert not http.parse_url("https://foo:") - - # Invalid IDNA - assert not http.parse_url("http://\xfafoo") - # Invalid PATH - assert not http.parse_url("http:/\xc6/localhost:56121") - # Null byte in host - assert not http.parse_url("http://foo\0") - # Port out of range - assert not http.parse_url("http://foo:999999") - # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt - assert not http.parse_url('http://lo[calhost') + def test_set_url(self): + r = tutils.treq_absolute() + r.url = "https://otheraddress:42/ORLY" + assert r.scheme == "https" + assert r.host == "otheraddress" + assert r.port == 42 + assert r.path == "/ORLY" + + def test_repr(self): + r = tutils.treq() + assert repr(r) + + def test_pretty_host(self): + r = tutils.treq() + assert r.pretty_host(True) == "address" + assert r.pretty_host(False) == "address" + r.headers["host"] = ["other"] + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) == "address" + r.host = None + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) is None + del r.headers["host"] + assert r.pretty_host(True) is None + assert r.pretty_host(False) is None + + # Invalid IDNA + r.headers["host"] = [".disqus.com"] + assert r.pretty_host(True) == ".disqus.com" + + def test_get_form_for_urlencoded(self): + r = tutils.treq() + r.headers.add("content-type", "application/x-www-form-urlencoded") + r.get_form_urlencoded = MagicMock() + + r.get_form() + + assert r.get_form_urlencoded.called + + def test_get_form_for_multipart(self): + r = tutils.treq() + r.headers.add("content-type", "multipart/form-data") + r.get_form_multipart = MagicMock() + + r.get_form() + + assert r.get_form_multipart.called + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + r = tutils.treq() + r.headers = h + assert len(r.get_cookies()) == 0 + + def test_get_cookies_single(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=cookievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + h = odict.ODictCaseless() + h["Cookie"] = [ + "cookiename=coo=kievalue;othercookiename=othercookievalue" + ] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + h = odict.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + r = tutils.treq() + r.headers = h + result = r.get_cookies() + result["cookiename"] = ["foo"] + r.set_cookies(result) + assert r.get_cookies()["cookiename"] == ["foo"] + + +class TestResponse(object): + def test_repr(self): + r = tutils.tresp() + assert "unknown content type" in repr(r) + r.headers["content-type"] = ["foo"] + assert "foo" in repr(r) + assert repr(tutils.tresp(content=CONTENT_MISSING)) + + def test_get_cookies_none(self): + h = odict.ODictCaseless() + resp = tutils.tresp() + resp.headers = h + assert not resp.get_cookies() + + def test_get_cookies_simple(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + + def test_get_cookies_with_parameters(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = [ + "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/" + ] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + h = odict.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"] + resp = tutils.tresp() + resp.headers = h + result = resp.get_cookies() + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", odict.ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", odict.ODict()] + + def test_set_cookies(self): + resp = tutils.tresp() + v = resp.get_cookies() + v.add("foo", ["bar", odict.ODictCaseless()]) + resp.set_cookies(v) + + v = resp.get_cookies() + assert len(v) == 1 + assert v["foo"] == [["bar", odict.ODictCaseless()]] diff --git a/test/test_utils.py b/test/test_utils.py index 8e66bce4..0153030c 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,4 +1,6 @@ -from netlib import utils +import urlparse + +from netlib import utils, odict import tutils @@ -27,3 +29,76 @@ def test_pretty_size(): assert utils.pretty_size(1024) == "1kB" assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB" assert utils.pretty_size(1024 * 1024) == "1MB" + + + + +def test_parse_url(): + assert not utils.parse_url("") + + u = "http://foo.com:8888/test" + s, h, po, pa = utils.parse_url(u) + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + + s, h, po, pa = utils.parse_url("http://foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = utils.parse_url("http://user:pass@foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = utils.parse_url("http://foo") + assert pa == "/" + + s, h, po, pa = utils.parse_url("https://foo") + assert po == 443 + + assert not utils.parse_url("https://foo:bar") + assert not utils.parse_url("https://foo:") + + # Invalid IDNA + assert not utils.parse_url("http://\xfafoo") + # Invalid PATH + assert not utils.parse_url("http:/\xc6/localhost:56121") + # Null byte in host + assert not utils.parse_url("http://foo\0") + # Port out of range + assert not utils.parse_url("http://foo:999999") + # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt + assert not utils.parse_url('http://lo[calhost') + + +def test_unparse_url(): + assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99" + assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com" + assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80" + assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com" + + +def test_urlencode(): + assert utils.urlencode([('foo', 'bar')]) + + + +def test_urldecode(): + s = "one=two&three=four" + assert len(utils.urldecode(s)) == 2 + + +def test_get_header_tokens(): + h = odict.ODictCaseless() + assert utils.get_header_tokens(h, "foo") == [] + h["foo"] = ["bar"] + assert utils.get_header_tokens(h, "foo") == ["bar"] + h["foo"] = ["bar, voing"] + assert utils.get_header_tokens(h, "foo") == ["bar", "voing"] + h["foo"] = ["bar, voing", "oink"] + assert utils.get_header_tokens(h, "foo") == ["bar", "voing", "oink"] |