aboutsummaryrefslogtreecommitdiffstats
path: root/netlib
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2015-06-22 14:52:23 +1200
committerAldo Cortesi <aldo@nullcube.com>2015-06-22 14:52:23 +1200
commit2aa1b98fbf8d03005e022da86e3e534cf25ebf62 (patch)
treeed0dc9ec2ed2cd6f43ea56eec940b759c711626c /netlib
parentf5c5deb2aea047394238f3b993ddf24c60845768 (diff)
downloadmitmproxy-2aa1b98fbf8d03005e022da86e3e534cf25ebf62.tar.gz
mitmproxy-2aa1b98fbf8d03005e022da86e3e534cf25ebf62.tar.bz2
mitmproxy-2aa1b98fbf8d03005e022da86e3e534cf25ebf62.zip
netlib/test.py -> test/tservers.py
Diffstat (limited to 'netlib')
-rw-r--r--netlib/test.py108
1 files changed, 0 insertions, 108 deletions
diff --git a/netlib/test.py b/netlib/test.py
deleted file mode 100644
index 1e1b5e9d..00000000
--- a/netlib/test.py
+++ /dev/null
@@ -1,108 +0,0 @@
-from __future__ import (absolute_import, print_function, division)
-import threading
-import Queue
-import cStringIO
-import OpenSSL
-from . import tcp, certutils
-from test import tutils
-
-
-class ServerThread(threading.Thread):
-
- def __init__(self, server):
- self.server = server
- threading.Thread.__init__(self)
-
- def run(self):
- self.server.serve_forever()
-
- def shutdown(self):
- self.server.shutdown()
-
-
-class ServerTestBase(object):
- ssl = None
- handler = None
- addr = ("localhost", 0)
-
- @classmethod
- def setupAll(cls):
- cls.q = Queue.Queue()
- s = cls.makeserver()
- cls.port = s.address.port
- cls.server = ServerThread(s)
- cls.server.start()
-
- @classmethod
- def makeserver(cls):
- return TServer(cls.ssl, cls.q, cls.handler, cls.addr)
-
- @classmethod
- def teardownAll(cls):
- cls.server.shutdown()
-
- @property
- def last_handler(self):
- return self.server.server.last_handler
-
-
-class TServer(tcp.TCPServer):
-
- def __init__(self, ssl, q, handler_klass, addr):
- """
- ssl: A dictionary of SSL parameters:
-
- cert, key, request_client_cert, cipher_list,
- dhparams, v3_only
- """
- tcp.TCPServer.__init__(self, addr)
-
- if ssl is True:
- self.ssl = dict()
- elif isinstance(ssl, dict):
- self.ssl = ssl
- else:
- self.ssl = None
-
- self.q = q
- self.handler_klass = handler_klass
- self.last_handler = None
-
- def handle_client_connection(self, request, client_address):
- h = self.handler_klass(request, client_address, self)
- self.last_handler = h
- if self.ssl is not None:
- raw_cert = self.ssl.get(
- "cert",
- tutils.test_data.path("data/server.crt"))
- cert = certutils.SSLCert.from_pem(open(raw_cert, "rb").read())
- raw_key = self.ssl.get(
- "key",
- tutils.test_data.path("data/server.key"))
- key = OpenSSL.crypto.load_privatekey(
- OpenSSL.crypto.FILETYPE_PEM,
- open(raw_key, "rb").read())
- if self.ssl.get("v3_only", False):
- method = tcp.SSLv3_METHOD
- options = OpenSSL.SSL.OP_NO_SSLv2 | OpenSSL.SSL.OP_NO_TLSv1
- else:
- method = tcp.SSLv23_METHOD
- options = None
- h.convert_to_ssl(
- cert, key,
- method=method,
- options=options,
- handle_sni=getattr(h, "handle_sni", None),
- request_client_cert=self.ssl.get("request_client_cert", None),
- cipher_list=self.ssl.get("cipher_list", None),
- dhparams=self.ssl.get("dhparams", None),
- chain_file=self.ssl.get("chain_file", None),
- alpn_select=self.ssl.get("alpn_select", None)
- )
- h.handle()
- h.finish()
-
- def handle_error(self, connection, client_address, fp=None):
- s = cStringIO.StringIO()
- tcp.TCPServer.handle_error(self, connection, client_address, s)
- self.q.put(s.getvalue())