aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/tools/dump.py
blob: 4e2844a1f26c04b3094fda6ab324a3af4903a7a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from typing import Optional

from mitmproxy import controller
from mitmproxy import exceptions
from mitmproxy import addons
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog
from mitmproxy.net import tcp


class DumpError(Exception):
    """Error raised by the dump tool, e.g. when a flow file cannot be read."""


class Options(options.Options):
    """Options for the dump tool, layered on top of ``options.Options``.

    Adds three keyword-only settings; every other keyword argument is
    forwarded unchanged to the base class initializer.

    - ``keepserving``: when false and a flow file (``rfile``) was given,
      ``DumpMaster.run`` returns without entering the master's run loop.
    - ``filtstr``: optional string, not read anywhere in this module.
      NOTE(review): presumably a flow filter expression consumed by an
      addon -- confirm.
    - ``flow_detail``: integer, default 1, not read anywhere in this module.
      NOTE(review): presumably the output verbosity of the dumper addon --
      confirm.
    """

    def __init__(
            self,
            *,  # all args are keyword-only.
            keepserving: bool = False,
            filtstr: Optional[str] = None,
            flow_detail: int = 1,
            **kwargs
    ) -> None:
        # NOTE(review): the attributes are assigned *before* the base
        # initializer runs. This ordering looks deliberate (the base class
        # may finalize the option set in its __init__) -- confirm against
        # options.Options before reordering these statements.
        self.filtstr = filtstr
        self.flow_detail = flow_detail
        self.keepserving = keepserving
        super().__init__(**kwargs)


class DumpMaster(master.Master):
    """Master used by the dump tool.

    On construction it registers the terminal log addon (optional), the
    default addon set, and the dumper addon (optional), emits startup log
    messages, and loads flows from ``options.rfile`` if one is given.

    ``has_errored`` is set to ``True`` once the ``log`` handler sees an
    event whose level is ``"error"``.
    """

    def __init__(self, options, server, with_termlog=True, with_dumper=True):
        """Set up addons, emit startup messages and optionally load flows.

        Args:
            options: The options object. This method reads ``no_server``,
                ``http2`` and ``rfile`` from it.
            server: The proxy server; its ``address`` is shown in the
                startup log message.
            with_termlog: If true, add a ``termlog.TermLog`` addon.
            with_dumper: If true, add a ``dumper.Dumper`` addon.

        Raises:
            DumpError: If loading ``options.rfile`` raises
                ``exceptions.FlowReadException``. The original exception is
                attached as ``__cause__``.
        """
        master.Master.__init__(self, options, server)
        self.has_errored = False
        # TermLog is added before the default addons, Dumper after them.
        if with_termlog:
            self.addons.add(termlog.TermLog())
        self.addons.add(*addons.default_addons())
        if with_dumper:
            self.addons.add(dumper.Dumper())
        # This line is just for type hinting
        self.options = self.options  # type: Options

        if not self.options.no_server:
            self.add_log(
                "Proxy server listening at http://{}".format(server.address),
                "info"
            )

        # Warn when HTTP/2 is requested but the TLS layer reports no ALPN.
        if self.server and self.options.http2 and not tcp.HAS_ALPN:  # pragma: no cover
            self.add_log(
                "ALPN support missing (OpenSSL 1.0.2+ required)!\n"
                "HTTP/2 is disabled. Use --no-http2 to silence this warning.",
                "error"
            )

        if options.rfile:
            try:
                self.load_flows_file(options.rfile)
            except exceptions.FlowReadException as v:
                self.add_log("Flow file corrupted.", "error")
                # Chain explicitly so the read failure is reported as the
                # direct cause of the DumpError.
                raise DumpError(v) from v

    @controller.handler
    def log(self, e):
        """Remember that an error-level log event has been seen.

        Args:
            e: A log event; only its ``level`` attribute is read.
        """
        if e.level == "error":
            self.has_errored = True

    def run(self):  # pragma: no cover
        """Run the master, unless only a flow file was to be processed.

        When ``rfile`` is set and ``keepserving`` is not, the addons are
        told they are done and the method returns without calling the base
        class ``run``.
        """
        if self.options.rfile and not self.options.keepserving:
            self.addons.done()
            return
        super().run()