aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/tools/dump.py
blob: 47f69303990128eede9403b7785f5f7113502c82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import typing
from typing import Optional

from mitmproxy import controller
from mitmproxy import exceptions
from mitmproxy import addons
from mitmproxy import io
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog
from netlib import tcp


class DumpError(Exception):
    pass


class Options(options.Options):
    def __init__(
            self,
            keepserving: bool = False,
            filtstr: Optional[str] = None,
            flow_detail: int = 1,
            tfile: Optional[typing.io.TextIO] = None,
            **kwargs
    ):
        self.filtstr = filtstr
        self.flow_detail = flow_detail
        self.keepserving = keepserving
        self.tfile = tfile
        super().__init__(**kwargs)


class DumpMaster(master.Master):

    def __init__(self, options, server):
        master.Master.__init__(self, options, server)
        self.has_errored = False
        self.addons.add(termlog.TermLog())
        self.addons.add(*addons.default_addons())
        self.addons.add(dumper.Dumper())
        # This line is just for type hinting
        self.options = self.options  # type: Options

        if not self.options.no_server:
            self.add_log(
                "Proxy server listening at http://{}".format(server.address),
                "info"
            )

        if self.server and self.options.http2 and not tcp.HAS_ALPN:  # pragma: no cover
            self.add_log(
                "ALPN support missing (OpenSSL 1.0.2+ required)!\n"
                "HTTP/2 is disabled. Use --no-http2 to silence this warning.",
                "error"
            )

        if options.rfile:
            try:
                self.load_flows_file(options.rfile)
            except exceptions.FlowReadException as v:
                self.add_log("Flow file corrupted.", "error")
                raise DumpError(v)

    def _readflow(self, paths):
        """
        Utitility function that reads a list of flows
        or raises a DumpError if that fails.
        """
        try:
            return io.read_flows_from_paths(paths)
        except exceptions.FlowReadException as e:
            raise DumpError(str(e))

    @controller.handler
    def log(self, e):
        if e.level == "error":
            self.has_errored = True

    def run(self):  # pragma: no cover
        if self.options.rfile and not self.options.keepserving:
            self.addons.done()
            return
        super().run()