1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
import typing
from typing import Optional
from mitmproxy import controller
from mitmproxy import exceptions
from mitmproxy import addons
from mitmproxy import io
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog
from netlib import tcp
class DumpError(Exception):
    """Error raised by this module when reading flows fails."""
class Options(options.Options):
    """Option set for the dump tool, extending ``options.Options``.

    Stores four dump-specific settings on the instance and forwards every
    other keyword argument to the base class unchanged.
    """

    def __init__(
        self,
        keepserving: bool = False,
        filtstr: Optional[str] = None,
        flow_detail: int = 1,
        # ``typing.TextIO`` rather than ``typing.io.TextIO``: the ``typing.io``
        # namespace was deprecated in Python 3.8 and removed in 3.13, where
        # evaluating the old annotation raises AttributeError.
        tfile: Optional[typing.TextIO] = None,
        **kwargs
    ):
        """Store the dump-specific options, then initialise the base class.

        Args:
            keepserving: When false and ``rfile`` is set, ``DumpMaster.run``
                returns right after finishing the addons instead of entering
                the base run loop.
            filtstr: Optional string; presumably a flow filter expression
                consumed elsewhere -- TODO confirm.
            flow_detail: Integer, default 1; presumably an output verbosity
                level consumed elsewhere -- TODO confirm.
            tfile: Optional text stream; its consumer is not visible here.
            **kwargs: Passed through to ``options.Options.__init__``.
        """
        self.filtstr = filtstr
        self.flow_detail = flow_detail
        self.keepserving = keepserving
        self.tfile = tfile
        # NOTE(review): the base initialiser is deliberately left as the last
        # call, as in the original; the base class may depend on the
        # attributes above already being set -- confirm before reordering.
        super().__init__(**kwargs)
class DumpMaster(master.Master):
    """Master for the dump tool.

    On construction it registers the terminal logger, the default addons and
    the dumper addon, emits startup log messages, and loads flows from
    ``options.rfile`` when that option is set.
    """

    def __init__(self, options, server):
        """Set up addons, log startup information and load ``options.rfile``.

        Args:
            options: Option object; ``no_server``, ``http2`` and ``rfile`` are
                read here.
            server: Server object; its ``address`` is logged unless
                ``no_server`` is set.

        Raises:
            DumpError: If loading ``options.rfile`` raises
                ``exceptions.FlowReadException``.
        """
        master.Master.__init__(self, options, server)
        # Flipped to True by the ``log`` handler on the first error-level entry.
        self.has_errored = False
        # Registration order is preserved from the original: terminal logger,
        # then the default addons, then the dumper.
        self.addons.add(termlog.TermLog())
        self.addons.add(*addons.default_addons())
        self.addons.add(dumper.Dumper())
        # This line is just for type hinting
        self.options = self.options  # type: Options

        if not self.options.no_server:
            self.add_log(
                "Proxy server listening at http://{}".format(server.address),
                "info"
            )

        if self.server and self.options.http2 and not tcp.HAS_ALPN:  # pragma: no cover
            self.add_log(
                "ALPN support missing (OpenSSL 1.0.2+ required)!\n"
                "HTTP/2 is disabled. Use --no-http2 to silence this warning.",
                "error"
            )

        if options.rfile:
            try:
                self.load_flows_file(options.rfile)
            except exceptions.FlowReadException as v:
                self.add_log("Flow file corrupted.", "error")
                # Chain explicitly so the traceback shows the read failure as
                # the direct cause of the DumpError.
                raise DumpError(v) from v

    def _readflow(self, paths):
        """
        Utility function that reads a list of flows
        or raises a DumpError if that fails.

        Returns whatever ``io.read_flows_from_paths(paths)`` returns. A
        ``exceptions.FlowReadException`` is converted into a ``DumpError``
        carrying the same message.
        """
        try:
            return io.read_flows_from_paths(paths)
        except exceptions.FlowReadException as e:
            # Explicit chaining keeps the original exception as __cause__.
            raise DumpError(str(e)) from e

    @controller.handler
    def log(self, e):
        """Remember that an error occurred when ``e.level`` is ``"error"``."""
        if e.level == "error":
            self.has_errored = True

    def run(self):  # pragma: no cover
        """Run the master, or finish at once after a one-shot flow file read.

        When ``rfile`` is set and ``keepserving`` is not, the addons are told
        they are done and the base ``run()`` is never entered.
        """
        if self.options.rfile and not self.options.keepserving:
            self.addons.done()
            return
        super().run()
|