1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
import pytest
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy import options
from mitmproxy.addons import save
from mitmproxy.addons import view
def test_configure(tmpdir):
sa = save.Save()
with taddons.context(options=options.Options()) as tctx:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, save_stream_file=str(tmpdir))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(
sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
)
tctx.configure(sa, save_stream_filter="foo")
assert sa.filt
tctx.configure(sa, save_stream_filter=None)
assert not sa.filt
def rd(p):
x = io.FlowReader(open(p, "rb"))
return list(x.stream())
def test_tcp(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, save_stream_file=None)
assert rd(p)
def test_save_command(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], "+" + p)
assert len(rd(p)) == 2
with pytest.raises(exceptions.CommandError):
sa.save([tflow.tflow(resp=True)], str(tmpdir))
v = view.View()
tctx.master.addons.add(v)
tctx.master.addons.add(sa)
tctx.master.commands.call_args("save.file", ["@shown", p])
def test_simple(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, save_stream_file=None)
assert rd(p)[0].response
tctx.configure(sa, save_stream_file="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, save_stream_file=None)
assert not rd(p)[1].response
|