diff options
-rw-r--r-- | libmproxy/certutils.py | 235 | ||||
-rw-r--r-- | libmproxy/proxy.py | 8 | ||||
-rw-r--r-- | libmproxy/utils.py | 238 | ||||
-rw-r--r-- | test/test_certutils.py | 69 | ||||
-rw-r--r-- | test/test_utils.py | 63 |
5 files changed, 310 insertions, 303 deletions
diff --git a/libmproxy/certutils.py b/libmproxy/certutils.py new file mode 100644 index 00000000..6e4c330a --- /dev/null +++ b/libmproxy/certutils.py @@ -0,0 +1,235 @@ +import subprocess, os, tempfile, ssl, hashlib, socket, re +import utils + +CERT_SLEEP_TIME = 1 +CERT_EXPIRY = str(365 * 3) + + +def dummy_ca(path): + """ + Creates a dummy CA, and writes it to path. + + This function also creates the necessary directories if they don't exist. + + Returns True if operation succeeded, False if not. + """ + dirname = os.path.dirname(path) + if not os.path.exists(dirname): + os.makedirs(dirname) + + if path.endswith(".pem"): + basename, _ = os.path.splitext(path) + else: + basename = path + + cmd = [ + "openssl", + "req", + "-new", + "-x509", + "-config", utils.pkg_data.path("resources/ca.cnf"), + "-nodes", + "-days", CERT_EXPIRY, + "-out", path, + "-newkey", "rsa:1024", + "-keyout", path, + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + # begin nocover + if ret: + return False + # end nocover + + cmd = [ + "openssl", + "pkcs12", + "-export", + "-password", "pass:", + "-nokeys", + "-in", path, + "-out", os.path.join(dirname, basename + "-cert.p12") + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + # begin nocover + if ret: + return False + # end nocover + cmd = [ + "openssl", + "x509", + "-in", path, + "-out", os.path.join(dirname, basename + "-cert.pem") + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + # begin nocover + if ret: + return False + # end nocover + + return True + + +def dummy_cert(certdir, ca, commonname, sans): + """ + certdir: Certificate directory. + ca: Path to the certificate authority file, or None. + commonname: Common name for the generated certificate. + + Returns cert path if operation succeeded, None if not. + """ + namehash = hashlib.sha256(commonname).hexdigest() + certpath = os.path.join(certdir, namehash + ".pem") + if os.path.exists(certpath): + return certpath + + confpath = os.path.join(certdir, namehash + ".cnf") + reqpath = os.path.join(certdir, namehash + ".req") + + template = open(utils.pkg_data.path("resources/cert.cnf")).read() + + ss = [] + for i, v in enumerate(sans): + ss.append("DNS.%s = %s"%(i+1, v)) + ss = "\n".join(ss) + + f = open(confpath, "w") + f.write( + template%( + dict( + commonname=commonname, + sans=ss, + altnames="subjectAltName = @alt_names" if ss else "" + ) + ) + ) + f.close() + + if ca: + # Create a dummy signed certificate. Uses same key as the signing CA + cmd = [ + "openssl", + "req", + "-new", + "-config", confpath, + "-out", reqpath, + "-key", ca, + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + if ret: return None + cmd = [ + "openssl", + "x509", + "-req", + "-in", reqpath, + "-days", CERT_EXPIRY, + "-out", certpath, + "-CA", ca, + "-CAcreateserial", + "-extfile", confpath, + "-extensions", "v3_cert_req", + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + if ret: return None + else: + # Create a new selfsigned certificate + key + cmd = [ + "openssl", + "req", + "-new", + "-x509", + "-config", confpath, + "-nodes", + "-days", CERT_EXPIRY, + "-out", certpath, + "-newkey", "rsa:1024", + "-keyout", certpath, + ] + ret = subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + if ret: return None + return certpath + + +def get_remote_cn(host, port): + addr = socket.gethostbyname(host) + s = ssl.get_server_certificate((addr, port)) + f = tempfile.NamedTemporaryFile() + f.write(s) + f.flush() + p = subprocess.Popen( + [ + "openssl", + "x509", + "-in", f.name, + "-text", + "-noout" + ], + stdout = subprocess.PIPE + ) + out, _ = p.communicate() + return parse_text_cert(out) + + +CNRE = re.compile( + r""" + Subject:.*CN=([^ \t\n\r\f\v/]*) + """, + re.VERBOSE|re.MULTILINE +) +SANRE = re.compile( + r""" + X509v3\ Subject\ Alternative\ Name:\s* + (.*)$ + """, + re.VERBOSE|re.MULTILINE +) +def parse_text_cert(txt): + """ + Returns a (common name, [subject alternative names]) tuple. + """ + r = re.search(CNRE, txt) + if r: + cn = r.group(1) + else: + return None + + r = re.search(SANRE, txt) + san = [] + if r: + for i in r.group(1).split(","): + i = i.strip() + k, v = i.split(":") + if k == "DNS": + san.append(v) + else: + san = [] + return (cn, san) diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py index b626e943..a6ba790f 100644 --- a/libmproxy/proxy.py +++ b/libmproxy/proxy.py @@ -21,7 +21,7 @@ import sys, os, string, socket, time import shutil, tempfile, threading import optparse, SocketServer, ssl -import utils, flow +import utils, flow, certutils NAME = "mitmproxy" @@ -350,8 +350,8 @@ class ProxyHandler(SocketServer.StreamRequestHandler): else: sans = [] if self.config.upstream_cert: - host, sans = utils.get_remote_cn(host, port) - ret = utils.dummy_cert(self.config.certdir, self.config.cacert, host, sans) + host, sans = certutils.get_remote_cn(host, port) + ret = certutils.dummy_cert(self.config.certdir, self.config.cacert, host, sans) time.sleep(self.config.cert_wait_time) if not ret: raise ProxyError(502, "mitmproxy: Unable to generate dummy cert.") @@ -524,7 +524,7 @@ def process_proxy_options(parser, options): cacert = os.path.join(options.confdir, "mitmproxy-ca.pem") cacert = os.path.expanduser(cacert) if not os.path.exists(cacert): - utils.dummy_ca(cacert) + certutils.dummy_ca(cacert) if getattr(options, "cache", None) is not None: options.cache = os.path.expanduser(options.cache) body_size_limit = utils.parse_size(options.body_size_limit) diff --git a/libmproxy/utils.py b/libmproxy/utils.py index 8e2097eb..f7cf5f32 100644 --- a/libmproxy/utils.py +++ b/libmproxy/utils.py @@ -12,13 +12,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re, os, subprocess, datetime, urlparse, string, urllib, socket -import time, functools, cgi, textwrap, hashlib, ssl, tempfile +import re, os, datetime, urlparse, string, urllib +import time, functools, cgi, textwrap import json -CERT_SLEEP_TIME = 1 -CERT_EXPIRY = str(365 * 3) - def timestamp(): """ Returns a serializable UTC timestamp. @@ -197,180 +194,6 @@ class Data: pkg_data = Data(__name__) -def dummy_ca(path): - """ - Creates a dummy CA, and writes it to path. - - This function also creates the necessary directories if they don't exist. - - Returns True if operation succeeded, False if not. - """ - dirname = os.path.dirname(path) - if not os.path.exists(dirname): - os.makedirs(dirname) - - if path.endswith(".pem"): - basename, _ = os.path.splitext(path) - else: - basename = path - - cmd = [ - "openssl", - "req", - "-new", - "-x509", - "-config", pkg_data.path("resources/ca.cnf"), - "-nodes", - "-days", CERT_EXPIRY, - "-out", path, - "-newkey", "rsa:1024", - "-keyout", path, - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - # begin nocover - if ret: - return False - # end nocover - - cmd = [ - "openssl", - "pkcs12", - "-export", - "-password", "pass:", - "-nokeys", - "-in", path, - "-out", os.path.join(dirname, basename + "-cert.p12") - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - # begin nocover - if ret: - return False - # end nocover - cmd = [ - "openssl", - "x509", - "-in", path, - "-out", os.path.join(dirname, basename + "-cert.pem") - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - # begin nocover - if ret: - return False - # end nocover - - return True - - -def dummy_cert(certdir, ca, commonname, sans): - """ - certdir: Certificate directory. - ca: Path to the certificate authority file, or None. - commonname: Common name for the generated certificate. - - Returns cert path if operation succeeded, None if not. - """ - namehash = hashlib.sha256(commonname).hexdigest() - certpath = os.path.join(certdir, namehash + ".pem") - if os.path.exists(certpath): - return certpath - - confpath = os.path.join(certdir, namehash + ".cnf") - reqpath = os.path.join(certdir, namehash + ".req") - - template = open(pkg_data.path("resources/cert.cnf")).read() - - ss = [] - for i, v in enumerate(sans): - ss.append("DNS.%s = %s"%(i+1, v)) - ss = "\n".join(ss) - - f = open(confpath, "w") - f.write( - template%( - dict( - commonname=commonname, - sans=ss, - altnames="subjectAltName = @alt_names" if ss else "" - ) - ) - ) - f.close() - - if ca: - # Create a dummy signed certificate. Uses same key as the signing CA - cmd = [ - "openssl", - "req", - "-new", - "-config", confpath, - "-out", reqpath, - "-key", ca, - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - if ret: return None - cmd = [ - "openssl", - "x509", - "-req", - "-in", reqpath, - "-days", CERT_EXPIRY, - "-out", certpath, - "-CA", ca, - "-CAcreateserial", - "-extfile", confpath, - "-extensions", "v3_cert_req", - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - if ret: return None - else: - # Create a new selfsigned certificate + key - cmd = [ - "openssl", - "req", - "-new", - "-x509", - "-config", confpath, - "-nodes", - "-days", CERT_EXPIRY, - "-out", certpath, - "-newkey", "rsa:1024", - "-keyout", certpath, - ] - ret = subprocess.call( - cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE - ) - if ret: return None - return certpath - - class LRUCache: """ A decorator that implements a self-expiring LRU cache for class @@ -497,60 +320,3 @@ def parse_size(s): return int(s) * mult except ValueError: raise ValueError("Invalid size specification: %s"%s) - - -def get_remote_cn(host, port): - addr = socket.gethostbyname(host) - s = ssl.get_server_certificate((addr, port)) - f = tempfile.NamedTemporaryFile() - f.write(s) - f.flush() - p = subprocess.Popen( - [ - "openssl", - "x509", - "-in", f.name, - "-text", - "-noout" - ], - stdout = subprocess.PIPE - ) - out, _ = p.communicate() - return parse_text_cert(out) - - -CNRE = re.compile( - r""" - Subject:.*CN=([^ \t\n\r\f\v/]*) - """, - re.VERBOSE|re.MULTILINE -) -SANRE = re.compile( - r""" - X509v3\ Subject\ Alternative\ Name:\s* - (.*)$ - """, - re.VERBOSE|re.MULTILINE -) -def parse_text_cert(txt): - """ - Returns a (common name, [subject alternative names]) tuple. - """ - r = re.search(CNRE, txt) - if r: - cn = r.group(1) - else: - return None - - r = re.search(SANRE, txt) - san = [] - if r: - for i in r.group(1).split(","): - i = i.strip() - k, v = i.split(":") - if k == "DNS": - san.append(v) - else: - san = [] - return (cn, san) - diff --git a/test/test_certutils.py b/test/test_certutils.py new file mode 100644 index 00000000..15e81f74 --- /dev/null +++ b/test/test_certutils.py @@ -0,0 +1,69 @@ +import os +import libpry +from libmproxy import certutils + + +class udummy_ca(libpry.AutoTree): + def test_all(self): + d = self.tmpdir() + path = os.path.join(d, "foo/cert.cnf") + assert certutils.dummy_ca(path) + assert os.path.exists(path) + + path = os.path.join(d, "foo/cert2.pem") + assert certutils.dummy_ca(path) + assert os.path.exists(path) + assert os.path.exists(os.path.join(d, "foo/cert2-cert.pem")) + assert os.path.exists(os.path.join(d, "foo/cert2-cert.p12")) + + +class udummy_cert(libpry.AutoTree): + def test_with_ca(self): + d = self.tmpdir() + cacert = os.path.join(d, "foo/cert.cnf") + assert certutils.dummy_ca(cacert) + p = certutils.dummy_cert( + os.path.join(d, "foo"), + cacert, + "foo.com", + ["one.com", "two.com", "*.three.com"] + ) + assert os.path.exists(p) + + # Short-circuit + assert certutils.dummy_cert( + os.path.join(d, "foo"), + cacert, + "foo.com", + [] + ) + + def test_no_ca(self): + d = self.tmpdir() + p = certutils.dummy_cert( + d, + None, + "foo.com", + [] + ) + assert os.path.exists(p) + + +class uparse_text_cert(libpry.AutoTree): + def test_simple(self): + c = file("data/text_cert", "r").read() + cn, san = certutils.parse_text_cert(c) + assert cn == "google.com" + assert len(san) == 436 + + c = file("data/text_cert_2", "r").read() + cn, san = certutils.parse_text_cert(c) + assert cn == "www.inode.co.nz" + assert len(san) == 2 + + +tests = [ + uparse_text_cert(), + udummy_ca(), + udummy_cert(), +] diff --git a/test/test_utils.py b/test/test_utils.py index 79ddf53d..17c677a7 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -127,52 +127,6 @@ class u_urldecode(libpry.AutoTree): assert len(utils.urldecode(s)) == 2 -class udummy_ca(libpry.AutoTree): - def test_all(self): - d = self.tmpdir() - path = os.path.join(d, "foo/cert.cnf") - assert utils.dummy_ca(path) - assert os.path.exists(path) - - path = os.path.join(d, "foo/cert2.pem") - assert utils.dummy_ca(path) - assert os.path.exists(path) - assert os.path.exists(os.path.join(d, "foo/cert2-cert.pem")) - assert os.path.exists(os.path.join(d, "foo/cert2-cert.p12")) - - -class udummy_cert(libpry.AutoTree): - def test_with_ca(self): - d = self.tmpdir() - cacert = os.path.join(d, "foo/cert.cnf") - assert utils.dummy_ca(cacert) - p = utils.dummy_cert( - os.path.join(d, "foo"), - cacert, - "foo.com", - ["one.com", "two.com", "*.three.com"] - ) - assert os.path.exists(p) - - # Short-circuit - assert utils.dummy_cert( - os.path.join(d, "foo"), - cacert, - "foo.com", - [] - ) - - def test_no_ca(self): - d = self.tmpdir() - p = utils.dummy_cert( - d, - None, - "foo.com", - [] - ) - assert os.path.exists(p) - - class uLRUCache(libpry.AutoTree): def test_one(self): class Foo: @@ -259,22 +213,7 @@ class u_parse_size(libpry.AutoTree): libpry.raises(ValueError, utils.parse_size, "ak") -class uparse_text_cert(libpry.AutoTree): - def test_simple(self): - c = file("data/text_cert", "r").read() - cn, san = utils.parse_text_cert(c) - assert cn == "google.com" - assert len(san) == 436 - - c = file("data/text_cert_2", "r").read() - cn, san = utils.parse_text_cert(c) - assert cn == "www.inode.co.nz" - assert len(san) == 2 - - - tests = [ - uparse_text_cert(), uformat_timestamp(), uisBin(), uisXML(), @@ -285,8 +224,6 @@ tests = [ upretty_json(), u_urldecode(), udel_all(), - udummy_ca(), - udummy_cert(), uLRUCache(), u_parse_url(), u_parse_proxy_spec(), |