aboutsummaryrefslogtreecommitdiffstats
path: root/test/tutils.py
blob: f7a49577193f336b71b9fefe4428500cc883c748 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os.path, threading, Queue
import libpry
from libmproxy import proxy, filt, flow, controller
import serv, sslserv
import random

def treq(conn=None):
    if not conn:
        conn = flow.ClientConnect(("address", 22))
    headers = flow.Headers()
    headers["header"] = ["qvalue"]
    return flow.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")


def tresp(req=None):
    if not req:
        req = treq()
    headers = flow.Headers()
    headers["header_response"] = ["svalue"]
    return flow.Response(req, 200, "message", headers, "content_response")


def tflow():
    r = treq()
    return flow.Flow(r)


def tflow_full():
    r = treq()
    f = flow.Flow(r)
    f.response = tresp(r)
    return f


def tflow_err():
    r = treq()
    f = flow.Flow(r)
    f.error = flow.Error(r, "error")
    return f


# Yes, the random ports are horrible. During development, sockets are often not
# properly closed during error conditions, which means you have to wait until
# you can re-bind to the same port. This is a pain in the ass, so we just pick
# a random port and keep moving.
PROXL_PORT = random.randint(10000, 20000)
HTTP_PORT = random.randint(20000, 30000)
HTTPS_PORT = random.randint(30000, 40000)


class TestMaster(controller.Master):
    def __init__(self, port, testq):
        serv = proxy.ProxyServer(proxy.SSLConfig("data/testkey.pem"), port)
        controller.Master.__init__(self, serv)
        self.testq = testq
        self.log = []

    def clear(self):
        self.log = []

    def handle(self, m):
        self.log.append(m)
        m._ack()


class ProxyThread(threading.Thread):
    def __init__(self, port, testq):
        self.tmaster = TestMaster(port, testq)
        controller.should_exit = False
        threading.Thread.__init__(self)

    def run(self):
        self.tmaster.run()

    def shutdown(self):
        self.tmaster.shutdown()


class ServerThread(threading.Thread):
    def __init__(self, server):
        self.server = server
        threading.Thread.__init__(self)

    def run(self):
        self.server.serve_forever()

    def shutdown(self):
        self.server.shutdown()


class TestServers(libpry.TestContainer):
    def setUpAll(self):
        self.tqueue = Queue.Queue()
        # We don't make any concurrent requests, so we can access
        # the attributes on this object safely.
        self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
        self.threads = [
            ServerThread(serv.make(HTTP_PORT)),
            ServerThread(sslserv.make(HTTPS_PORT)),
            self.proxthread
        ]
        for i in self.threads:
            i.start()

    def setUp(self):
        self.proxthread.tmaster.clear()

    def tearDownAll(self):
        for i in self.threads:
            i.shutdown()


class ProxTest(libpry.AutoTree):
    def log(self):
        pthread = self.findAttr("proxthread")
        return pthread.tmaster.log