diff options
Diffstat (limited to 'test/mitmproxy/test_http.py')
-rw-r--r-- | test/mitmproxy/test_http.py | 257 |
1 files changed, 256 insertions, 1 deletions
diff --git a/test/mitmproxy/test_http.py b/test/mitmproxy/test_http.py index 777ab4dd..889eb0a7 100644 --- a/test/mitmproxy/test_http.py +++ b/test/mitmproxy/test_http.py @@ -1 +1,256 @@ -# TODO: write tests +import pytest + +from mitmproxy.test import tflow +from mitmproxy.net.http import Headers +import mitmproxy.io +from mitmproxy import flowfilter +from mitmproxy.exceptions import Kill +from mitmproxy import flow +from mitmproxy import http + + +class TestHTTPRequest: + + def test_simple(self): + f = tflow.tflow() + r = f.request + u = r.url + r.url = u + with pytest.raises(ValueError): + setattr(r, "url", "") + assert r.url == u + r2 = r.copy() + assert r.get_state() == r2.get_state() + assert hash(r) + + def test_get_url(self): + r = http.HTTPRequest.wrap(mitmproxy.test.tutils.treq()) + + assert r.url == "http://address:22/path" + + r.scheme = "https" + assert r.url == "https://address:22/path" + + r.host = "host" + r.port = 42 + assert r.url == "https://host:42/path" + + r.host = "address" + r.port = 22 + assert r.url == "https://address:22/path" + + assert r.pretty_url == "https://address:22/path" + r.headers["Host"] = "foo.com:22" + assert r.url == "https://address:22/path" + assert r.pretty_url == "https://foo.com:22/path" + + def test_replace(self): + r = http.HTTPRequest.wrap(mitmproxy.test.tutils.treq()) + r.path = "path/foo" + r.headers["Foo"] = "fOo" + r.content = b"afoob" + assert r.replace("foo(?i)", "boo") == 4 + assert r.path == "path/boo" + assert b"foo" not in r.content + assert r.headers["boo"] == "boo" + + def test_constrain_encoding(self): + r = http.HTTPRequest.wrap(mitmproxy.test.tutils.treq()) + r.headers["accept-encoding"] = "gzip, oink" + r.constrain_encoding() + assert "oink" not in r.headers["accept-encoding"] + + r.headers.set_all("accept-encoding", ["gzip", "oink"]) + r.constrain_encoding() + assert "oink" not in r.headers["accept-encoding"] + + def test_get_content_type(self): + resp = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) + resp.headers = Headers(content_type="text/plain") + assert resp.headers["content-type"] == "text/plain" + + +class TestHTTPResponse: + + def test_simple(self): + f = tflow.tflow(resp=True) + resp = f.response + resp2 = resp.copy() + assert resp2.get_state() == resp.get_state() + + def test_replace(self): + r = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) + r.headers["Foo"] = "fOo" + r.content = b"afoob" + assert r.replace("foo(?i)", "boo") == 3 + assert b"foo" not in r.content + assert r.headers["boo"] == "boo" + + def test_get_content_type(self): + resp = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) + resp.headers = Headers(content_type="text/plain") + assert resp.headers["content-type"] == "text/plain" + + +class TestHTTPFlow: + + def test_copy(self): + f = tflow.tflow(resp=True) + assert repr(f) + f.get_state() + f2 = f.copy() + a = f.get_state() + b = f2.get_state() + del a["id"] + del b["id"] + assert a == b + assert not f == f2 + assert f is not f2 + assert f.request.get_state() == f2.request.get_state() + assert f.request is not f2.request + assert f.request.headers == f2.request.headers + assert f.request.headers is not f2.request.headers + assert f.response.get_state() == f2.response.get_state() + assert f.response is not f2.response + + f = tflow.tflow(err=True) + f2 = f.copy() + assert f is not f2 + assert f.request is not f2.request + assert f.request.headers == f2.request.headers + assert f.request.headers is not f2.request.headers + assert f.error.get_state() == f2.error.get_state() + assert f.error is not f2.error + + def test_match(self): + f = tflow.tflow(resp=True) + assert not flowfilter.match("~b test", f) + assert flowfilter.match(None, f) + assert not flowfilter.match("~b test", f) + + f = tflow.tflow(err=True) + assert flowfilter.match("~e", f) + + with pytest.raises(ValueError): + flowfilter.match("~", f) + + def test_backup(self): + f = tflow.tflow() + f.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp()) + f.request.content = b"foo" + assert not f.modified() + f.backup() + f.request.content = b"bar" + assert f.modified() + f.revert() + assert f.request.content == b"foo" + + def test_backup_idempotence(self): + f = tflow.tflow(resp=True) + f.backup() + f.revert() + f.backup() + f.revert() + + def test_getset_state(self): + f = tflow.tflow(resp=True) + state = f.get_state() + assert f.get_state() == http.HTTPFlow.from_state( + state).get_state() + + f.response = None + f.error = flow.Error("error") + state = f.get_state() + assert f.get_state() == http.HTTPFlow.from_state( + state).get_state() + + f2 = f.copy() + f2.id = f.id # copy creates a different uuid + assert f.get_state() == f2.get_state() + assert not f == f2 + f2.error = flow.Error("e2") + assert not f == f2 + f.set_state(f2.get_state()) + assert f.get_state() == f2.get_state() + + def test_kill(self): + f = tflow.tflow() + f.reply.handle() + f.intercept() + assert f.killable + f.kill() + assert not f.killable + assert f.reply.value == Kill + + def test_resume(self): + f = tflow.tflow() + f.reply.handle() + f.intercept() + assert f.reply.state == "taken" + f.resume() + assert f.reply.state == "committed" + + def test_replace_unicode(self): + f = tflow.tflow(resp=True) + f.response.content = b"\xc2foo" + f.replace(b"foo", u"bar") + + def test_replace_no_content(self): + f = tflow.tflow() + f.request.content = None + assert f.replace("foo", "bar") == 0 + + def test_replace(self): + f = tflow.tflow(resp=True) + f.request.headers["foo"] = "foo" + f.request.content = b"afoob" + + f.response.headers["foo"] = "foo" + f.response.content = b"afoob" + + assert f.replace("foo", "bar") == 6 + + assert f.request.headers["bar"] == "bar" + assert f.request.content == b"abarb" + assert f.response.headers["bar"] == "bar" + assert f.response.content == b"abarb" + + def test_replace_encoded(self): + f = tflow.tflow(resp=True) + f.request.content = b"afoob" + f.request.encode("gzip") + f.response.content = b"afoob" + f.response.encode("gzip") + + f.replace("foo", "bar") + + assert f.request.raw_content != b"abarb" + f.request.decode() + assert f.request.raw_content == b"abarb" + + assert f.response.raw_content != b"abarb" + f.response.decode() + assert f.response.raw_content == b"abarb" + + +def test_make_error_response(): + resp = http.make_error_response(543, 'foobar', Headers()) + assert resp + + +def test_make_connect_request(): + req = http.make_connect_request(('invalidhost', 1234)) + assert req.first_line_format == 'authority' + assert req.method == 'CONNECT' + assert req.http_version == 'HTTP/1.1' + + +def test_make_connect_response(): + resp = http.make_connect_response('foobar') + assert resp.http_version == 'foobar' + assert resp.status_code == 200 + + +def test_expect_continue_response(): + assert http.expect_continue_response.http_version == 'HTTP/1.1' + assert http.expect_continue_response.status_code == 100 |