1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import asyncio
import io
import pytest
import asynctest
import mitmproxy.io
from mitmproxy import exceptions
from mitmproxy.addons import readfile
from mitmproxy.test import taddons
from mitmproxy.test import tflow
@pytest.fixture
def data():
f = io.BytesIO()
w = mitmproxy.io.FlowWriter(f)
flows = [
tflow.tflow(resp=True),
tflow.tflow(err=True),
tflow.ttcpflow(),
tflow.ttcpflow(err=True)
]
for flow in flows:
w.add(flow)
f.seek(0)
return f
@pytest.fixture
def corrupt_data():
f = data()
f.seek(0, io.SEEK_END)
f.write(b"qibble")
f.seek(0)
return f
class TestReadFile:
def test_configure(self):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
tctx.configure(rf, readfile_filter="~q")
with pytest.raises(Exception, match="Invalid readfile filter"):
tctx.configure(rf, readfile_filter="~~")
@pytest.mark.asyncio
async def test_read(self, tmpdir, data, corrupt_data):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
assert not rf.reading()
tf = tmpdir.join("tfile")
with asynctest.patch('mitmproxy.master.Master.load_flow') as mck:
tf.write(data.getvalue())
tctx.configure(
rf,
rfile = str(tf),
readfile_filter = ".*"
)
assert not mck.awaited
rf.running()
await asyncio.sleep(0)
assert mck.awaited
tf.write(corrupt_data.getvalue())
tctx.configure(rf, rfile=str(tf))
rf.running()
assert await tctx.master.await_log("corrupted")
@pytest.mark.asyncio
async def test_corrupt(self, corrupt_data):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows(io.BytesIO(b"qibble"))
tctx.master.clear()
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows(corrupt_data)
assert await tctx.master.await_log("file corrupted")
@pytest.mark.asyncio
async def test_nonexistent_file(self):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows_from_path("nonexistent")
assert await tctx.master.await_log("nonexistent")
class TestReadFileStdin:
@asynctest.patch('sys.stdin')
@pytest.mark.asyncio
async def test_stdin(self, stdin, data, corrupt_data):
rf = readfile.ReadFileStdin()
with taddons.context(rf):
with asynctest.patch('mitmproxy.master.Master.load_flow') as mck:
stdin.buffer = data
assert not mck.awaited
await rf.load_flows(stdin.buffer)
assert mck.awaited
stdin.buffer = corrupt_data
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows(stdin.buffer)
@pytest.mark.asyncio
async def test_normal(self, tmpdir, data):
rf = readfile.ReadFileStdin()
with taddons.context(rf) as tctx:
tf = tmpdir.join("tfile")
with asynctest.patch('mitmproxy.master.Master.load_flow') as mck:
tf.write(data.getvalue())
tctx.configure(rf, rfile=str(tf))
assert not mck.awaited
rf.running()
await asyncio.sleep(0)
assert mck.awaited
|