1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
import os.path, threading, Queue
import libpry
from libmproxy import proxy, filt, flow, controller
import serv, sslserv
import random
def treq(conn=None):
    """
    Build a canned request object for use in tests.

    conn: the client connection to attach to the request. When it is
    None, a placeholder flow.ClientConnect for ("address", 22) is created.

    Returns a flow.Request built from fixed values ("host", 80, "http",
    "GET", "/path"), a single "header" entry and the body "content".
    """
    # Test identity rather than truthiness, so a connection object supplied
    # by the caller is always used as given.
    if conn is None:
        conn = flow.ClientConnect(("address", 22))
    headers = flow.Headers()
    headers["header"] = ["qvalue"]
    return flow.Request(
        conn, "host", 80, "http", "GET", "/path", headers, "content"
    )
def tresp(req=None):
    """
    Build a canned response object for use in tests.

    req: the request this response belongs to. When it is None, a fresh
    request from treq() is used.

    Returns a flow.Response with status 200, the message "message", a
    single "header_response" entry and the body "content_response".
    """
    # Test identity rather than truthiness, so a request supplied by the
    # caller is always used as given.
    if req is None:
        req = treq()
    headers = flow.Headers()
    headers["header_response"] = ["svalue"]
    return flow.Response(req, 200, "message", headers, "content_response")
def tflow():
    """
    Return a flow.Flow wrapping a fresh treq() request, with no response
    or error assigned by this helper.
    """
    return flow.Flow(treq())
def tflow_full():
    """
    Return a flow.Flow wrapping a fresh treq() request whose response
    attribute is set to a tresp() built from that same request.
    """
    request = treq()
    full = flow.Flow(request)
    full.response = tresp(request)
    return full
def tflow_err():
    """
    Return a flow.Flow wrapping a fresh treq() request whose error
    attribute is set to a flow.Error with the message "error".
    """
    request = treq()
    failed = flow.Flow(request)
    failed.error = flow.Error(request, "error")
    return failed
# Randomly chosen ports are ugly, but deliberate. During development,
# sockets are often not closed properly on error paths, and re-binding to
# the same port then means waiting until it becomes available again. Picking
# a random port from a separate 10000-wide range for each server avoids
# that wait.
PROXL_PORT, HTTP_PORT, HTTPS_PORT = (
    random.randint(low, low + 10000) for low in (10000, 20000, 30000)
)
class TestMaster(controller.Master):
    """
    A controller.Master for tests that records every message passed to
    handle() in self.log and acknowledges it immediately.
    """
    def __init__(self, port, testq):
        """
        port: port handed to proxy.ProxyServer.
        testq: stored on the instance as self.testq; not otherwise used
        by this class.
        """
        # Named "server" so the local does not shadow the imported serv
        # module.
        server = proxy.ProxyServer(proxy.SSLConfig("data/testkey.pem"), port)
        controller.Master.__init__(self, server)
        self.testq = testq
        self.log = []

    def clear(self):
        """
        Discard recorded messages by binding self.log to a new empty
        list. A list obtained earlier is left untouched.
        """
        self.log = []

    def handle(self, m):
        """
        Record the message m, then acknowledge it via m._ack().
        """
        self.log.append(m)
        m._ack()
class ProxyThread(threading.Thread):
    """
    Thread that owns a TestMaster and runs it when started.
    """
    def __init__(self, port, testq):
        """
        port, testq: passed straight through to TestMaster.
        """
        self.tmaster = TestMaster(port, testq)
        # Reset the module-level exit flag before the master runs.
        # NOTE(review): presumably an earlier shutdown can leave it set;
        # confirm against the controller module.
        controller.should_exit = False
        threading.Thread.__init__(self)

    def run(self):
        """
        Thread body: run the master.
        """
        self.tmaster.run()

    def shutdown(self):
        """
        Ask the master to shut down.
        """
        self.tmaster.shutdown()
class ServerThread(threading.Thread):
    """
    Thread that serves a given server object until it is shut down.

    The server only needs serve_forever() and shutdown() methods.
    """
    def __init__(self, server):
        """
        server: the server object to run; kept as self.server.
        """
        self.server = server
        threading.Thread.__init__(self)

    def run(self):
        """
        Thread body: call serve_forever() on the server.
        """
        self.server.serve_forever()

    def shutdown(self):
        """
        Call shutdown() on the server.
        """
        self.server.shutdown()
class TestServers(libpry.TestContainer):
    """
    Test container that starts an HTTP server, an HTTPS server and the
    proxy, each in its own thread, and shuts them all down afterwards.

    NOTE(review): setUpAll/tearDownAll are presumably called by libpry once
    per container and setUp before each test; confirm against libpry.
    """
    def setUpAll(self):
        """
        Create the proxy and server threads and start them all.
        """
        self.tqueue = Queue.Queue()
        # We don't make any concurrent requests, so we can access
        # the attributes on this object safely.
        self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
        self.threads = [
            ServerThread(serv.make(HTTP_PORT)),
            ServerThread(sslserv.make(HTTPS_PORT)),
            self.proxthread,
        ]
        for thread in self.threads:
            thread.start()

    def setUp(self):
        """
        Empty the proxy master's message log.
        """
        self.proxthread.tmaster.clear()

    def tearDownAll(self):
        """
        Call shutdown() on every thread started by setUpAll.
        """
        for thread in self.threads:
            thread.shutdown()
class ProxTest(libpry.AutoTree):
    """
    Base class for tests that need to inspect what the proxy master saw.
    """
    def log(self):
        """
        Return the message log of the proxy master.

        The proxy thread is located with self.findAttr("proxthread").
        NOTE(review): presumably this finds the attribute set by
        TestServers.setUpAll on an enclosing container; confirm against
        libpry.
        """
        return self.findAttr("proxthread").tmaster.log
|