aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/complex/har_dump.py3
-rw-r--r--mitmproxy/net/http/encoding.py31
-rw-r--r--test/mitmproxy/net/http/test_encoding.py43
3 files changed, 55 insertions, 22 deletions
diff --git a/examples/complex/har_dump.py b/examples/complex/har_dump.py
index 62011903..f7c1e658 100644
--- a/examples/complex/har_dump.py
+++ b/examples/complex/har_dump.py
@@ -7,6 +7,7 @@ import json
import sys
import base64
import zlib
+import os
from datetime import datetime
import pytz
@@ -166,7 +167,7 @@ def done():
if dump_file.endswith('.zhar'):
raw = zlib.compress(raw, 9)
- with open(dump_file, "wb") as f:
+ with open(os.path.expanduser(dump_file), "wb") as f:
f.write(raw)
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
diff --git a/mitmproxy/net/http/encoding.py b/mitmproxy/net/http/encoding.py
index e123a033..5da07099 100644
--- a/mitmproxy/net/http/encoding.py
+++ b/mitmproxy/net/http/encoding.py
@@ -31,8 +31,8 @@ def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
Raises:
ValueError, if decoding fails.
"""
- if len(encoded) == 0:
- return encoded
+ if encoded is None:
+ return None
global _cache
cached = (
@@ -72,8 +72,8 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
Raises:
ValueError, if encoding fails.
"""
- if len(decoded) == 0:
- return decoded
+ if decoded is None:
+ return None
global _cache
cached = (
@@ -86,10 +86,7 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
return _cache.encoded
try:
try:
- value = decoded
- if isinstance(value, str):
- value = decoded.encode()
- encoded = custom_encode[encoding](value)
+ encoded = custom_encode[encoding](decoded)
except KeyError:
encoded = codecs.encode(decoded, encoding, errors)
if encoding in ("gzip", "deflate", "br"):
@@ -114,12 +111,14 @@ def identity(content):
return content
-def decode_gzip(content):
+def decode_gzip(content: bytes) -> bytes:
+ if not content:
+ return b""
gfile = gzip.GzipFile(fileobj=BytesIO(content))
return gfile.read()
-def encode_gzip(content):
+def encode_gzip(content: bytes) -> bytes:
s = BytesIO()
gf = gzip.GzipFile(fileobj=s, mode='wb')
gf.write(content)
@@ -127,15 +126,17 @@ def encode_gzip(content):
return s.getvalue()
-def decode_brotli(content):
+def decode_brotli(content: bytes) -> bytes:
+ if not content:
+ return b""
return brotli.decompress(content)
-def encode_brotli(content):
+def encode_brotli(content: bytes) -> bytes:
return brotli.compress(content)
-def decode_deflate(content):
+def decode_deflate(content: bytes) -> bytes:
"""
Returns decompressed data for DEFLATE. Some servers may respond with
compressed data without a zlib header or checksum. An undocumented
@@ -144,13 +145,15 @@ def decode_deflate(content):
http://bugs.python.org/issue5784
"""
+ if not content:
+ return b""
try:
return zlib.decompress(content)
except zlib.error:
return zlib.decompress(content, -15)
-def encode_deflate(content):
+def encode_deflate(content: bytes) -> bytes:
"""
Returns compressed content, always including zlib header and checksum.
"""
diff --git a/test/mitmproxy/net/http/test_encoding.py b/test/mitmproxy/net/http/test_encoding.py
index 11619c44..8dac12cb 100644
--- a/test/mitmproxy/net/http/test_encoding.py
+++ b/test/mitmproxy/net/http/test_encoding.py
@@ -21,26 +21,55 @@ def test_identity(encoder):
'deflate',
])
def test_encoders(encoder):
- assert "" == encoding.decode("", encoder)
+ """
+ This test is for testing byte->byte encoding/decoding
+ """
+ assert encoding.decode(None, encoder) is None
+ assert encoding.encode(None, encoder) is None
+
assert b"" == encoding.decode(b"", encoder)
- assert "string" == encoding.decode(
+ assert b"string" == encoding.decode(
encoding.encode(
- "string",
+ b"string",
encoder
),
encoder
)
- assert b"string" == encoding.decode(
+
+ with pytest.raises(TypeError):
+ encoding.encode("string", encoder)
+
+ with pytest.raises(TypeError):
+ encoding.decode("string", encoder)
+ with pytest.raises(ValueError):
+ encoding.decode(b"foobar", encoder)
+
+
+@pytest.mark.parametrize("encoder", [
+ 'utf8',
+ 'latin-1'
+])
+def test_encoders_strings(encoder):
+ """
+ This test is for testing byte->str decoding
+ and str->byte encoding
+ """
+ assert "" == encoding.decode(b"", encoder)
+
+ assert "string" == encoding.decode(
encoding.encode(
- b"string",
+ "string",
encoder
),
encoder
)
- with pytest.raises(ValueError):
- encoding.decode(b"foobar", encoder)
+ with pytest.raises(TypeError):
+ encoding.encode(b"string", encoder)
+
+ with pytest.raises(TypeError):
+ encoding.decode("foobar", encoder)
def test_cache():