diff options
author | Maximilian Hils <git@maximilianhils.com> | 2014-10-08 20:46:30 +0200 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2014-10-08 20:46:30 +0200 |
commit | fdb6f5552d43d7ab02320ccd7e6d58750e33c4c4 (patch) | |
tree | c40f9386441f8dc9811b90d7d3176cde6303ea34 /netlib/certutils.py | |
parent | 274688172d62131ddf30cf67e6c084e0e928d4bf (diff) | |
download | mitmproxy-fdb6f5552d43d7ab02320ccd7e6d58750e33c4c4.tar.gz mitmproxy-fdb6f5552d43d7ab02320ccd7e6d58750e33c4c4.tar.bz2 mitmproxy-fdb6f5552d43d7ab02320ccd7e6d58750e33c4c4.zip |
CertStore: add support for cert chains
Diffstat (limited to 'netlib/certutils.py')
-rw-r--r-- | netlib/certutils.py | 70 |
1 files changed, 41 insertions, 29 deletions
diff --git a/netlib/certutils.py b/netlib/certutils.py index fe067ca1..c9e6df26 100644 --- a/netlib/certutils.py +++ b/netlib/certutils.py @@ -113,13 +113,21 @@ def dummy_cert(privkey, cacert, commonname, sans): # return current.value +class CertStoreEntry(object): + def __init__(self, cert, pkey=None, chain_file=None): + self.cert = cert + self.pkey = pkey + self.chain_file = chain_file + class CertStore: """ Implements an in-memory certificate store. """ - def __init__(self, privkey, cacert, dhparams=None): - self.privkey, self.cacert = privkey, cacert + def __init__(self, default_pkey, default_ca, default_chain_file, dhparams=None): + self.default_pkey = default_pkey + self.default_ca = default_ca + self.default_chain_file = default_chain_file self.dhparams = dhparams self.certs = dict() @@ -142,21 +150,21 @@ class CertStore: return dh @classmethod - def from_store(klass, path, basename): - p = os.path.join(path, basename + "-ca.pem") - if not os.path.exists(p): - key, ca = klass.create_store(path, basename) + def from_store(cls, path, basename): + ca_path = os.path.join(path, basename + "-ca.pem") + if not os.path.exists(ca_path): + key, ca = cls.create_store(path, basename) else: - p = os.path.join(path, basename + "-ca.pem") - raw = file(p, "rb").read() + with open(ca_path, "rb") as f: + raw = f.read() ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw) key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw) - dhp = os.path.join(path, basename + "-dhparam.pem") - dh = klass.load_dhparam(dhp) - return klass(key, ca, dh) + dh_path = os.path.join(path, basename + "-dhparam.pem") + dh = cls.load_dhparam(dh_path) + return cls(key, ca, ca_path, dh) @classmethod - def create_store(klass, path, basename, o=None, cn=None, expiry=DEFAULT_EXP): + def create_store(cls, path, basename, o=None, cn=None, expiry=DEFAULT_EXP): if not os.path.exists(path): os.makedirs(path) @@ -194,25 +202,29 @@ class CertStore: return key, ca def add_cert_file(self, spec, path): - raw = file(path, "rb").read() - cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw) + with open(path, "rb") as f: + raw = f.read() + cert = SSLCert(OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)) try: - privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw) + pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw) except Exception: - privkey = None - self.add_cert(SSLCert(cert), privkey, spec) + pkey = None + self.add_cert( + CertStoreEntry(cert, pkey, path), + spec + ) - def add_cert(self, cert, privkey, *names): + def add_cert(self, entry, *names): """ Adds a cert to the certstore. We register the CN in the cert plus any SANs, and also the list of names provided as an argument. """ - if cert.cn: - self.certs[cert.cn] = (cert, privkey) - for i in cert.altnames: - self.certs[i] = (cert, privkey) + if entry.cert.cn: + self.certs[entry.cert.cn] = entry + for i in entry.cert.altnames: + self.certs[i] = entry for i in names: - self.certs[i] = (cert, privkey) + self.certs[i] = entry @staticmethod def asterisk_forms(dn): @@ -246,17 +258,17 @@ class CertStore: name = next(itertools.ifilter(lambda key: key in self.certs, potential_keys), None) if name: - c = self.certs[name] + entry = self.certs[name] else: - c = dummy_cert(self.privkey, self.cacert, commonname, sans), None - self.certs[(commonname, tuple(sans))] = c + entry = CertStoreEntry(cert=dummy_cert(self.default_pkey, self.default_ca, commonname, sans)) + self.certs[(commonname, tuple(sans))] = entry - return c[0], (c[1] or self.privkey) + return entry.cert, (entry.pkey or self.default_pkey), (entry.chain_file or self.default_chain_file) def gen_pkey(self, cert): from . import certffi - certffi.set_flags(self.privkey, 1) - return self.privkey + certffi.set_flags(self.default_pkey, 1) + return self.default_pkey class _GeneralName(univ.Choice): |