From 907825714bf434efdee3ea99707aa509bb0f0c58 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Wed, 22 Mar 2017 12:02:18 +0100 Subject: move examples tests (#2199) --- test/examples/__init__.py | 0 test/examples/test_examples.py | 120 +++++++++ test/examples/test_har_dump.py | 114 +++++++++ test/examples/test_xss_scanner.py | 368 ++++++++++++++++++++++++++++ test/mitmproxy/examples/test_xss_scanner.py | 368 ---------------------------- test/mitmproxy/test_examples.py | 204 --------------- 6 files changed, 602 insertions(+), 572 deletions(-) create mode 100644 test/examples/__init__.py create mode 100644 test/examples/test_examples.py create mode 100644 test/examples/test_har_dump.py create mode 100644 test/examples/test_xss_scanner.py delete mode 100755 test/mitmproxy/examples/test_xss_scanner.py delete mode 100644 test/mitmproxy/test_examples.py (limited to 'test') diff --git a/test/examples/__init__.py b/test/examples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/examples/test_examples.py b/test/examples/test_examples.py new file mode 100644 index 00000000..46fdcd36 --- /dev/null +++ b/test/examples/test_examples.py @@ -0,0 +1,120 @@ +import pytest + +from mitmproxy import options +from mitmproxy import contentviews +from mitmproxy import proxy +from mitmproxy import master +from mitmproxy.addons import script + +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.net.http import Headers + +from ..mitmproxy import tservers + +example_dir = tutils.test_data.push("../examples") + + +class ScriptError(Exception): + pass + + +class RaiseMaster(master.Master): + def add_log(self, e, level): + if level in ("warn", "error"): + raise ScriptError(e) + + +def tscript(cmd, args=""): + o = options.Options() + cmd = example_dir.path(cmd) + " " + args + m = RaiseMaster(o, proxy.DummyServer()) + sc = script.Script(cmd) + m.addons.add(sc) + return m, sc + + +class TestScripts(tservers.MasterTest): + def test_add_header(self): + m, _ = tscript("simple/add_header.py") + f = tflow.tflow(resp=tutils.tresp()) + m.addons.handle_lifecycle("response", f) + assert f.response.headers["newheader"] == "foo" + + def test_custom_contentviews(self): + m, sc = tscript("simple/custom_contentview.py") + swapcase = contentviews.get("swapcase") + _, fmt = swapcase(b"Test!") + assert any(b'tEST!' in val[0][1] for val in fmt) + + def test_iframe_injector(self): + with pytest.raises(ScriptError): + tscript("simple/modify_body_inject_iframe.py") + + m, sc = tscript("simple/modify_body_inject_iframe.py", "http://example.org/evil_iframe") + f = tflow.tflow(resp=tutils.tresp(content=b"mitmproxy")) + m.addons.handle_lifecycle("response", f) + content = f.response.content + assert b'iframe' in content and b'evil_iframe' in content + + def test_modify_form(self): + m, sc = tscript("simple/modify_form.py") + + form_header = Headers(content_type="application/x-www-form-urlencoded") + f = tflow.tflow(req=tutils.treq(headers=form_header)) + m.addons.handle_lifecycle("request", f) + + assert f.request.urlencoded_form["mitmproxy"] == "rocks" + + f.request.headers["content-type"] = "" + m.addons.handle_lifecycle("request", f) + assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] + + def test_modify_querystring(self): + m, sc = tscript("simple/modify_querystring.py") + f = tflow.tflow(req=tutils.treq(path="/search?q=term")) + + m.addons.handle_lifecycle("request", f) + assert f.request.query["mitmproxy"] == "rocks" + + f.request.path = "/" + m.addons.handle_lifecycle("request", f) + assert f.request.query["mitmproxy"] == "rocks" + + def test_arguments(self): + m, sc = tscript("simple/script_arguments.py", "mitmproxy rocks") + f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy")) + m.addons.handle_lifecycle("response", f) + assert f.response.content == b"I <3 rocks" + + def test_redirect_requests(self): + m, sc = tscript("simple/redirect_requests.py") + f = tflow.tflow(req=tutils.treq(host="example.org")) + m.addons.handle_lifecycle("request", f) + assert f.request.host == "mitmproxy.org" + + def test_send_reply_from_proxy(self): + m, sc = tscript("simple/send_reply_from_proxy.py") + f = tflow.tflow(req=tutils.treq(host="example.com", port=80)) + m.addons.handle_lifecycle("request", f) + assert f.response.content == b"Hello World" + + def test_dns_spoofing(self): + m, sc = tscript("complex/dns_spoofing.py") + original_host = "example.com" + + host_header = Headers(host=original_host) + f = tflow.tflow(req=tutils.treq(headers=host_header, port=80)) + + m.addons.handle_lifecycle("requestheaders", f) + + # Rewrite by reverse proxy mode + f.request.scheme = "https" + f.request.port = 443 + + m.addons.handle_lifecycle("request", f) + + assert f.request.scheme == "http" + assert f.request.port == 80 + + assert f.request.headers["Host"] == original_host diff --git a/test/examples/test_har_dump.py b/test/examples/test_har_dump.py new file mode 100644 index 00000000..e5cfd2e1 --- /dev/null +++ b/test/examples/test_har_dump.py @@ -0,0 +1,114 @@ +import json +import shlex +import pytest + +from mitmproxy import options +from mitmproxy import proxy +from mitmproxy import master +from mitmproxy.addons import script + +from mitmproxy.test import tflow +from mitmproxy.test import tutils +from mitmproxy.net.http import cookies + +example_dir = tutils.test_data.push("../examples") + + +class ScriptError(Exception): + pass + + +class RaiseMaster(master.Master): + def add_log(self, e, level): + if level in ("warn", "error"): + raise ScriptError(e) + + +def tscript(cmd, args=""): + o = options.Options() + cmd = example_dir.path(cmd) + " " + args + m = RaiseMaster(o, proxy.DummyServer()) + sc = script.Script(cmd) + m.addons.add(sc) + return m, sc + + +class TestHARDump: + + def flow(self, resp_content=b'message'): + times = dict( + timestamp_start=746203272, + timestamp_end=746203272, + ) + + # Create a dummy flow for testing + return tflow.tflow( + req=tutils.treq(method=b'GET', **times), + resp=tutils.tresp(content=resp_content, **times) + ) + + def test_no_file_arg(self): + with pytest.raises(ScriptError): + tscript("complex/har_dump.py") + + def test_simple(self, tmpdir): + path = str(tmpdir.join("somefile")) + + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.trigger("response", self.flow()) + m.addons.remove(sc) + + with open(path, "r") as inp: + har = json.load(inp) + assert len(har["log"]["entries"]) == 1 + + def test_base64(self, tmpdir): + path = str(tmpdir.join("somefile")) + + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.trigger( + "response", self.flow(resp_content=b"foo" + b"\xFF" * 10) + ) + m.addons.remove(sc) + + with open(path, "r") as inp: + har = json.load(inp) + assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" + + def test_format_cookies(self): + m, sc = tscript("complex/har_dump.py", "-") + format_cookies = sc.ns.format_cookies + + CA = cookies.CookieAttrs + + f = format_cookies([("n", "v", CA([("k", "v")]))])[0] + assert f['name'] == "n" + assert f['value'] == "v" + assert not f['httpOnly'] + assert not f['secure'] + + f = format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0] + assert f['httpOnly'] + assert f['secure'] + + f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] + assert f['expires'] + + def test_binary(self, tmpdir): + + f = self.flow() + f.request.method = "POST" + f.request.headers["content-type"] = "application/x-www-form-urlencoded" + f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f" + f.response.headers["random-junk"] = bytes(range(256)) + f.response.content = bytes(range(256)) + + path = str(tmpdir.join("somefile")) + + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.trigger("response", f) + m.addons.remove(sc) + + with open(path, "r") as inp: + har = json.load(inp) + assert len(har["log"]["entries"]) == 1 diff --git a/test/examples/test_xss_scanner.py b/test/examples/test_xss_scanner.py new file mode 100644 index 00000000..14ee6902 --- /dev/null +++ b/test/examples/test_xss_scanner.py @@ -0,0 +1,368 @@ +import pytest +import requests +from examples.complex import xss_scanner as xss +from mitmproxy.test import tflow, tutils + + +class TestXSSScanner(): + def test_get_XSS_info(self): + # First type of exploit: + # Exploitable: + xss_info = xss.get_XSS_data(b"" % + xss.FULL_PAYLOAD, + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData('https://example.com', + "End of URL", + '" % + xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + '" % + xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22").replace(b"/", b"%2F"), + "https://example.com", + "End of URL") + assert xss_info is None + # Second type of exploit: + # Exploitable: + xss_info = xss.get_XSS_data(b"" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"\"", b"%22"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "';alert(0);g='", + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"\"", b"%22").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"\"", b"%22").replace(b"'", b"%22"), + "https://example.com", + "End of URL") + assert xss_info is None + # Third type of exploit: + # Exploitable: + xss_info = xss.get_XSS_data(b"" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"'", b"%27"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + '";alert(0);g="', + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"'", b"%27").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"'", b"%27").replace(b"\"", b"%22"), + "https://example.com", + "End of URL") + assert xss_info is None + # Fourth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD, + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "'>", + xss.FULL_PAYLOAD.decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"'", b"%27"), + "https://example.com", + "End of URL") + assert xss_info is None + # Fifth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"'", b"%27"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "\">", + xss.FULL_PAYLOAD.replace(b"'", b"%27").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b"\"", b"%22"), + "https://example.com", + "End of URL") + assert xss_info is None + # Sixth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD, + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + ">", + xss.FULL_PAYLOAD.decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"=", b"%3D"), + "https://example.com", + "End of URL") + assert xss_info is None + # Seventh type of exploit: PAYLOAD + # Exploitable: + xss_info = xss.get_XSS_data(b"%s" % + xss.FULL_PAYLOAD, + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "", + xss.FULL_PAYLOAD.decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable + xss_info = xss.get_XSS_data(b"%s" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"/", b"%2F"), + "https://example.com", + "End of URL") + assert xss_info is None + # Eighth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "Javascript:alert(0)", + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"=", b"%3D"), + "https://example.com", + "End of URL") + assert xss_info is None + # Ninth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + '" onmouseover="alert(0)" t="', + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b'"', b"%22"), + "https://example.com", + "End of URL") + assert xss_info is None + # Tenth type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + "' onmouseover='alert(0)' t='", + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"'", b"%22"), + "https://example.com", + "End of URL") + assert xss_info is None + # Eleventh type of exploit: Test + # Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), + "https://example.com", + "End of URL") + expected_xss_info = xss.XSSData("https://example.com", + "End of URL", + " onmouseover=alert(0) t=", + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) + assert xss_info == expected_xss_info + # Non-Exploitable: + xss_info = xss.get_XSS_data(b"Test" % + xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") + .replace(b"=", b"%3D"), + "https://example.com", + "End of URL") + assert xss_info is None + + def test_get_SQLi_data(self): + sqli_data = xss.get_SQLi_data("SQL syntax MySQL", + "", + "https://example.com", + "End of URL") + expected_sqli_data = xss.SQLiData("https://example.com", + "End of URL", + "SQL syntax.*MySQL", + "MySQL") + assert sqli_data == expected_sqli_data + sqli_data = xss.get_SQLi_data("SQL syntax MySQL", + "SQL syntax MySQL", + "https://example.com", + "End of URL") + assert sqli_data is None + + def test_inside_quote(self): + assert not xss.inside_quote("'", b"no", 0, b"no") + assert xss.inside_quote("'", b"yes", 0, b"'yes'") + assert xss.inside_quote("'", b"yes", 1, b"'yes'otherJunk'yes'more") + assert not xss.inside_quote("'", b"longStringNotInIt", 1, b"short") + + def test_paths_to_text(self): + text = xss.paths_to_text("""

STRING

+ + """, "STRING") + expected_text = ["/html/head/h1", "/html/script"] + assert text == expected_text + assert xss.paths_to_text("""""", "STRING") == [] + + def mocked_requests_vuln(*args, headers=None, cookies=None): + class MockResponse: + def __init__(self, html, headers=None, cookies=None): + self.text = html + return MockResponse("%s" % xss.FULL_PAYLOAD) + + def mocked_requests_invuln(*args, headers=None, cookies=None): + class MockResponse: + def __init__(self, html, headers=None, cookies=None): + self.text = html + return MockResponse("") + + def test_test_end_of_url_injection(self, monkeypatch): + monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) + xss_info = xss.test_end_of_URL_injection("", "https://example.com/index.html", {})[0] + expected_xss_info = xss.XSSData('https://example.com/index.html/1029zxcs\'d"aoso[sb]po(pc)se;sl/bsl\\eq=3847asd', + 'End of URL', + '', + '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') + sqli_info = xss.test_end_of_URL_injection("", "https://example.com/", {})[1] + assert xss_info == expected_xss_info + assert sqli_info is None + + def test_test_referer_injection(self, monkeypatch): + monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) + xss_info = xss.test_referer_injection("", "https://example.com/", {})[0] + expected_xss_info = xss.XSSData('https://example.com/', + 'Referer', + '', + '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') + sqli_info = xss.test_referer_injection("", "https://example.com/", {})[1] + assert xss_info == expected_xss_info + assert sqli_info is None + + def test_test_user_agent_injection(self, monkeypatch): + monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) + xss_info = xss.test_user_agent_injection("", "https://example.com/", {})[0] + expected_xss_info = xss.XSSData('https://example.com/', + 'User Agent', + '', + '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') + sqli_info = xss.test_user_agent_injection("", "https://example.com/", {})[1] + assert xss_info == expected_xss_info + assert sqli_info is None + + def test_test_query_injection(self, monkeypatch): + monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) + xss_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[0] + expected_xss_info = xss.XSSData('https://example.com/vuln.php?cmd=1029zxcs\'d"aoso[sb]po(pc)se;sl/bsl\\eq=3847asd', + 'Query', + '', + '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') + sqli_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[1] + assert xss_info == expected_xss_info + assert sqli_info is None + + @pytest.fixture + def logger(self): + class Logger(): + def __init__(self): + self.args = [] + + def error(self, str): + self.args.append(str) + return Logger() + + def test_find_unclaimed_URLs(self, monkeypatch, logger): + logger.args = [] + monkeypatch.setattr("mitmproxy.ctx.log", logger) + xss.find_unclaimed_URLs("", + "https://example.com") + assert logger.args == [] + xss.find_unclaimed_URLs("", + "https://example.com") + assert logger.args[0] == 'XSS found in https://example.com due to unclaimed URL "http://unclaimedDomainName.com" in script tag.' + + def test_log_XSS_data(self, monkeypatch, logger): + logger.args = [] + monkeypatch.setattr("mitmproxy.ctx.log", logger) + xss.log_XSS_data(None) + assert logger.args == [] + # self, url: str, injection_point: str, exploit: str, line: str + xss.log_XSS_data(xss.XSSData('https://example.com', + 'Location', + 'String', + 'Line of HTML')) + assert logger.args[0] == '===== XSS Found ====' + assert logger.args[1] == 'XSS URL: https://example.com' + assert logger.args[2] == 'Injection Point: Location' + assert logger.args[3] == 'Suggested Exploit: String' + assert logger.args[4] == 'Line: Line of HTML' + + def test_log_SQLi_data(self, monkeypatch, logger): + logger.args = [] + monkeypatch.setattr("mitmproxy.ctx.log", logger) + xss.log_SQLi_data(None) + assert logger.args == [] + xss.log_SQLi_data(xss.SQLiData(b'https://example.com', + b'Location', + b'Oracle.*Driver', + b'Oracle')) + assert logger.args[0] == '===== SQLi Found =====' + assert logger.args[1] == 'SQLi URL: https://example.com' + assert logger.args[2] == 'Injection Point: Location' + assert logger.args[3] == 'Regex used: Oracle.*Driver' + + def test_get_cookies(self): + mocked_req = tutils.treq() + mocked_req.cookies = [("cookieName2", "cookieValue2")] + mocked_flow = tflow.tflow(req=mocked_req) + # It only uses the request cookies + assert xss.get_cookies(mocked_flow) == {"cookieName2": "cookieValue2"} + + def test_response(self, monkeypatch, logger): + logger.args = [] + monkeypatch.setattr("mitmproxy.ctx.log", logger) + monkeypatch.setattr(requests, 'get', self.mocked_requests_invuln) + mocked_flow = tflow.tflow(req=tutils.treq(path=b"index.html?q=1"), resp=tutils.tresp(content=b'')) + xss.response(mocked_flow) + assert logger.args == [] + + def test_data_equals(self): + xssData = xss.XSSData("a", "b", "c", "d") + sqliData = xss.SQLiData("a", "b", "c", "d") + assert xssData == xssData + assert sqliData == sqliData diff --git a/test/mitmproxy/examples/test_xss_scanner.py b/test/mitmproxy/examples/test_xss_scanner.py deleted file mode 100755 index 14ee6902..00000000 --- a/test/mitmproxy/examples/test_xss_scanner.py +++ /dev/null @@ -1,368 +0,0 @@ -import pytest -import requests -from examples.complex import xss_scanner as xss -from mitmproxy.test import tflow, tutils - - -class TestXSSScanner(): - def test_get_XSS_info(self): - # First type of exploit: - # Exploitable: - xss_info = xss.get_XSS_data(b"" % - xss.FULL_PAYLOAD, - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData('https://example.com', - "End of URL", - '" % - xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - '" % - xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22").replace(b"/", b"%2F"), - "https://example.com", - "End of URL") - assert xss_info is None - # Second type of exploit: - # Exploitable: - xss_info = xss.get_XSS_data(b"" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"\"", b"%22"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "';alert(0);g='", - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"\"", b"%22").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"\"", b"%22").replace(b"'", b"%22"), - "https://example.com", - "End of URL") - assert xss_info is None - # Third type of exploit: - # Exploitable: - xss_info = xss.get_XSS_data(b"" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"'", b"%27"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - '";alert(0);g="', - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"'", b"%27").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"'", b"%27").replace(b"\"", b"%22"), - "https://example.com", - "End of URL") - assert xss_info is None - # Fourth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD, - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "'>", - xss.FULL_PAYLOAD.decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"'", b"%27"), - "https://example.com", - "End of URL") - assert xss_info is None - # Fifth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"'", b"%27"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "\">", - xss.FULL_PAYLOAD.replace(b"'", b"%27").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b"\"", b"%22"), - "https://example.com", - "End of URL") - assert xss_info is None - # Sixth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD, - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - ">", - xss.FULL_PAYLOAD.decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"=", b"%3D"), - "https://example.com", - "End of URL") - assert xss_info is None - # Seventh type of exploit: PAYLOAD - # Exploitable: - xss_info = xss.get_XSS_data(b"%s" % - xss.FULL_PAYLOAD, - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "", - xss.FULL_PAYLOAD.decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable - xss_info = xss.get_XSS_data(b"%s" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"/", b"%2F"), - "https://example.com", - "End of URL") - assert xss_info is None - # Eighth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "Javascript:alert(0)", - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"=", b"%3D"), - "https://example.com", - "End of URL") - assert xss_info is None - # Ninth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - '" onmouseover="alert(0)" t="', - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b'"', b"%22"), - "https://example.com", - "End of URL") - assert xss_info is None - # Tenth type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - "' onmouseover='alert(0)' t='", - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"'", b"%22"), - "https://example.com", - "End of URL") - assert xss_info is None - # Eleventh type of exploit: Test - # Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"), - "https://example.com", - "End of URL") - expected_xss_info = xss.XSSData("https://example.com", - "End of URL", - " onmouseover=alert(0) t=", - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8')) - assert xss_info == expected_xss_info - # Non-Exploitable: - xss_info = xss.get_XSS_data(b"Test" % - xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E") - .replace(b"=", b"%3D"), - "https://example.com", - "End of URL") - assert xss_info is None - - def test_get_SQLi_data(self): - sqli_data = xss.get_SQLi_data("SQL syntax MySQL", - "", - "https://example.com", - "End of URL") - expected_sqli_data = xss.SQLiData("https://example.com", - "End of URL", - "SQL syntax.*MySQL", - "MySQL") - assert sqli_data == expected_sqli_data - sqli_data = xss.get_SQLi_data("SQL syntax MySQL", - "SQL syntax MySQL", - "https://example.com", - "End of URL") - assert sqli_data is None - - def test_inside_quote(self): - assert not xss.inside_quote("'", b"no", 0, b"no") - assert xss.inside_quote("'", b"yes", 0, b"'yes'") - assert xss.inside_quote("'", b"yes", 1, b"'yes'otherJunk'yes'more") - assert not xss.inside_quote("'", b"longStringNotInIt", 1, b"short") - - def test_paths_to_text(self): - text = xss.paths_to_text("""

STRING

- - """, "STRING") - expected_text = ["/html/head/h1", "/html/script"] - assert text == expected_text - assert xss.paths_to_text("""""", "STRING") == [] - - def mocked_requests_vuln(*args, headers=None, cookies=None): - class MockResponse: - def __init__(self, html, headers=None, cookies=None): - self.text = html - return MockResponse("%s" % xss.FULL_PAYLOAD) - - def mocked_requests_invuln(*args, headers=None, cookies=None): - class MockResponse: - def __init__(self, html, headers=None, cookies=None): - self.text = html - return MockResponse("") - - def test_test_end_of_url_injection(self, monkeypatch): - monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) - xss_info = xss.test_end_of_URL_injection("", "https://example.com/index.html", {})[0] - expected_xss_info = xss.XSSData('https://example.com/index.html/1029zxcs\'d"aoso[sb]po(pc)se;sl/bsl\\eq=3847asd', - 'End of URL', - '', - '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') - sqli_info = xss.test_end_of_URL_injection("", "https://example.com/", {})[1] - assert xss_info == expected_xss_info - assert sqli_info is None - - def test_test_referer_injection(self, monkeypatch): - monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) - xss_info = xss.test_referer_injection("", "https://example.com/", {})[0] - expected_xss_info = xss.XSSData('https://example.com/', - 'Referer', - '', - '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') - sqli_info = xss.test_referer_injection("", "https://example.com/", {})[1] - assert xss_info == expected_xss_info - assert sqli_info is None - - def test_test_user_agent_injection(self, monkeypatch): - monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) - xss_info = xss.test_user_agent_injection("", "https://example.com/", {})[0] - expected_xss_info = xss.XSSData('https://example.com/', - 'User Agent', - '', - '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') - sqli_info = xss.test_user_agent_injection("", "https://example.com/", {})[1] - assert xss_info == expected_xss_info - assert sqli_info is None - - def test_test_query_injection(self, monkeypatch): - monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln) - xss_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[0] - expected_xss_info = xss.XSSData('https://example.com/vuln.php?cmd=1029zxcs\'d"aoso[sb]po(pc)se;sl/bsl\\eq=3847asd', - 'Query', - '', - '1029zxcs\\\'d"aoso[sb]po(pc)se;sl/bsl\\\\eq=3847asd') - sqli_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[1] - assert xss_info == expected_xss_info - assert sqli_info is None - - @pytest.fixture - def logger(self): - class Logger(): - def __init__(self): - self.args = [] - - def error(self, str): - self.args.append(str) - return Logger() - - def test_find_unclaimed_URLs(self, monkeypatch, logger): - logger.args = [] - monkeypatch.setattr("mitmproxy.ctx.log", logger) - xss.find_unclaimed_URLs("", - "https://example.com") - assert logger.args == [] - xss.find_unclaimed_URLs("", - "https://example.com") - assert logger.args[0] == 'XSS found in https://example.com due to unclaimed URL "http://unclaimedDomainName.com" in script tag.' - - def test_log_XSS_data(self, monkeypatch, logger): - logger.args = [] - monkeypatch.setattr("mitmproxy.ctx.log", logger) - xss.log_XSS_data(None) - assert logger.args == [] - # self, url: str, injection_point: str, exploit: str, line: str - xss.log_XSS_data(xss.XSSData('https://example.com', - 'Location', - 'String', - 'Line of HTML')) - assert logger.args[0] == '===== XSS Found ====' - assert logger.args[1] == 'XSS URL: https://example.com' - assert logger.args[2] == 'Injection Point: Location' - assert logger.args[3] == 'Suggested Exploit: String' - assert logger.args[4] == 'Line: Line of HTML' - - def test_log_SQLi_data(self, monkeypatch, logger): - logger.args = [] - monkeypatch.setattr("mitmproxy.ctx.log", logger) - xss.log_SQLi_data(None) - assert logger.args == [] - xss.log_SQLi_data(xss.SQLiData(b'https://example.com', - b'Location', - b'Oracle.*Driver', - b'Oracle')) - assert logger.args[0] == '===== SQLi Found =====' - assert logger.args[1] == 'SQLi URL: https://example.com' - assert logger.args[2] == 'Injection Point: Location' - assert logger.args[3] == 'Regex used: Oracle.*Driver' - - def test_get_cookies(self): - mocked_req = tutils.treq() - mocked_req.cookies = [("cookieName2", "cookieValue2")] - mocked_flow = tflow.tflow(req=mocked_req) - # It only uses the request cookies - assert xss.get_cookies(mocked_flow) == {"cookieName2": "cookieValue2"} - - def test_response(self, monkeypatch, logger): - logger.args = [] - monkeypatch.setattr("mitmproxy.ctx.log", logger) - monkeypatch.setattr(requests, 'get', self.mocked_requests_invuln) - mocked_flow = tflow.tflow(req=tutils.treq(path=b"index.html?q=1"), resp=tutils.tresp(content=b'')) - xss.response(mocked_flow) - assert logger.args == [] - - def test_data_equals(self): - xssData = xss.XSSData("a", "b", "c", "d") - sqliData = xss.SQLiData("a", "b", "c", "d") - assert xssData == xssData - assert sqliData == sqliData diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py deleted file mode 100644 index 030f2c4e..00000000 --- a/test/mitmproxy/test_examples.py +++ /dev/null @@ -1,204 +0,0 @@ -import json -import shlex -import pytest - -from mitmproxy import options -from mitmproxy import contentviews -from mitmproxy import proxy -from mitmproxy import master -from mitmproxy.addons import script - -from mitmproxy.test import tflow -from mitmproxy.test import tutils -from mitmproxy.net.http import Headers -from mitmproxy.net.http import cookies - -from . import tservers - -example_dir = tutils.test_data.push("../examples") - - -class ScriptError(Exception): - pass - - -class RaiseMaster(master.Master): - def add_log(self, e, level): - if level in ("warn", "error"): - raise ScriptError(e) - - -def tscript(cmd, args=""): - o = options.Options() - cmd = example_dir.path(cmd) + " " + args - m = RaiseMaster(o, proxy.DummyServer()) - sc = script.Script(cmd) - m.addons.add(sc) - return m, sc - - -class TestScripts(tservers.MasterTest): - def test_add_header(self): - m, _ = tscript("simple/add_header.py") - f = tflow.tflow(resp=tutils.tresp()) - m.addons.handle_lifecycle("response", f) - assert f.response.headers["newheader"] == "foo" - - def test_custom_contentviews(self): - m, sc = tscript("simple/custom_contentview.py") - swapcase = contentviews.get("swapcase") - _, fmt = swapcase(b"Test!") - assert any(b'tEST!' in val[0][1] for val in fmt) - - def test_iframe_injector(self): - with pytest.raises(ScriptError): - tscript("simple/modify_body_inject_iframe.py") - - m, sc = tscript("simple/modify_body_inject_iframe.py", "http://example.org/evil_iframe") - f = tflow.tflow(resp=tutils.tresp(content=b"mitmproxy")) - m.addons.handle_lifecycle("response", f) - content = f.response.content - assert b'iframe' in content and b'evil_iframe' in content - - def test_modify_form(self): - m, sc = tscript("simple/modify_form.py") - - form_header = Headers(content_type="application/x-www-form-urlencoded") - f = tflow.tflow(req=tutils.treq(headers=form_header)) - m.addons.handle_lifecycle("request", f) - - assert f.request.urlencoded_form["mitmproxy"] == "rocks" - - f.request.headers["content-type"] = "" - m.addons.handle_lifecycle("request", f) - assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] - - def test_modify_querystring(self): - m, sc = tscript("simple/modify_querystring.py") - f = tflow.tflow(req=tutils.treq(path="/search?q=term")) - - m.addons.handle_lifecycle("request", f) - assert f.request.query["mitmproxy"] == "rocks" - - f.request.path = "/" - m.addons.handle_lifecycle("request", f) - assert f.request.query["mitmproxy"] == "rocks" - - def test_arguments(self): - m, sc = tscript("simple/script_arguments.py", "mitmproxy rocks") - f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy")) - m.addons.handle_lifecycle("response", f) - assert f.response.content == b"I <3 rocks" - - def test_redirect_requests(self): - m, sc = tscript("simple/redirect_requests.py") - f = tflow.tflow(req=tutils.treq(host="example.org")) - m.addons.handle_lifecycle("request", f) - assert f.request.host == "mitmproxy.org" - - def test_send_reply_from_proxy(self): - m, sc = tscript("simple/send_reply_from_proxy.py") - f = tflow.tflow(req=tutils.treq(host="example.com", port=80)) - m.addons.handle_lifecycle("request", f) - assert f.response.content == b"Hello World" - - def test_dns_spoofing(self): - m, sc = tscript("complex/dns_spoofing.py") - original_host = "example.com" - - host_header = Headers(host=original_host) - f = tflow.tflow(req=tutils.treq(headers=host_header, port=80)) - - m.addons.handle_lifecycle("requestheaders", f) - - # Rewrite by reverse proxy mode - f.request.scheme = "https" - f.request.port = 443 - - m.addons.handle_lifecycle("request", f) - - assert f.request.scheme == "http" - assert f.request.port == 80 - - assert f.request.headers["Host"] == original_host - - -class TestHARDump: - - def flow(self, resp_content=b'message'): - times = dict( - timestamp_start=746203272, - timestamp_end=746203272, - ) - - # Create a dummy flow for testing - return tflow.tflow( - req=tutils.treq(method=b'GET', **times), - resp=tutils.tresp(content=resp_content, **times) - ) - - def test_no_file_arg(self): - with pytest.raises(ScriptError): - tscript("complex/har_dump.py") - - def test_simple(self, tmpdir): - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger("response", self.flow()) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert len(har["log"]["entries"]) == 1 - - def test_base64(self, tmpdir): - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger( - "response", self.flow(resp_content=b"foo" + b"\xFF" * 10) - ) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" - - def test_format_cookies(self): - m, sc = tscript("complex/har_dump.py", "-") - format_cookies = sc.ns.format_cookies - - CA = cookies.CookieAttrs - - f = format_cookies([("n", "v", CA([("k", "v")]))])[0] - assert f['name'] == "n" - assert f['value'] == "v" - assert not f['httpOnly'] - assert not f['secure'] - - f = format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0] - assert f['httpOnly'] - assert f['secure'] - - f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] - assert f['expires'] - - def test_binary(self, tmpdir): - - f = self.flow() - f.request.method = "POST" - f.request.headers["content-type"] = "application/x-www-form-urlencoded" - f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f" - f.response.headers["random-junk"] = bytes(range(256)) - f.response.content = bytes(range(256)) - - path = str(tmpdir.join("somefile")) - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.trigger("response", f) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) - assert len(har["log"]["entries"]) == 1 -- cgit v1.2.3