author     Thomas Kriechbaumer <thomas@kriechbaumer.name>    2015-08-01 10:40:19 +0200
committer  Thomas Kriechbaumer <thomas@kriechbaumer.name>    2015-08-01 14:37:32 +0200
commit     db02553e2afee044faac898d12bd8d1adadbcd21 (patch)
tree       1e5910599b7b47c527528f03f896efd2fbf7c907
parent     8a051511706e2e62c32e0f70e05ecab11d444b6f (diff)
download   mitmproxy-db02553e2afee044faac898d12bd8d1adadbcd21.tar.gz
           mitmproxy-db02553e2afee044faac898d12bd8d1adadbcd21.tar.bz2
           mitmproxy-db02553e2afee044faac898d12bd8d1adadbcd21.zip
move code from mitmproxy to netlib
-rw-r--r--  libmproxy/cmdline.py                   5
-rw-r--r--  libmproxy/console/contentview.py       2
-rw-r--r--  libmproxy/console/flowlist.py          7
-rw-r--r--  libmproxy/encoding.py                 82
-rw-r--r--  libmproxy/protocol/http.py            37
-rw-r--r--  libmproxy/protocol/http_wrappers.py  295
-rw-r--r--  libmproxy/utils.py                    22
-rw-r--r--  test/test_console.py                   6
-rw-r--r--  test/test_console_contentview.py       6
-rw-r--r--  test/test_dump.py                     11
-rw-r--r--  test/test_flow.py                     76
-rw-r--r--  test/test_protocol_http.py           259
-rw-r--r--  test/test_server.py                    7
-rw-r--r--  test/test_utils.py                    16
-rw-r--r--  test/tutils.py                        82
15 files changed, 134 insertions, 779 deletions
diff --git a/libmproxy/cmdline.py b/libmproxy/cmdline.py
index 08639f6d..d033fb76 100644
--- a/libmproxy/cmdline.py
+++ b/libmproxy/cmdline.py
@@ -2,7 +2,10 @@ from __future__ import absolute_import
import os
import re
import configargparse
+
from netlib import http
+import netlib.utils
+
from . import filt, utils, version
from .proxy import config
@@ -100,7 +103,7 @@ def parse_setheader(s):
def parse_server_spec(url):
- p = http.parse_url(url)
+ p = netlib.utils.parse_url(url)
if not p or not p[1] or p[0] not in ("http", "https"):
raise configargparse.ArgumentTypeError(
"Invalid server specification: %s" % url
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py
index e4ffcd47..8f18ea7a 100644
--- a/libmproxy/console/contentview.py
+++ b/libmproxy/console/contentview.py
@@ -225,7 +225,7 @@ class ViewURLEncoded:
content_types = ["application/x-www-form-urlencoded"]
def __call__(self, hdrs, content, limit):
- lines = utils.urldecode(content)
+ lines = netlib.utils.urldecode(content)
if lines:
body = common.format_keyvals(
[(k + ":", v) for (k, v) in lines],
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 46cd0de1..2b77f4a3 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -1,6 +1,9 @@
from __future__ import absolute_import
import urwid
+
from netlib import http
+import netlib.utils
+
from . import common, signals
@@ -219,7 +222,7 @@ class ConnectionItem(urwid.WidgetWrap):
elif key == "U":
for f in self.state.flows:
self.state.set_flow_marked(f, False)
- signals.flowlist_change.send(self)
+ signals.flowlist_change.send(self)
elif key == "V":
if not self.flow.modified():
signals.status_message.send(message="Flow not modified.")
@@ -321,7 +324,7 @@ class FlowListBox(urwid.ListBox):
)
def new_request(self, url, method):
- parts = http.parse_url(str(url))
+ parts = netlib.utils.parse_url(str(url))
if not parts:
signals.status_message.send(message="Invalid Url")
return
diff --git a/libmproxy/encoding.py b/libmproxy/encoding.py
deleted file mode 100644
index f107eb5f..00000000
--- a/libmproxy/encoding.py
+++ /dev/null
@@ -1,82 +0,0 @@
-"""
- Utility functions for decoding response bodies.
-"""
-from __future__ import absolute_import
-import cStringIO
-import gzip
-import zlib
-
-__ALL__ = ["ENCODINGS"]
-
-ENCODINGS = set(["identity", "gzip", "deflate"])
-
-
-def decode(e, content):
- encoding_map = {
- "identity": identity,
- "gzip": decode_gzip,
- "deflate": decode_deflate,
- }
- if e not in encoding_map:
- return None
- return encoding_map[e](content)
-
-
-def encode(e, content):
- encoding_map = {
- "identity": identity,
- "gzip": encode_gzip,
- "deflate": encode_deflate,
- }
- if e not in encoding_map:
- return None
- return encoding_map[e](content)
-
-
-def identity(content):
- """
- Returns content unchanged. Identity is the default value of
- Accept-Encoding headers.
- """
- return content
-
-
-def decode_gzip(content):
- gfile = gzip.GzipFile(fileobj=cStringIO.StringIO(content))
- try:
- return gfile.read()
- except (IOError, EOFError):
- return None
-
-
-def encode_gzip(content):
- s = cStringIO.StringIO()
- gf = gzip.GzipFile(fileobj=s, mode='wb')
- gf.write(content)
- gf.close()
- return s.getvalue()
-
-
-def decode_deflate(content):
- """
- Returns decompressed data for DEFLATE. Some servers may respond with
- compressed data without a zlib header or checksum. An undocumented
- feature of zlib permits the lenient decompression of data missing both
- values.
-
- http://bugs.python.org/issue5784
- """
- try:
- try:
- return zlib.decompress(content)
- except zlib.error:
- return zlib.decompress(content, -15)
- except zlib.error:
- return None
-
-
-def encode_deflate(content):
- """
- Returns compressed content, always including zlib header and checksum.
- """
- return zlib.compress(content)
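
The deleted module is not lost: later hunks import encoding from netlib instead (see the http_wrappers.py change below). A hedged sketch, assuming netlib.encoding mirrors the removed API shown above (ENCODINGS, encode(e, content), decode(e, content), with None for unknown encodings):

    from netlib import encoding

    assert "gzip" in encoding.ENCODINGS
    compressed = encoding.encode("gzip", "falafel")
    assert encoding.decode("gzip", compressed) == "falafel"
    # unknown content-encodings still signal failure with None
    assert encoding.decode("oink", "falafel") is None
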
diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py
index 35fd7d28..961e50a6 100644
--- a/libmproxy/protocol/http.py
+++ b/libmproxy/protocol/http.py
@@ -19,9 +19,6 @@ from .. import encoding, utils, controller, stateobject, proxy
from .http_wrappers import decoded, HTTPRequest, HTTPResponse
-HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
-HDR_FORM_MULTIPART = "multipart/form-data"
-
class KillSignal(Exception):
pass
@@ -39,7 +36,10 @@ def send_connect_request(conn, host, port, update_state=True):
odict.ODictCaseless(),
""
)
+
+ # we currently only support HTTP/1 CONNECT requests
protocol = http1.HTTP1Protocol(conn)
+
conn.send(protocol.assemble(upstream_request))
resp = HTTPResponse.from_protocol(protocol, upstream_request.method)
if resp.status_code != 200:
@@ -144,18 +144,6 @@ class HTTPFlow(Flow):
return c
-class HttpAuthenticationError(Exception):
- def __init__(self, auth_headers=None):
- super(HttpAuthenticationError, self).__init__(
- "Proxy Authentication Required"
- )
- self.headers = auth_headers
- self.code = 407
-
- def __repr__(self):
- return "Proxy Authentication Required"
-
-
class HTTPHandler(ProtocolHandler):
"""
HTTPHandler implements mitmproxys understanding of the HTTP protocol.
@@ -179,7 +167,7 @@ class HTTPHandler(ProtocolHandler):
try:
if not self.c.server_conn.protocol:
# instantiate new protocol if connection does not have one yet
- self.c.server_conn.protocol = http2.HTTP2Protocol(self.c.server_conn)
+ self.c.server_conn.protocol = http2.HTTP2Protocol(self.c.server_conn) # TODO: select correct protocol
self.c.server_conn.protocol.perform_connection_preface()
self.c.server_conn.send(self.c.server_conn.protocol.assemble(flow.request))
@@ -225,6 +213,7 @@ class HTTPHandler(ProtocolHandler):
flow.response.content = CONTENT_MISSING
else:
if isinstance(flow.server_conn.protocol, http1.HTTP1Protocol):
+ # streaming is only supported with HTTP/1 at the moment
flow.response.content = flow.server_conn.protocol.read_http_body(
flow.response.headers,
self.c.config.body_size_limit,
@@ -241,6 +230,7 @@ class HTTPHandler(ProtocolHandler):
try:
if not flow.client_conn.protocol:
# instantiate new protocol if connection does not have one yet
+ # the first request might be a CONNECT - which is currently only supported with HTTP/1
flow.client_conn.protocol = http1.HTTP1Protocol(self.c.client_conn)
req = HTTPRequest.from_protocol(
@@ -258,8 +248,8 @@ class HTTPHandler(ProtocolHandler):
)
ret = self.process_request(flow, req)
if ret:
- # CONNECT successful - upgrade to HTTP/2
# instantiate new protocol if connection does not have one yet
+ # TODO: select correct protocol
flow.client_conn.protocol = http2.HTTP2Protocol(self.c.client_conn, is_server=True)
if ret is not None:
return ret
@@ -329,7 +319,7 @@ class HTTPHandler(ProtocolHandler):
return True # Next flow please.
except (
- HttpAuthenticationError,
+ http.HttpAuthenticationError,
http.HttpError,
proxy.ProxyError,
tcp.NetLibError,
@@ -389,6 +379,7 @@ class HTTPHandler(ProtocolHandler):
pass
def send_error(self, code, message, headers):
+ # TODO: implement this again
raise NotImplementedError("todo - adapt for HTTP/2 - make use of make_error_reponse from pathod")
# response = http.status_codes.RESPONSES.get(code, "Unknown")
# html_content = """
@@ -457,6 +448,9 @@ class HTTPHandler(ProtocolHandler):
self.c.set_server_address((request.host, request.port))
# Update server_conn attribute on the flow
flow.server_conn = self.c.server_conn
+
+ # since we currently only support HTTP/1 CONNECT requests
+ # the response must be HTTP/1 as well
self.c.client_conn.send(
('HTTP/%s.%s 200 ' % (request.httpversion[0], request.httpversion[1])) +
'Connection established\r\n' +
@@ -495,7 +489,7 @@ class HTTPHandler(ProtocolHandler):
400,
"Invalid request: No host information"
)
- p = http.parse_url("http://" + h)
+ p = netlib.utils.parse_url("http://" + h)
request.scheme = p[0]
request.host = p[1]
request.port = p[2]
@@ -602,6 +596,9 @@ class HTTPHandler(ProtocolHandler):
Checks if the connection should be closed depending on the HTTP
semantics. Returns True, if so.
"""
+
+ # TODO: add logic for HTTP/2
+
close_connection = (
http1.HTTP1Protocol.connection_close(
flow.request.httpversion,
@@ -684,7 +681,7 @@ class HTTPHandler(ProtocolHandler):
if self.c.config.authenticator.authenticate(request.headers):
self.c.config.authenticator.clean(request.headers)
else:
- raise HttpAuthenticationError(
+ raise http.HttpAuthenticationError(
self.c.config.authenticator.auth_challenge_headers())
return request.headers
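
HttpAuthenticationError is likewise consumed from netlib now. A sketch mirroring the class removed above and its deleted test (see test_protocol_http.py below), under the assumption that the netlib version keeps the same headers attribute:

    from netlib import http

    # mirrors the deleted test_HttpAuthenticationError
    err = http.HttpAuthenticationError({"foo": "bar"})
    assert str(err)              # "Proxy Authentication Required"
    assert "foo" in err.headers  # challenge headers are kept on the exception
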
diff --git a/libmproxy/protocol/http_wrappers.py b/libmproxy/protocol/http_wrappers.py
index 18a355dc..758ebfe0 100644
--- a/libmproxy/protocol/http_wrappers.py
+++ b/libmproxy/protocol/http_wrappers.py
@@ -8,18 +8,13 @@ import urlparse
from email.utils import parsedate_tz, formatdate, mktime_tz
import netlib
-from netlib import http, tcp, odict, utils
+from netlib import http, tcp, odict, utils, encoding
from netlib.http import cookies, semantics, http1
from .tcp import TCPHandler
from .primitives import KILL, ProtocolHandler, Flow, Error
from ..proxy.connection import ServerConnection
-from .. import encoding, utils, controller, stateobject, proxy
-
-
-HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
-HDR_FORM_MULTIPART = "multipart/form-data"
-CONTENT_MISSING = 0
+from .. import utils, controller, stateobject, proxy
class decoded(object):
@@ -249,12 +244,6 @@ class HTTPRequest(MessageMixin, semantics.Request):
f.load_state(state)
return f
- def __repr__(self):
- return "<HTTPRequest: {0}>".format(
- # just for visualisation purposes we use HTTP/1 protocol here
- http.http1.HTTP1Protocol._assemble_request_first_line(self)[:-9]
- )
-
@classmethod
def from_protocol(
self,
@@ -281,211 +270,26 @@ class HTTPRequest(MessageMixin, semantics.Request):
req.timestamp_end,
)
+ @classmethod
+ def wrap(self, request):
+ return HTTPRequest(
+ form_in=request.form_in,
+ method=request.method,
+ scheme=request.scheme,
+ host=request.host,
+ port=request.port,
+ path=request.path,
+ httpversion=request.httpversion,
+ headers=request.headers,
+ body=request.body,
+ timestamp_start=request.timestamp_start,
+ timestamp_end=request.timestamp_end,
+ form_out=(request.form_out if hasattr(request, 'form_out') else None),
+ )
def __hash__(self):
return id(self)
- def anticache(self):
- """
- Modifies this request to remove headers that might produce a cached
- response. That is, we remove ETags and If-Modified-Since headers.
- """
- delheaders = [
- "if-modified-since",
- "if-none-match",
- ]
- for i in delheaders:
- del self.headers[i]
-
- def anticomp(self):
- """
- Modifies this request to remove headers that will compress the
- resource's data.
- """
- self.headers["accept-encoding"] = ["identity"]
-
- def constrain_encoding(self):
- """
- Limits the permissible Accept-Encoding values, based on what we can
- decode appropriately.
- """
- if self.headers["accept-encoding"]:
- self.headers["accept-encoding"] = [
- ', '.join(
- e for e in encoding.ENCODINGS if e in self.headers["accept-encoding"][0])]
-
- def update_host_header(self):
- """
- Update the host header to reflect the current target.
- """
- self.headers["Host"] = [self.host]
-
- def get_form(self):
- """
- Retrieves the URL-encoded or multipart form data, returning an ODict object.
- Returns an empty ODict if there is no data or the content-type
- indicates non-form data.
- """
- if self.body:
- if self.headers.in_any("content-type", HDR_FORM_URLENCODED, True):
- return self.get_form_urlencoded()
- elif self.headers.in_any("content-type", HDR_FORM_MULTIPART, True):
- return self.get_form_multipart()
- return odict.ODict([])
-
- def get_form_urlencoded(self):
- """
- Retrieves the URL-encoded form data, returning an ODict object.
- Returns an empty ODict if there is no data or the content-type
- indicates non-form data.
- """
- if self.body and self.headers.in_any(
- "content-type",
- HDR_FORM_URLENCODED,
- True):
- return odict.ODict(utils.urldecode(self.body))
- return odict.ODict([])
-
- def get_form_multipart(self):
- if self.body and self.headers.in_any(
- "content-type",
- HDR_FORM_MULTIPART,
- True):
- return odict.ODict(
- utils.multipartdecode(
- self.headers,
- self.body))
- return odict.ODict([])
-
- def set_form_urlencoded(self, odict):
- """
- Sets the body to the URL-encoded form data, and adds the
- appropriate content-type header. Note that this will destory the
- existing body if there is one.
- """
- # FIXME: If there's an existing content-type header indicating a
- # url-encoded form, leave it alone.
- self.headers["Content-Type"] = [HDR_FORM_URLENCODED]
- self.body = utils.urlencode(odict.lst)
-
- def get_path_components(self):
- """
- Returns the path components of the URL as a list of strings.
-
- Components are unquoted.
- """
- _, _, path, _, _, _ = urlparse.urlparse(self.url)
- return [urllib.unquote(i) for i in path.split("/") if i]
-
- def set_path_components(self, lst):
- """
- Takes a list of strings, and sets the path component of the URL.
-
- Components are quoted.
- """
- lst = [urllib.quote(i, safe="") for i in lst]
- path = "/" + "/".join(lst)
- scheme, netloc, _, params, query, fragment = urlparse.urlparse(self.url)
- self.url = urlparse.urlunparse(
- [scheme, netloc, path, params, query, fragment]
- )
-
- def get_query(self):
- """
- Gets the request query string. Returns an ODict object.
- """
- _, _, _, _, query, _ = urlparse.urlparse(self.url)
- if query:
- return odict.ODict(utils.urldecode(query))
- return odict.ODict([])
-
- def set_query(self, odict):
- """
- Takes an ODict object, and sets the request query string.
- """
- scheme, netloc, path, params, _, fragment = urlparse.urlparse(self.url)
- query = utils.urlencode(odict.lst)
- self.url = urlparse.urlunparse(
- [scheme, netloc, path, params, query, fragment]
- )
-
- def pretty_host(self, hostheader):
- """
- Heuristic to get the host of the request.
-
- Note that pretty_host() does not always return the TCP destination
- of the request, e.g. if an upstream proxy is in place
-
- If hostheader is set to True, the Host: header will be used as
- additional (and preferred) data source. This is handy in
- transparent mode, where only the IO of the destination is known,
- but not the resolved name. This is disabled by default, as an
- attacker may spoof the host header to confuse an analyst.
- """
- host = None
- if hostheader:
- host = self.headers.get_first("host")
- if not host:
- host = self.host
- if host:
- try:
- return host.encode("idna")
- except ValueError:
- return host
- else:
- return None
-
- def pretty_url(self, hostheader):
- if self.form_out == "authority": # upstream proxy mode
- return "%s:%s" % (self.pretty_host(hostheader), self.port)
- return utils.unparse_url(self.scheme,
- self.pretty_host(hostheader),
- self.port,
- self.path).encode('ascii')
-
- @property
- def url(self):
- """
- Returns a URL string, constructed from the Request's URL components.
- """
- return utils.unparse_url(
- self.scheme,
- self.host,
- self.port,
- self.path
- ).encode('ascii')
-
- @url.setter
- def url(self, url):
- """
- Parses a URL specification, and updates the Request's information
- accordingly.
-
- Returns False if the URL was invalid, True if the request succeeded.
- """
- parts = http.parse_url(url)
- if not parts:
- raise ValueError("Invalid URL: %s" % url)
- self.scheme, self.host, self.port, self.path = parts
-
- def get_cookies(self):
- """
-
- Returns a possibly empty netlib.odict.ODict object.
- """
- ret = odict.ODict()
- for i in self.headers["cookie"]:
- ret.extend(cookies.parse_cookie_header(i))
- return ret
-
- def set_cookies(self, odict):
- """
- Takes an netlib.odict.ODict object. Over-writes any existing Cookie
- headers.
- """
- v = cookies.format_cookie_header(odict)
- self.headers["Cookie"] = [v]
-
def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in the headers, the
@@ -552,7 +356,7 @@ class HTTPResponse(MessageMixin, semantics.Response):
_stateobject_attributes = MessageMixin._stateobject_attributes.copy()
_stateobject_attributes.update(
- code=int,
+ status_code=int,
msg=str
)
@@ -567,20 +371,6 @@ class HTTPResponse(MessageMixin, semantics.Response):
f.load_state(state)
return f
- def __repr__(self):
- if self.body:
- size = netlib.utils.pretty_size(len(self.body))
- else:
- size = "content missing"
- return "<HTTPResponse: {status_code} {msg} ({contenttype}, {size})>".format(
- status_code=self.status_code,
- msg=self.msg,
- contenttype=self.headers.get_first(
- "content-type", "unknown content type"
- ),
- size=size
- )
-
@classmethod
def from_protocol(
self,
@@ -605,6 +395,18 @@ class HTTPResponse(MessageMixin, semantics.Response):
resp.timestamp_end,
)
+ @classmethod
+ def wrap(self, response):
+ return HTTPResponse(
+ httpversion=response.httpversion,
+ status_code=response.status_code,
+ msg=response.msg,
+ headers=response.headers,
+ body=response.body,
+ timestamp_start=response.timestamp_start,
+ timestamp_end=response.timestamp_end,
+ )
+
def _refresh_cookie(self, c, delta):
"""
Takes a cookie string c and a time delta in seconds, and returns
@@ -654,38 +456,3 @@ class HTTPResponse(MessageMixin, semantics.Response):
c.append(self._refresh_cookie(i, delta))
if c:
self.headers["set-cookie"] = c
-
- def get_cookies(self):
- """
- Get the contents of all Set-Cookie headers.
-
- Returns a possibly empty ODict, where keys are cookie name strings,
- and values are [value, attr] lists. Value is a string, and attr is
- an ODictCaseless containing cookie attributes. Within attrs, unary
- attributes (e.g. HTTPOnly) are indicated by a Null value.
- """
- ret = []
- for header in self.headers["set-cookie"]:
- v = http.cookies.parse_set_cookie_header(header)
- if v:
- name, value, attrs = v
- ret.append([name, [value, attrs]])
- return odict.ODict(ret)
-
- def set_cookies(self, odict):
- """
- Set the Set-Cookie headers on this response, over-writing existing
- headers.
-
- Accepts an ODict of the same format as that returned by get_cookies.
- """
- values = []
- for i in odict.lst:
- values.append(
- http.cookies.format_set_cookie_header(
- i[0],
- i[1][0],
- i[1][1]
- )
- )
- self.headers["Set-Cookie"] = values
diff --git a/libmproxy/utils.py b/libmproxy/utils.py
index 78f74767..22ab4344 100644
--- a/libmproxy/utils.py
+++ b/libmproxy/utils.py
@@ -61,21 +61,6 @@ def pretty_json(s):
return json.dumps(p, sort_keys=True, indent=4).split("\n")
-def urldecode(s):
- """
- Takes a urlencoded string and returns a list of (key, value) tuples.
- """
- return cgi.parse_qsl(s, keep_blank_values=True)
-
-
-def urlencode(s):
- """
- Takes a list of (key, value) tuples and returns a urlencoded string.
- """
- s = [tuple(i) for i in s]
- return urllib.urlencode(s, False)
-
-
def multipartdecode(hdrs, content):
"""
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
@@ -197,13 +182,6 @@ def parse_content_type(c):
-def unparse_url(scheme, host, port, path=""):
- """
- Returns a URL string, constructed from the specified compnents.
- """
- return "%s://%s%s" % (scheme, netlib.utils.hostport(scheme, host, port), path)
-
-
def clean_hanging_newline(t):
"""
Many editors will silently add a newline to the final line of a
diff --git a/test/test_console.py b/test/test_console.py
index ed8408a5..3304fdbb 100644
--- a/test/test_console.py
+++ b/test/test_console.py
@@ -4,6 +4,8 @@ import mock
import gc
from os.path import normpath
import mock_urwid
+
+import netlib.tutils
from libmproxy import console
from libmproxy.console import common
@@ -60,13 +62,13 @@ class TestConsoleState:
def _add_response(self, state):
f = self._add_request(state)
- f.response = tutils.tresp()
+ f.response = netlib.tutils.tresp()
state.update_flow(f)
def test_add_response(self):
c = console.ConsoleState()
f = self._add_request(c)
- f.response = tutils.tresp()
+ f.response = netlib.tutils.tresp()
c.focus = None
c.update_flow(f)
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
index f2d82419..b98e1021 100644
--- a/test/test_console_contentview.py
+++ b/test/test_console_contentview.py
@@ -4,7 +4,9 @@ if os.name == "nt":
raise SkipTest("Skipped on Windows.")
import sys
+import netlib.utils
from netlib import odict
+
import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding
import tutils
@@ -65,10 +67,10 @@ class TestContentView:
assert f[0].startswith("XML")
def test_view_urlencoded(self):
- d = utils.urlencode([("one", "two"), ("three", "four")])
+ d = netlib.utils.urlencode([("one", "two"), ("three", "four")])
v = cv.ViewURLEncoded()
assert v([], d, 100)
- d = utils.urlencode([("adsfa", "")])
+ d = netlib.utils.urlencode([("adsfa", "")])
v = cv.ViewURLEncoded()
assert v([], d, 100)
diff --git a/test/test_dump.py b/test/test_dump.py
index 46c832d3..b3d724a5 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -1,17 +1,18 @@
import os
from cStringIO import StringIO
+import netlib.tutils
from netlib.http.semantics import CONTENT_MISSING
from libmproxy import dump, flow
-from libmproxy.protocol import http
+from libmproxy.protocol import http, http_wrappers
from libmproxy.proxy.primitives import Log
import tutils
import mock
def test_strfuncs():
- t = tutils.tresp()
+ t = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
t.is_replay = True
dump.str_response(t)
@@ -26,14 +27,14 @@ def test_strfuncs():
class TestDumpMaster:
def _cycle(self, m, content):
- f = tutils.tflow(req=tutils.treq(content))
+ f = tutils.tflow(req=netlib.tutils.treq(content))
l = Log("connect")
l.reply = mock.MagicMock()
m.handle_log(l)
m.handle_clientconnect(f.client_conn)
m.handle_serverconnect(f.server_conn)
m.handle_request(f)
- f.response = tutils.tresp(content)
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(content))
f = m.handle_response(f)
m.handle_clientdisconnect(f.client_conn)
return f
@@ -70,7 +71,7 @@ class TestDumpMaster:
f = tutils.tflow()
f.request.content = CONTENT_MISSING
m.handle_request(f)
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
f.response.content = CONTENT_MISSING
m.handle_response(f)
assert "content missing" in cs.getvalue()
diff --git a/test/test_flow.py b/test/test_flow.py
index c72a583c..711688da 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -3,18 +3,20 @@ import time
import os.path
from cStringIO import StringIO
import email.utils
+import mock
+import netlib.utils
from netlib import odict
-from netlib.http.semantics import CONTENT_MISSING
+from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, HDR_FORM_MULTIPART
from libmproxy import filt, protocol, controller, utils, tnetstring, flow
+from libmproxy.protocol import http_wrappers
from libmproxy.protocol.primitives import Error, Flow
from libmproxy.protocol.http import decoded
from libmproxy.proxy.config import HostMatcher
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.server import DummyServer
from libmproxy.proxy.connection import ClientConnection
-import mock
import tutils
@@ -22,7 +24,7 @@ def test_app_registry():
ar = flow.AppRegistry()
ar.add("foo", "domain", 80)
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain"
r.port = 80
assert ar.get(r)
@@ -30,7 +32,7 @@ def test_app_registry():
r.port = 81
assert not ar.get(r)
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain2"
r.port = 80
assert not ar.get(r)
@@ -41,7 +43,7 @@ def test_app_registry():
class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
- f = tutils.tflow(req=tutils.treq(host=host, port=80), resp=True)
+ f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
f.response.headers["Set-Cookie"] = [cookie]
s.handle_response(f)
return s, f
@@ -383,7 +385,7 @@ class TestFlow:
def test_backup(self):
f = tutils.tflow()
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
f.request.content = "foo"
assert not f.modified()
f.backup()
@@ -516,16 +518,16 @@ class TestState:
assert c.add_flow(newf)
assert c.active_flow_count() == 2
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(f)
assert c.flow_count() == 2
assert c.active_flow_count() == 1
- _ = tutils.tresp()
+ _ = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
assert not c.update_flow(None)
assert c.active_flow_count() == 1
- newf.response = tutils.tresp()
+ newf.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(newf)
assert c.active_flow_count() == 0
@@ -557,7 +559,7 @@ class TestState:
c.set_limit("~s")
assert c.limit_txt == "~s"
assert len(c.view) == 0
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
c.update_flow(f)
assert len(c.view) == 1
c.set_limit(None)
@@ -589,7 +591,7 @@ class TestState:
def _add_response(self, state):
f = tutils.tflow()
state.add_flow(f)
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
state.update_flow(f)
def _add_error(self, state):
@@ -807,11 +809,11 @@ class TestFlowMaster:
fm.anticomp = True
f = tutils.tflow(req=None)
fm.handle_clientconnect(f.client_conn)
- f.request = tutils.treq()
+ f.request = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
fm.handle_request(f)
assert s.flow_count() == 1
- f.response = tutils.tresp()
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
fm.handle_response(f)
assert not fm.handle_response(None)
assert s.flow_count() == 1
@@ -856,7 +858,7 @@ class TestFlowMaster:
s = flow.State()
f = tutils.tflow()
- f.response = tutils.tresp(f.request)
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
@@ -910,7 +912,7 @@ class TestFlowMaster:
def test_server_playback_kill(self):
s = flow.State()
f = tutils.tflow()
- f.response = tutils.tresp(f.request)
+ f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
@@ -1009,7 +1011,7 @@ class TestRequest:
assert r.get_state() == r2.get_state()
def test_get_url(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
assert r.url == "http://address:22/path"
@@ -1030,7 +1032,7 @@ class TestRequest:
assert r.pretty_url(True) == "https://foo.com:22/path"
def test_path_components(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/"
assert r.get_path_components() == []
r.path = "/foo/bar"
@@ -1050,8 +1052,8 @@ class TestRequest:
def test_getset_form_urlencoded(self):
d = odict.ODict([("one", "two"), ("three", "four")])
- r = tutils.treq(content=utils.urlencode(d.lst))
- r.headers["content-type"] = [protocol.http.HDR_FORM_URLENCODED]
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
+ r.headers["content-type"] = [HDR_FORM_URLENCODED]
assert r.get_form_urlencoded() == d
d = odict.ODict([("x", "y")])
@@ -1064,7 +1066,7 @@ class TestRequest:
def test_getset_query(self):
h = odict.ODictCaseless()
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/foo?x=y&a=b"
q = r.get_query()
assert q.lst == [("x", "y"), ("a", "b")]
@@ -1087,7 +1089,7 @@ class TestRequest:
def test_anticache(self):
h = odict.ODictCaseless()
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.headers = h
h["if-modified-since"] = ["test"]
h["if-none-match"] = ["test"]
@@ -1096,7 +1098,7 @@ class TestRequest:
assert not "if-none-match" in r.headers
def test_replace(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.path = "path/foo"
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
@@ -1106,31 +1108,31 @@ class TestRequest:
assert r.headers["boo"] == ["boo"]
def test_constrain_encoding(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.headers["accept-encoding"] = ["gzip", "oink"]
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
def test_decodeencode(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.content = "falafel"
assert not r.decode()
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1141,7 +1143,7 @@ class TestRequest:
assert r.content == "falafel"
def test_get_decoded_content(self):
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
r.content = None
r.headers["content-encoding"] = ["identity"]
assert r.get_decoded_content() == None
@@ -1153,7 +1155,7 @@ class TestRequest:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = tutils.tresp()
+ resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1166,7 +1168,7 @@ class TestResponse:
assert resp2.get_state() == resp.get_state()
def test_refresh(self):
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
n = time.time()
r.headers["date"] = [email.utils.formatdate(n)]
pre = r.headers["date"]
@@ -1184,7 +1186,7 @@ class TestResponse:
r.refresh()
def test_refresh_cookie(self):
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
@@ -1194,7 +1196,7 @@ class TestResponse:
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_replace(self):
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["Foo"] = ["fOo"]
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
@@ -1202,21 +1204,21 @@ class TestResponse:
assert r.headers["boo"] == ["boo"]
def test_decodeencode(self):
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
assert r.decode()
assert not r.headers["content-encoding"]
assert r.content == "falafel"
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("identity")
assert r.headers["content-encoding"] == ["identity"]
assert r.content == "falafel"
- r = tutils.tresp()
+ r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
r.encode("gzip")
@@ -1233,7 +1235,7 @@ class TestResponse:
def test_get_content_type(self):
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
- resp = tutils.tresp()
+ resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
@@ -1277,7 +1279,7 @@ class TestClientConnection:
def test_decoded():
- r = tutils.treq()
+ r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
assert r.content == "content"
assert not r.headers["content-encoding"]
r.encode("gzip")
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
index 75f0a7b9..81c39f09 100644
--- a/test/test_protocol_http.py
+++ b/test/test_protocol_http.py
@@ -17,14 +17,7 @@ def mock_protocol(data='', chunked=False):
return http1.HTTP1Protocol(rfile=rfile, wfile=wfile)
-
-def test_HttpAuthenticationError():
- x = HttpAuthenticationError({"foo": "bar"})
- assert str(x)
- assert "foo" in x.headers
-
-
-# TODO: move test to netlib
+# TODO: move test to netlib http1 protocol
# def test_stripped_chunked_encoding_no_content():
# """
# https://github.com/mitmproxy/mitmproxy/issues/186
@@ -38,183 +31,6 @@ def test_HttpAuthenticationError():
# assert "Content-Length" in r._assemble_headers()
#
-class TestHTTPRequest:
- def test_asterisk_form_in(self):
- f = tutils.tflow(req=None)
- protocol = mock_protocol("OPTIONS * HTTP/1.1")
- f.request = HTTPRequest.from_protocol(protocol)
-
- assert f.request.form_in == "relative"
- f.request.host = f.server_conn.address.host
- f.request.port = f.server_conn.address.port
- f.request.scheme = "http"
- assert protocol.assemble(f.request) == (
- "OPTIONS * HTTP/1.1\r\n"
- "Host: address:22\r\n"
- "Content-Length: 0\r\n\r\n")
-
- def test_relative_form_in(self):
- protocol = mock_protocol("GET /foo\xff HTTP/1.1")
- tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
-
- protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
- r = HTTPRequest.from_protocol(protocol)
- assert r.headers["Upgrade"] == ["h2c"]
-
- def test_expect_header(self):
- protocol = mock_protocol(
- "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
- r = HTTPRequest.from_protocol(protocol)
- assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
- assert r.content == "foo"
- assert protocol.tcp_handler.rfile.read(3) == "bar"
-
- def test_authority_form_in(self):
- protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
- tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
-
- protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
- r = HTTPRequest.from_protocol(protocol)
- r.scheme, r.host, r.port = "http", "address", 22
- assert protocol.assemble(r) == (
- "CONNECT address:22 HTTP/1.1\r\n"
- "Host: address:22\r\n"
- "Content-Length: 0\r\n\r\n")
- assert r.pretty_url(False) == "address:22"
-
- def test_absolute_form_in(self):
- protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
- tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
-
- protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
- r = HTTPRequest.from_protocol(protocol)
- assert protocol.assemble(r) == (
- "GET http://address:22/ HTTP/1.1\r\n"
- "Host: address:22\r\n"
- "Content-Length: 0\r\n\r\n")
-
- def test_http_options_relative_form_in(self):
- """
- Exercises fix for Issue #392.
- """
- protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
- r = HTTPRequest.from_protocol(protocol)
- r.host = 'address'
- r.port = 80
- r.scheme = "http"
- assert protocol.assemble(r) == (
- "OPTIONS /secret/resource HTTP/1.1\r\n"
- "Host: address\r\n"
- "Content-Length: 0\r\n\r\n")
-
- def test_http_options_absolute_form_in(self):
- protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
- r = HTTPRequest.from_protocol(protocol)
- r.host = 'address'
- r.port = 80
- r.scheme = "http"
- assert protocol.assemble(r) == (
- "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
- "Host: address\r\n"
- "Content-Length: 0\r\n\r\n")
-
- def test_set_url(self):
- r = tutils.treq_absolute()
- r.url = "https://otheraddress:42/ORLY"
- assert r.scheme == "https"
- assert r.host == "otheraddress"
- assert r.port == 42
- assert r.path == "/ORLY"
-
- def test_repr(self):
- r = tutils.treq()
- assert repr(r)
-
- def test_pretty_host(self):
- r = tutils.treq()
- assert r.pretty_host(True) == "address"
- assert r.pretty_host(False) == "address"
- r.headers["host"] = ["other"]
- assert r.pretty_host(True) == "other"
- assert r.pretty_host(False) == "address"
- r.host = None
- assert r.pretty_host(True) == "other"
- assert r.pretty_host(False) is None
- del r.headers["host"]
- assert r.pretty_host(True) is None
- assert r.pretty_host(False) is None
-
- # Invalid IDNA
- r.headers["host"] = [".disqus.com"]
- assert r.pretty_host(True) == ".disqus.com"
-
- def test_get_form_for_urlencoded(self):
- r = tutils.treq()
- r.headers.add("content-type", "application/x-www-form-urlencoded")
- r.get_form_urlencoded = MagicMock()
-
- r.get_form()
-
- assert r.get_form_urlencoded.called
-
- def test_get_form_for_multipart(self):
- r = tutils.treq()
- r.headers.add("content-type", "multipart/form-data")
- r.get_form_multipart = MagicMock()
-
- r.get_form()
-
- assert r.get_form_multipart.called
-
- def test_get_cookies_none(self):
- h = odict.ODictCaseless()
- r = tutils.treq()
- r.headers = h
- assert len(r.get_cookies()) == 0
-
- def test_get_cookies_single(self):
- h = odict.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue"]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result) == 1
- assert result['cookiename'] == ['cookievalue']
-
- def test_get_cookies_double(self):
- h = odict.ODictCaseless()
- h["Cookie"] = [
- "cookiename=cookievalue;othercookiename=othercookievalue"
- ]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result) == 2
- assert result['cookiename'] == ['cookievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_get_cookies_withequalsign(self):
- h = odict.ODictCaseless()
- h["Cookie"] = [
- "cookiename=coo=kievalue;othercookiename=othercookievalue"
- ]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- assert len(result) == 2
- assert result['cookiename'] == ['coo=kievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_set_cookies(self):
- h = odict.ODictCaseless()
- h["Cookie"] = ["cookiename=cookievalue"]
- r = tutils.treq()
- r.headers = h
- result = r.get_cookies()
- result["cookiename"] = ["foo"]
- r.set_cookies(result)
- assert r.get_cookies()["cookiename"] == ["foo"]
-
class TestHTTPResponse:
def test_read_from_stringio(self):
@@ -241,80 +57,7 @@ class TestHTTPResponse:
HTTPResponse.from_protocol, protocol, "GET"
)
- def test_repr(self):
- r = tutils.tresp()
- assert "unknown content type" in repr(r)
- r.headers["content-type"] = ["foo"]
- assert "foo" in repr(r)
- assert repr(tutils.tresp(content=CONTENT_MISSING))
-
- def test_get_cookies_none(self):
- h = odict.ODictCaseless()
- resp = tutils.tresp()
- resp.headers = h
- assert not resp.get_cookies()
-
- def test_get_cookies_simple(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
-
- def test_get_cookies_with_parameters(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = [
- "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0][0] == "cookievalue"
- attrs = result["cookiename"][0][1]
- assert len(attrs) == 4
- assert attrs["domain"] == ["example.com"]
- assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
- assert attrs["path"] == ["/"]
- assert attrs["httponly"] == [None]
-
- def test_get_cookies_no_value(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = [
- "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
- ]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result) == 1
- assert "cookiename" in result
- assert result["cookiename"][0][0] == ""
- assert len(result["cookiename"][0][1]) == 2
-
- def test_get_cookies_twocookies(self):
- h = odict.ODictCaseless()
- h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
- resp = tutils.tresp()
- resp.headers = h
- result = resp.get_cookies()
- assert len(result) == 2
- assert "cookiename" in result
- assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
- assert "othercookie" in result
- assert result["othercookie"][0] == ["othervalue", odict.ODict()]
-
- def test_set_cookies(self):
- resp = tutils.tresp()
- v = resp.get_cookies()
- v.add("foo", ["bar", odict.ODictCaseless()])
- resp.set_cookies(v)
- v = resp.get_cookies()
- assert len(v) == 1
- assert v["foo"] == [["bar", odict.ODictCaseless()]]
class TestHTTPFlow(object):
diff --git a/test/test_server.py b/test/test_server.py
index 27b8aad3..77ba4576 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -2,6 +2,7 @@ import socket
import time
from OpenSSL import SSL
+import netlib.tutils
from netlib import tcp, http, socks
from netlib.certutils import SSLCert
from netlib.http import authentication
@@ -9,7 +10,7 @@ from netlib.http.semantics import CONTENT_MISSING
from libpathod import pathoc, pathod
from libmproxy.proxy.config import HostMatcher
-from libmproxy.protocol import KILL, Error
+from libmproxy.protocol import KILL, Error, http_wrappers
import tutils
import tservers
@@ -783,7 +784,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
class MasterFakeResponse(tservers.TestMaster):
def handle_request(self, f):
- resp = tutils.tresp()
+ resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
f.reply(resp)
@@ -848,7 +849,7 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
class MasterIncomplete(tservers.TestMaster):
def handle_request(self, f):
- resp = tutils.tresp()
+ resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
resp.content = CONTENT_MISSING
f.reply(resp)
diff --git a/test/test_utils.py b/test/test_utils.py
index 0c514f5d..0ee23b97 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -44,11 +44,6 @@ def test_pretty_json():
assert not utils.pretty_json("moo")
-def test_urldecode():
- s = "one=two&three=four"
- assert len(utils.urldecode(s)) == 2
-
-
def test_multipartdecode():
boundary = 'somefancyboundary'
headers = odict.ODict(
@@ -116,13 +111,6 @@ def test_LRUCache():
assert len(cache.cache) == 2
-def test_unparse_url():
- assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
- assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com"
- assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80"
- assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com"
-
-
def test_parse_size():
assert not utils.parse_size("")
assert utils.parse_size("1") == 1
@@ -144,7 +132,3 @@ def test_parse_content_type():
def test_safe_subn():
assert utils.safe_subn("foo", u"bar", "\xc2foo")
-
-
-def test_urlencode():
- assert utils.urlencode([('foo', 'bar')])
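
The url{en,de}code helpers (and their tests) move to netlib as well; the content-view and form-handling hunks above already call them via netlib.utils. A sketch based on the removed implementations:

    import netlib.utils

    # cgi.parse_qsl semantics: blank values are kept
    assert netlib.utils.urldecode("one=two&three=") == [("one", "two"), ("three", "")]
    assert netlib.utils.urlencode([("foo", "bar"), ("baz", "qux")]) == "foo=bar&baz=qux"
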
diff --git a/test/tutils.py b/test/tutils.py
index 7c7d1db3..61b1154c 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -1,21 +1,24 @@
-from cStringIO import StringIO
import os
import shutil
import tempfile
import argparse
-from contextlib import contextmanager
import sys
+import mock_urwid
+from cStringIO import StringIO
+from contextlib import contextmanager
+from nose.plugins.skip import SkipTest
+from mock import Mock
+from time import time
+
+from netlib import certutils, odict
+import netlib.tutils
+
from libmproxy import flow, utils, controller
-from libmproxy.protocol import http
+from libmproxy.protocol import http, http_wrappers
from libmproxy.proxy.connection import ClientConnection, ServerConnection
-import mock_urwid
from libmproxy.console.flowview import FlowView
from libmproxy.console import ConsoleState
from libmproxy.protocol.primitives import Error
-from netlib import certutils, odict
-from nose.plugins.skip import SkipTest
-from mock import Mock
-from time import time
def _SkipWindows():
@@ -43,12 +46,17 @@ def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
if server_conn is True:
server_conn = tserver_conn()
if req is True:
- req = treq()
+ req = netlib.tutils.treq()
if resp is True:
- resp = tresp()
+ resp = netlib.tutils.tresp()
if err is True:
err = terr()
+ if req:
+ req = http_wrappers.HTTPRequest.wrap(req)
+ if resp:
+ resp = http_wrappers.HTTPResponse.wrap(resp)
+
f = http.HTTPFlow(client_conn, server_conn)
f.request = req
f.response = resp
@@ -83,60 +91,6 @@ def tserver_conn():
return c
-def treq(content="content", scheme="http", host="address", port=22):
- """
- @return: libmproxy.protocol.http.HTTPRequest
- """
- headers = odict.ODictCaseless()
- headers["header"] = ["qvalue"]
- req = http.HTTPRequest(
- "relative",
- "GET",
- scheme,
- host,
- port,
- "/path",
- (1, 1),
- headers,
- content,
- None,
- None,
- None,
- )
- return req
-
-
-def treq_absolute(content="content"):
- """
- @return: libmproxy.protocol.http.HTTPRequest
- """
- r = treq(content)
- r.form_in = r.form_out = "absolute"
- r.host = "address"
- r.port = 22
- r.scheme = "http"
- return r
-
-
-def tresp(content="message"):
- """
- @return: libmproxy.protocol.http.HTTPResponse
- """
-
- headers = odict.ODictCaseless()
- headers["header_response"] = ["svalue"]
-
- resp = http.HTTPResponse(
- (1, 1),
- 200,
- "OK",
- headers,
- content,
- time(),
- time(),
- )
- return resp
-
def terr(content="error"):
"""