From 7fdcbb09e6034ab1f76724965cfdf45f3d775129 Mon Sep 17 00:00:00 2001 From: anneborcherding <55282902+anneborcherding@users.noreply.github.com> Date: Mon, 4 May 2020 10:37:13 +0200 Subject: added add-ons that enhance the performance of web application scanners. (#3961) * added add-ons that enhance the performance of web application scanners. Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com> --- examples/complex/webscanner_helper/urldict.py | 90 +++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 examples/complex/webscanner_helper/urldict.py (limited to 'examples/complex/webscanner_helper/urldict.py') diff --git a/examples/complex/webscanner_helper/urldict.py b/examples/complex/webscanner_helper/urldict.py new file mode 100644 index 00000000..28e6b5e6 --- /dev/null +++ b/examples/complex/webscanner_helper/urldict.py @@ -0,0 +1,90 @@ +import itertools +import json +import typing +from collections.abc import MutableMapping +from typing import Any, Dict, Generator, List, TextIO, Callable + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + + +def f_id(x): + return x + + +class URLDict(MutableMapping): + """Data structure to store information using filters as keys.""" + def __init__(self): + self.store: Dict[flowfilter.TFilter, Any] = {} + + def __getitem__(self, key, *, count=0): + if count: + ret = itertools.islice(self.get_generator(key), 0, count) + else: + ret = list(self.get_generator(key)) + + if ret: + return ret + else: + raise KeyError + + def __setitem__(self, key: str, value): + fltr = flowfilter.parse(key) + if fltr: + self.store.__setitem__(fltr, value) + else: + raise ValueError("Not a valid filter") + + def __delitem__(self, key): + self.store.__delitem__(key) + + def __iter__(self): + return self.store.__iter__() + + def __len__(self): + return self.store.__len__() + + def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]: + + for fltr, value in self.store.items(): + if flowfilter.match(fltr, flow): + yield value + + def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]: + try: + return self.__getitem__(flow, count=count) + except KeyError: + return default + + @classmethod + def _load(cls, json_obj, value_loader: Callable = f_id): + url_dict = cls() + for fltr, value in json_obj.items(): + url_dict[fltr] = value_loader(value) + return url_dict + + @classmethod + def load(cls, f: TextIO, value_loader: Callable = f_id): + json_obj = json.load(f) + return cls._load(json_obj, value_loader) + + @classmethod + def loads(cls, json_str: str, value_loader: Callable = f_id): + json_obj = json.loads(json_str) + return cls._load(json_obj, value_loader) + + def _dump(self, value_dumper: Callable = f_id) -> Dict: + dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {} + for fltr, value in self.store.items(): + if hasattr(fltr, 'pattern'): + # cast necessary for mypy + dumped[typing.cast(Any, fltr).pattern] = value_dumper(value) + else: + dumped[str(fltr)] = value_dumper(value) + return dumped + + def dump(self, f: TextIO, value_dumper: Callable = f_id): + json.dump(self._dump(value_dumper), f) + + def dumps(self, value_dumper: Callable = f_id): + return json.dumps(self._dump(value_dumper)) -- cgit v1.2.3