aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2019-11-16 12:03:34 +0100
committerGitHub <noreply@github.com>2019-11-16 12:03:34 +0100
commit5c0be1de4a19be845be4a63f96b0da01c0f2fad7 (patch)
tree9936e88f5a5128c0b350e835e781452ecf8bb95d
parent698f7e2e177baf313e6af62ec0f79a26693e430b (diff)
parent484e099eb10897b89d83d4b0441b4455faab9422 (diff)
downloadmitmproxy-5c0be1de4a19be845be4a63f96b0da01c0f2fad7.tar.gz
mitmproxy-5c0be1de4a19be845be4a63f96b0da01c0f2fad7.tar.bz2
mitmproxy-5c0be1de4a19be845be4a63f96b0da01c0f2fad7.zip
Merge pull request #3448 from cript0nauta/master
Fix command injection vulnerability when exporting to curl or httpie
-rw-r--r--mitmproxy/addons/export.py81
-rw-r--r--test/mitmproxy/addons/test_export.py65
2 files changed, 104 insertions, 42 deletions
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index 2776118a..80413ac9 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -1,56 +1,73 @@
+import shlex
import typing
-from mitmproxy import ctx
+import pyperclip
+
+import mitmproxy.types
from mitmproxy import command
-from mitmproxy import flow
+from mitmproxy import ctx, http
from mitmproxy import exceptions
-from mitmproxy.utils import strutils
+from mitmproxy import flow
from mitmproxy.net.http.http1 import assemble
-import mitmproxy.types
-
-import pyperclip
+from mitmproxy.utils import strutils
-def cleanup_request(f: flow.Flow):
+def cleanup_request(f: flow.Flow) -> http.HTTPRequest:
if not hasattr(f, "request"):
raise exceptions.CommandError("Can't export flow with no request.")
- request = f.request.copy() # type: ignore
+ assert isinstance(f, http.HTTPFlow)
+ request = f.request.copy()
request.decode(strict=False)
- # a bit of clean-up
- if request.method == 'GET' and request.headers.get("content-length", None) == "0":
- request.headers.pop('content-length')
- request.headers.pop(':authority', None)
+ # a bit of clean-up - these headers should be automatically set by curl/httpie
+ request.headers.pop('content-length')
+ if request.headers.get("host", "") == request.host:
+ request.headers.pop("host")
+ if request.headers.get(":authority", "") == request.host:
+ request.headers.pop(":authority")
return request
+def request_content_for_console(request: http.HTTPRequest) -> str:
+ try:
+ text = request.get_text(strict=True)
+ assert text
+ except ValueError:
+ # shlex.quote doesn't support a bytes object
+ # see https://github.com/python/cpython/pull/10871
+ raise exceptions.CommandError("Request content must be valid unicode")
+ escape_control_chars = {chr(i): f"\\x{i:02x}" for i in range(32)}
+ return "".join(
+ escape_control_chars.get(x, x)
+ for x in text
+ )
+
+
def curl_command(f: flow.Flow) -> str:
- data = "curl "
request = cleanup_request(f)
+ args = ["curl"]
for k, v in request.headers.items(multi=True):
- data += "--compressed " if k == 'accept-encoding' else ""
- data += "-H '%s:%s' " % (k, v)
+ if k.lower() == "accept-encoding":
+ args.append("--compressed")
+ else:
+ args += ["-H", f"{k}: {v}"]
+
if request.method != "GET":
- data += "-X %s " % request.method
- data += "'%s'" % request.url
+ args += ["-X", request.method]
+ args.append(request.url)
if request.content:
- data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
- request.content,
- escape_single_quotes=True
- )
- return data
+ args += ["-d", request_content_for_console(request)]
+ return ' '.join(shlex.quote(arg) for arg in args)
def httpie_command(f: flow.Flow) -> str:
request = cleanup_request(f)
- data = "http %s %s" % (request.method, request.url)
+ args = ["http", request.method, request.url]
for k, v in request.headers.items(multi=True):
- data += " '%s:%s'" % (k, v)
+ args.append(f"{k}: {v}")
+ cmd = ' '.join(shlex.quote(arg) for arg in args)
if request.content:
- data += " <<< '%s'" % strutils.bytes_to_escaped_str(
- request.content,
- escape_single_quotes=True
- )
- return data
+ cmd += " <<< " + shlex.quote(request_content_for_console(request))
+ return cmd
def raw(f: flow.Flow) -> bytes:
@@ -58,9 +75,9 @@ def raw(f: flow.Flow) -> bytes:
formats = dict(
- curl = curl_command,
- httpie = httpie_command,
- raw = raw,
+ curl=curl_command,
+ httpie=httpie_command,
+ raw=raw,
)
diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py
index c86e0c7d..4b905722 100644
--- a/test/mitmproxy/addons/test_export.py
+++ b/test/mitmproxy/addons/test_export.py
@@ -1,4 +1,5 @@
import os
+import shlex
import pytest
import pyperclip
@@ -41,43 +42,87 @@ def tcp_flow():
class TestExportCurlCommand:
def test_get(self, get_request):
- result = """curl -H 'header:qvalue' 'http://address:22/path?a=foo&a=bar&b=baz'"""
+ result = """curl -H 'header: qvalue' 'http://address:22/path?a=foo&a=bar&b=baz'"""
assert export.curl_command(get_request) == result
def test_post(self, post_request):
- result = "curl -H 'content-length:256' -X POST 'http://address:22/path' --data-binary '{}'".format(
- str(bytes(range(256)))[2:-1]
- )
+ post_request.request.content = b'nobinarysupport'
+ result = "curl -X POST http://address:22/path -d nobinarysupport"
assert export.curl_command(post_request) == result
+ def test_fails_with_binary_data(self, post_request):
+ # shlex.quote doesn't support a bytes object
+ # see https://github.com/python/cpython/pull/10871
+ post_request.request.headers["Content-Type"] = "application/json; charset=utf-8"
+ with pytest.raises(exceptions.CommandError):
+ export.curl_command(post_request)
+
def test_patch(self, patch_request):
- result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
+ result = """curl -H 'header: qvalue' -X PATCH 'http://address:22/path?query=param' -d content"""
assert export.curl_command(patch_request) == result
def test_tcp(self, tcp_flow):
with pytest.raises(exceptions.CommandError):
export.curl_command(tcp_flow)
+ def test_escape_single_quotes_in_body(self):
+ request = tflow.tflow(
+ req=tutils.treq(
+ method=b'POST',
+ headers=(),
+ content=b"'&#"
+ )
+ )
+ command = export.curl_command(request)
+ assert shlex.split(command)[-2] == '-d'
+ assert shlex.split(command)[-1] == "'&#"
+
+ def test_strip_unnecessary(self, get_request):
+ get_request.request.headers.clear()
+ get_request.request.headers["host"] = "address"
+ get_request.request.headers[":authority"] = "address"
+ get_request.request.headers["accept-encoding"] = "br"
+ result = """curl --compressed 'http://address:22/path?a=foo&a=bar&b=baz'"""
+ assert export.curl_command(get_request) == result
+
class TestExportHttpieCommand:
def test_get(self, get_request):
- result = """http GET http://address:22/path?a=foo&a=bar&b=baz 'header:qvalue'"""
+ result = """http GET 'http://address:22/path?a=foo&a=bar&b=baz' 'header: qvalue'"""
assert export.httpie_command(get_request) == result
def test_post(self, post_request):
- result = "http POST http://address:22/path 'content-length:256' <<< '{}'".format(
- str(bytes(range(256)))[2:-1]
- )
+ post_request.request.content = b'nobinarysupport'
+ result = "http POST http://address:22/path <<< nobinarysupport"
assert export.httpie_command(post_request) == result
+ def test_fails_with_binary_data(self, post_request):
+ # shlex.quote doesn't support a bytes object
+ # see https://github.com/python/cpython/pull/10871
+ post_request.request.headers["Content-Type"] = "application/json; charset=utf-8"
+ with pytest.raises(exceptions.CommandError):
+ export.httpie_command(post_request)
+
def test_patch(self, patch_request):
- result = """http PATCH http://address:22/path?query=param 'header:qvalue' 'content-length:7' <<< 'content'"""
+ result = """http PATCH 'http://address:22/path?query=param' 'header: qvalue' <<< content"""
assert export.httpie_command(patch_request) == result
def test_tcp(self, tcp_flow):
with pytest.raises(exceptions.CommandError):
export.httpie_command(tcp_flow)
+ def test_escape_single_quotes_in_body(self):
+ request = tflow.tflow(
+ req=tutils.treq(
+ method=b'POST',
+ headers=(),
+ content=b"'&#"
+ )
+ )
+ command = export.httpie_command(request)
+ assert shlex.split(command)[-2] == '<<<'
+ assert shlex.split(command)[-1] == "'&#"
+
class TestRaw:
def test_get(self, get_request):