diff options
Diffstat (limited to 'netlib/http/message.py')
-rw-r--r-- | netlib/http/message.py | 300 |
1 files changed, 0 insertions, 300 deletions
diff --git a/netlib/http/message.py b/netlib/http/message.py deleted file mode 100644 index 772a124e..00000000 --- a/netlib/http/message.py +++ /dev/null @@ -1,300 +0,0 @@ -import re -import warnings -from typing import Optional - -from mitmproxy.utils import strutils -from netlib.http import encoding -from mitmproxy.types import serializable -from netlib.http import headers - - -# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. -def _native(x): - return x.decode("utf-8", "surrogateescape") - - -def _always_bytes(x): - return strutils.always_bytes(x, "utf-8", "surrogateescape") - - -class MessageData(serializable.Serializable): - def __eq__(self, other): - if isinstance(other, MessageData): - return self.__dict__ == other.__dict__ - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def set_state(self, state): - for k, v in state.items(): - if k == "headers": - v = headers.Headers.from_state(v) - setattr(self, k, v) - - def get_state(self): - state = vars(self).copy() - state["headers"] = state["headers"].get_state() - return state - - @classmethod - def from_state(cls, state): - state["headers"] = headers.Headers.from_state(state["headers"]) - return cls(**state) - - -class Message(serializable.Serializable): - def __eq__(self, other): - if isinstance(other, Message): - return self.data == other.data - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def get_state(self): - return self.data.get_state() - - def set_state(self, state): - self.data.set_state(state) - - @classmethod - def from_state(cls, state): - state["headers"] = headers.Headers.from_state(state["headers"]) - return cls(**state) - - @property - def headers(self): - """ - Message headers object - - Returns: - netlib.http.Headers - """ - return self.data.headers - - @headers.setter - def headers(self, h): - self.data.headers = h - - @property - def raw_content(self) -> bytes: - """ - The raw (encoded) HTTP message body - - See also: :py:attr:`content`, :py:class:`text` - """ - return self.data.content - - @raw_content.setter - def raw_content(self, content): - self.data.content = content - - def get_content(self, strict: bool=True) -> bytes: - """ - The HTTP message body decoded with the content-encoding header (e.g. gzip) - - Raises: - ValueError, when the content-encoding is invalid and strict is True. - - See also: :py:class:`raw_content`, :py:attr:`text` - """ - if self.raw_content is None: - return None - ce = self.headers.get("content-encoding") - if ce: - try: - return encoding.decode(self.raw_content, ce) - except ValueError: - if strict: - raise - return self.raw_content - else: - return self.raw_content - - def set_content(self, value): - if value is None: - self.raw_content = None - return - if not isinstance(value, bytes): - raise TypeError( - "Message content must be bytes, not {}. " - "Please use .text if you want to assign a str." - .format(type(value).__name__) - ) - ce = self.headers.get("content-encoding") - try: - self.raw_content = encoding.encode(value, ce or "identity") - except ValueError: - # So we have an invalid content-encoding? - # Let's remove it! - del self.headers["content-encoding"] - self.raw_content = value - self.headers["content-length"] = str(len(self.raw_content)) - - content = property(get_content, set_content) - - @property - def http_version(self): - """ - Version string, e.g. "HTTP/1.1" - """ - return _native(self.data.http_version) - - @http_version.setter - def http_version(self, http_version): - self.data.http_version = _always_bytes(http_version) - - @property - def timestamp_start(self): - """ - First byte timestamp - """ - return self.data.timestamp_start - - @timestamp_start.setter - def timestamp_start(self, timestamp_start): - self.data.timestamp_start = timestamp_start - - @property - def timestamp_end(self): - """ - Last byte timestamp - """ - return self.data.timestamp_end - - @timestamp_end.setter - def timestamp_end(self, timestamp_end): - self.data.timestamp_end = timestamp_end - - def _get_content_type_charset(self) -> Optional[str]: - ct = headers.parse_content_type(self.headers.get("content-type", "")) - if ct: - return ct[2].get("charset") - - def _guess_encoding(self) -> str: - enc = self._get_content_type_charset() - if enc: - return enc - - if "json" in self.headers.get("content-type", ""): - return "utf8" - else: - # We may also want to check for HTML meta tags here at some point. - return "latin-1" - - def get_text(self, strict: bool=True) -> str: - """ - The HTTP message body decoded with both content-encoding header (e.g. gzip) - and content-type header charset. - - Raises: - ValueError, when either content-encoding or charset is invalid and strict is True. - - See also: :py:attr:`content`, :py:class:`raw_content` - """ - if self.raw_content is None: - return None - enc = self._guess_encoding() - - content = self.get_content(strict) - try: - return encoding.decode(content, enc) - except ValueError: - if strict: - raise - return content.decode("utf8", "surrogateescape") - - def set_text(self, text): - if text is None: - self.content = None - return - enc = self._guess_encoding() - - try: - self.content = encoding.encode(text, enc) - except ValueError: - # Fall back to UTF-8 and update the content-type header. - ct = headers.parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {}) - ct[2]["charset"] = "utf-8" - self.headers["content-type"] = headers.assemble_content_type(*ct) - enc = "utf8" - self.content = text.encode(enc, "surrogateescape") - - text = property(get_text, set_text) - - def decode(self, strict=True): - """ - Decodes body based on the current Content-Encoding header, then - removes the header. If there is no Content-Encoding header, no - action is taken. - - Raises: - ValueError, when the content-encoding is invalid and strict is True. - """ - self.raw_content = self.get_content(strict) - self.headers.pop("content-encoding", None) - - def encode(self, e): - """ - Encodes body with the encoding e, where e is "gzip", "deflate", "identity", or "br". - Any existing content-encodings are overwritten, - the content is not decoded beforehand. - - Raises: - ValueError, when the specified content-encoding is invalid. - """ - self.headers["content-encoding"] = e - self.content = self.raw_content - if "content-encoding" not in self.headers: - raise ValueError("Invalid content encoding {}".format(repr(e))) - - def replace(self, pattern, repl, flags=0, count=0): - """ - Replaces a regular expression pattern with repl in both the headers - and the body of the message. Encoded body will be decoded - before replacement, and re-encoded afterwards. - - Returns: - The number of replacements made. - """ - if isinstance(pattern, str): - pattern = strutils.escaped_str_to_bytes(pattern) - if isinstance(repl, str): - repl = strutils.escaped_str_to_bytes(repl) - replacements = 0 - if self.content: - self.content, replacements = re.subn( - pattern, repl, self.content, flags=flags, count=count - ) - replacements += self.headers.replace(pattern, repl, flags=flags, count=count) - return replacements - - # Legacy - - @property - def body(self): # pragma: no cover - warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) - return self.content - - @body.setter - def body(self, body): # pragma: no cover - warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) - self.content = body - - -class decoded: - """ - Deprecated: You can now directly use :py:attr:`content`. - :py:attr:`raw_content` has the encoded content. - """ - - def __init__(self, message): # pragma no cover - warnings.warn("decoded() is deprecated, you can now directly use .content instead. " - ".raw_content has the encoded content.", DeprecationWarning) - - def __enter__(self): # pragma no cover - pass - - def __exit__(self, type, value, tb): # pragma no cover - pass |