diff options
Diffstat (limited to 'netlib/odict.py')
-rw-r--r-- | netlib/odict.py | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/netlib/odict.py b/netlib/odict.py new file mode 100644 index 00000000..1e6e381a --- /dev/null +++ b/netlib/odict.py @@ -0,0 +1,193 @@ +from __future__ import (absolute_import, print_function, division) +import re +import copy +import six + +from .utils import Serializable + + +def safe_subn(pattern, repl, target, *args, **kwargs): + """ + There are Unicode conversion problems with re.subn. We try to smooth + that over by casting the pattern and replacement to strings. We really + need a better solution that is aware of the actual content ecoding. + """ + return re.subn(str(pattern), str(repl), target, *args, **kwargs) + + +class ODict(Serializable): + + """ + A dictionary-like object for managing ordered (key, value) data. Think + about it as a convenient interface to a list of (key, value) tuples. + """ + + def __init__(self, lst=None): + self.lst = lst or [] + + def _kconv(self, s): + return s + + def __eq__(self, other): + return self.lst == other.lst + + def __ne__(self, other): + return not self.__eq__(other) + + def __iter__(self): + return self.lst.__iter__() + + def __getitem__(self, k): + """ + Returns a list of values matching key. + """ + ret = [] + k = self._kconv(k) + for i in self.lst: + if self._kconv(i[0]) == k: + ret.append(i[1]) + return ret + + def keys(self): + return list(set([self._kconv(i[0]) for i in self.lst])) + + def _filter_lst(self, k, lst): + k = self._kconv(k) + new = [] + for i in lst: + if self._kconv(i[0]) != k: + new.append(i) + return new + + def __len__(self): + """ + Total number of (key, value) pairs. + """ + return len(self.lst) + + def __setitem__(self, k, valuelist): + """ + Sets the values for key k. If there are existing values for this + key, they are cleared. + """ + if isinstance(valuelist, six.text_type) or isinstance(valuelist, six.binary_type): + raise ValueError( + "Expected list of values instead of string. " + "Example: odict[b'Host'] = [b'www.example.com']" + ) + kc = self._kconv(k) + new = [] + for i in self.lst: + if self._kconv(i[0]) == kc: + if valuelist: + new.append([k, valuelist.pop(0)]) + else: + new.append(i) + while valuelist: + new.append([k, valuelist.pop(0)]) + self.lst = new + + def __delitem__(self, k): + """ + Delete all items matching k. + """ + self.lst = self._filter_lst(k, self.lst) + + def __contains__(self, k): + k = self._kconv(k) + for i in self.lst: + if self._kconv(i[0]) == k: + return True + return False + + def add(self, key, value, prepend=False): + if prepend: + self.lst.insert(0, [key, value]) + else: + self.lst.append([key, value]) + + def get(self, k, d=None): + if k in self: + return self[k] + else: + return d + + def get_first(self, k, d=None): + if k in self: + return self[k][0] + else: + return d + + def items(self): + return self.lst[:] + + def copy(self): + """ + Returns a copy of this object. + """ + lst = copy.deepcopy(self.lst) + return self.__class__(lst) + + def extend(self, other): + """ + Add the contents of other, preserving any duplicates. + """ + self.lst.extend(other.lst) + + def __repr__(self): + return repr(self.lst) + + def in_any(self, key, value, caseless=False): + """ + Do any of the values matching key contain value? + + If caseless is true, value comparison is case-insensitive. + """ + if caseless: + value = value.lower() + for i in self[key]: + if caseless: + i = i.lower() + if value in i: + return True + return False + + def replace(self, pattern, repl, *args, **kwargs): + """ + Replaces a regular expression pattern with repl in both keys and + values. Encoded content will be decoded before replacement, and + re-encoded afterwards. + + Returns the number of replacements made. + """ + nlst, count = [], 0 + for i in self.lst: + k, c = safe_subn(pattern, repl, i[0], *args, **kwargs) + count += c + v, c = safe_subn(pattern, repl, i[1], *args, **kwargs) + count += c + nlst.append([k, v]) + self.lst = nlst + return count + + # Implement the StateObject protocol from mitmproxy + def get_state(self): + return [tuple(i) for i in self.lst] + + def set_state(self, state): + self.lst = [list(i) for i in state] + + @classmethod + def from_state(cls, state): + return cls([list(i) for i in state]) + + +class ODictCaseless(ODict): + + """ + A variant of ODict with "caseless" keys. This version _preserves_ key + case, but does not consider case when setting or getting items. + """ + + def _kconv(self, s): + return s.lower() |