aboutsummaryrefslogtreecommitdiffstats
path: root/netlib/certutils.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2014-03-04 14:12:58 +1300
committerAldo Cortesi <aldo@nullcube.com>2014-03-04 14:12:58 +1300
commit7c82418e0baca311487230074655f5f106bcdd2b (patch)
tree35bee76de49c755f58637e78914f9882dbab789b /netlib/certutils.py
parentd56f7fba806e6c2008c40df9b0b290b81189cb92 (diff)
downloadmitmproxy-7c82418e0baca311487230074655f5f106bcdd2b.tar.gz
mitmproxy-7c82418e0baca311487230074655f5f106bcdd2b.tar.bz2
mitmproxy-7c82418e0baca311487230074655f5f106bcdd2b.zip
Beef up CertStore, add DH params.
Diffstat (limited to 'netlib/certutils.py')
-rw-r--r--netlib/certutils.py157
1 files changed, 85 insertions, 72 deletions
diff --git a/netlib/certutils.py b/netlib/certutils.py
index 0b29d52f..b9c291d0 100644
--- a/netlib/certutils.py
+++ b/netlib/certutils.py
@@ -5,23 +5,27 @@ from pyasn1.error import PyAsn1Error
import OpenSSL
import tcp
-default_exp = 62208000 # =24 * 60 * 60 * 720
-default_o = "mitmproxy"
-default_cn = "mitmproxy"
-
-def create_ca(o=default_o, cn=default_cn, exp=default_exp):
+DEFAULT_EXP = 62208000 # =24 * 60 * 60 * 720
+# Generated with "openssl dhparam". It's too slow to generate this on startup.
+DEFAULT_DHPARAM = """-----BEGIN DH PARAMETERS-----
+MIGHAoGBAOdPzMbYgoYfO3YBYauCLRlE8X1XypTiAjoeCFD0qWRx8YUsZ6Sj20W5
+zsfQxlZfKovo3f2MftjkDkbI/C/tDgxoe0ZPbjy5CjdOhkzxn0oTbKTs16Rw8DyK
+1LjTR65sQJkJEdgsX8TSi/cicCftJZl9CaZEaObF2bdgSgGK+PezAgEC
+-----END DH PARAMETERS-----"""
+
+def create_ca(o, cn, exp):
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)
- ca = OpenSSL.crypto.X509()
- ca.set_serial_number(int(time.time()*10000))
- ca.set_version(2)
- ca.get_subject().CN = cn
- ca.get_subject().O = o
- ca.gmtime_adj_notBefore(0)
- ca.gmtime_adj_notAfter(exp)
- ca.set_issuer(ca.get_subject())
- ca.set_pubkey(key)
- ca.add_extensions([
+ cert = OpenSSL.crypto.X509()
+ cert.set_serial_number(int(time.time()*10000))
+ cert.set_version(2)
+ cert.get_subject().CN = cn
+ cert.get_subject().O = o
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(exp)
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(key)
+ cert.add_extensions([
OpenSSL.crypto.X509Extension("basicConstraints", True,
"CA:TRUE"),
OpenSSL.crypto.X509Extension("nsCertType", True,
@@ -32,80 +36,39 @@ def create_ca(o=default_o, cn=default_cn, exp=default_exp):
OpenSSL.crypto.X509Extension("keyUsage", False,
"keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash",
- subject=ca),
+ subject=cert),
])
- ca.sign(key, "sha1")
- return key, ca
-
-
-def dummy_ca(path, o=default_o, cn=default_cn, exp=default_exp):
- dirname = os.path.dirname(path)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- if path.endswith(".pem"):
- basename, _ = os.path.splitext(path)
- basename = os.path.basename(basename)
- else:
- basename = os.path.basename(path)
-
- key, ca = create_ca(o=o, cn=cn, exp=exp)
-
- # Dump the CA plus private key
- f = open(path, "wb")
- f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Dump the certificate in PEM format
- f = open(os.path.join(dirname, basename + "-cert.pem"), "wb")
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Create a .cer file with the same contents for Android
- f = open(os.path.join(dirname, basename + "-cert.cer"), "wb")
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Dump the certificate in PKCS12 format for Windows devices
- f = open(os.path.join(dirname, basename + "-cert.p12"), "wb")
- p12 = OpenSSL.crypto.PKCS12()
- p12.set_certificate(ca)
- p12.set_privatekey(key)
- f.write(p12.export())
- f.close()
- return True
-
-
-def dummy_cert(ca, commonname, sans):
+ cert.sign(key, "sha1")
+ return key, cert
+
+
+def dummy_cert(pkey, cacert, commonname, sans):
"""
- Generates and writes a certificate to fp.
+ Generates a dummy certificate.
- ca: Path to the certificate authority file, or None.
+ pkey: CA private key
+ cacert: CA certificate
commonname: Common name for the generated certificate.
sans: A list of Subject Alternate Names.
- Returns cert path if operation succeeded, None if not.
+ Returns cert if operation succeeded, None if not.
"""
ss = []
for i in sans:
ss.append("DNS: %s"%i)
ss = ", ".join(ss)
- raw = file(ca, "rb").read()
- ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)
- key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw)
-
cert = OpenSSL.crypto.X509()
cert.gmtime_adj_notBefore(-3600*48)
cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
- cert.set_issuer(ca.get_subject())
+ cert.set_issuer(cacert.get_subject())
cert.get_subject().CN = commonname
cert.set_serial_number(int(time.time()*10000))
if ss:
cert.set_version(2)
cert.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
- cert.set_pubkey(ca.get_pubkey())
- cert.sign(key, "sha1")
+ cert.set_pubkey(cacert.get_pubkey())
+ cert.sign(pkey, "sha1")
return SSLCert(cert)
@@ -113,9 +76,59 @@ class CertStore:
"""
Implements an in-memory certificate store.
"""
- def __init__(self, cacert):
+ def __init__(self, pkey, cert):
+ self.pkey, self.cert = pkey, cert
self.certs = {}
- self.cacert = cacert
+
+ @classmethod
+ def from_store(klass, path, basename):
+ p = os.path.join(path, basename + "-ca.pem")
+ if not os.path.exists(p):
+ key, ca = klass.create_store(path, basename)
+ else:
+ p = os.path.join(path, basename + "-ca.pem")
+ raw = file(p, "rb").read()
+ ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)
+ key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw)
+ return klass(key, ca)
+
+ @classmethod
+ def create_store(klass, path, basename, o=None, cn=None, expiry=DEFAULT_EXP):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+ o = o or basename
+ cn = cn or basename
+
+ key, ca = create_ca(o=o, cn=cn, exp=expiry)
+ # Dump the CA plus private key
+ f = open(os.path.join(path, basename + "-ca.pem"), "wb")
+ f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
+ f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
+ f.close()
+
+ # Dump the certificate in PEM format
+ f = open(os.path.join(path, basename + "-cert.pem"), "wb")
+ f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
+ f.close()
+
+ # Create a .cer file with the same contents for Android
+ f = open(os.path.join(path, basename + "-cert.cer"), "wb")
+ f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
+ f.close()
+
+ # Dump the certificate in PKCS12 format for Windows devices
+ f = open(os.path.join(path, basename + "-cert.p12"), "wb")
+ p12 = OpenSSL.crypto.PKCS12()
+ p12.set_certificate(ca)
+ p12.set_privatekey(key)
+ f.write(p12.export())
+ f.close()
+
+ f = open(os.path.join(path, basename + "-dhparam.pem"), "wb")
+ f.write(DEFAULT_DHPARAM)
+ f.close()
+ return key, ca
def get_cert(self, commonname, sans):
"""
@@ -130,7 +143,7 @@ class CertStore:
"""
if commonname in self.certs:
return self.certs[commonname]
- c = dummy_cert(self.cacert, commonname, sans)
+ c = dummy_cert(self.pkey, self.cert, commonname, sans)
self.certs[commonname] = c
return c