From dc57040a59ce4211a6ab6db843c497a87a825509 Mon Sep 17 00:00:00 2001 From: Erik Trauschke Date: Thu, 24 Sep 2015 20:24:28 -0700 Subject: OpenSSL backend code for CRLs --- src/cryptography/hazmat/backends/multibackend.py | 18 ++ .../hazmat/backends/openssl/backend.py | 26 ++- src/cryptography/hazmat/backends/openssl/x509.py | 258 +++++++++++++++++++++ src/cryptography/x509/__init__.py | 6 +- src/cryptography/x509/base.py | 8 + 5 files changed, 312 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/cryptography/hazmat/backends/multibackend.py b/src/cryptography/hazmat/backends/multibackend.py index 9db32aa5..cda33145 100644 --- a/src/cryptography/hazmat/backends/multibackend.py +++ b/src/cryptography/hazmat/backends/multibackend.py @@ -325,6 +325,24 @@ class MultiBackend(object): _Reasons.UNSUPPORTED_X509 ) + def load_pem_x509_crl(self, data): + for b in self._filtered_backends(X509Backend): + return b.load_pem_x509_crl(data) + + raise UnsupportedAlgorithm( + "This backend does not support X.509.", + _Reasons.UNSUPPORTED_X509 + ) + + def load_der_x509_crl(self, data): + for b in self._filtered_backends(X509Backend): + return b.load_der_x509_crl(data) + + raise UnsupportedAlgorithm( + "This backend does not support X.509.", + _Reasons.UNSUPPORTED_X509 + ) + def load_der_x509_csr(self, data): for b in self._filtered_backends(X509Backend): return b.load_der_x509_csr(data) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index c74d90d4..8503b9b9 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -39,8 +39,8 @@ from cryptography.hazmat.backends.openssl.rsa import ( _RSAPrivateKey, _RSAPublicKey ) from cryptography.hazmat.backends.openssl.x509 import ( - _Certificate, _CertificateSigningRequest, _DISTPOINT_TYPE_FULLNAME, - _DISTPOINT_TYPE_RELATIVENAME + _Certificate, _CertificateRevocationList, _CertificateSigningRequest, + _DISTPOINT_TYPE_FULLNAME, _DISTPOINT_TYPE_RELATIVENAME ) from cryptography.hazmat.bindings.openssl.binding import Binding from cryptography.hazmat.primitives import hashes, serialization @@ -1474,6 +1474,28 @@ class Backend(object): x509 = self._ffi.gc(x509, self._lib.X509_free) return _Certificate(self, x509) + def load_pem_x509_crl(self, data): + mem_bio = self._bytes_to_bio(data) + x509_crl = self._lib.PEM_read_bio_X509_CRL( + mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL + ) + if x509_crl == self._ffi.NULL: + self._consume_errors() + raise ValueError("Unable to load CRL") + + x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) + return _CertificateRevocationList(self, x509_crl) + + def load_der_x509_crl(self, data): + mem_bio = self._bytes_to_bio(data) + x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL) + if x509_crl == self._ffi.NULL: + self._consume_errors() + raise ValueError("Unable to load CRL") + + x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) + return _CertificateRevocationList(self, x509_crl) + def load_pem_x509_csr(self, data): mem_bio = self._bytes_to_bio(data) x509_req = self._lib.PEM_read_bio_X509_REQ( diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py index 70cec163..79b516cc 100644 --- a/src/cryptography/hazmat/backends/openssl/x509.py +++ b/src/cryptography/hazmat/backends/openssl/x509.py @@ -4,7 +4,9 @@ from __future__ import absolute_import, division, print_function +import datetime import ipaddress + from email.utils import parseaddr import idna @@ -637,6 +639,262 @@ def _decode_inhibit_any_policy(backend, asn1_int): return x509.InhibitAnyPolicy(skip_certs) +@utils.register_interface(x509.RevokedCertificate) +class _RevokedCertificate(object): + def __init__(self, backend, x509_revoked): + self._backend = backend + self._x509_revoked = x509_revoked + + self.__serial_number = None + self.__revocation_date = None + self.__extensions = None + + @property + def serial_number(self): + if self.__serial_number: + return self.__serial_number + + bn = self._backend._lib.ASN1_INTEGER_to_BN( + self._x509_revoked.serialNumber, self._backend._ffi.NULL + ) + assert bn != self._backend._ffi.NULL + bn = self._backend._ffi.gc(bn, self._backend._lib.BN_free) + self.__serial_number = self._backend._bn_to_int(bn) + return self.__serial_number + + @property + def revocation_date(self): + if self.__revocation_date: + return self.__revocation_date + + self.__revocation_date = self._backend._parse_asn1_time( + self._x509_revoked.revocationDate) + return self.__revocation_date + + @property + def extensions(self): + if self.__extensions: + return self.__extensions + + extensions = [] + seen_oids = set() + extcount = self._backend._lib.X509_REVOKED_get_ext_count( + self._x509_revoked) + for i in range(0, extcount): + ext = self._backend._lib.X509_REVOKED_get_ext( + self._x509_revoked, i) + assert ext != self._backend._ffi.NULL + crit = self._backend._lib.X509_EXTENSION_get_critical(ext) + critical = crit == 1 + oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object)) + if oid in seen_oids: + raise x509.DuplicateExtension( + "Duplicate {0} extension found".format(oid), oid + ) + + if oid == x509.OID_CRL_REASON: + value = self._build_crl_reason(ext) + elif oid == x509.OID_INVALIDITY_DATE: + value = self._build_invalidity_date(ext) + elif oid == x509.OID_CERTIFICATE_ISSUER and \ + self._backend._lib.OPENSSL_VERSION_NUMBER >= 0x10000000: + value = self._build_cert_issuer(ext) + elif critical: + raise x509.UnsupportedExtension( + "{0} is not currently supported".format(oid), oid + ) + else: + # Unsupported non-critical extension, silently skipping for now + seen_oids.add(oid) + continue + + seen_oids.add(oid) + extensions.append(x509.Extension(oid, critical, value)) + + self.__extensions = x509.Extensions(extensions) + return self.__extensions + + def get_reason(self): + """ + Returns the CRLReason extension if it exists. + """ + try: + return self.extensions.get_extension_for_oid( + x509.OID_CRL_REASON).value + except x509.ExtensionNotFound: + return None + + def get_invalidity_date(self): + """ + Returns the InvalidityDate extension if it exists. + """ + try: + return self.extensions.get_extension_for_oid( + x509.OID_INVALIDITY_DATE).value + except x509.ExtensionNotFound: + return None + + def get_certificate_issuer(self): + """ + Returns the CertificateIssuer extension if it exists. + """ + try: + return self.extensions.get_extension_for_oid( + x509.OID_CERTIFICATE_ISSUER).value + except x509.ExtensionNotFound: + return None + + def _build_crl_reason(self, ext): + enum = self._backend._lib.X509V3_EXT_d2i(ext) + assert enum != self._backend._ffi.NULL + enum = self._backend._ffi.cast("ASN1_ENUMERATED *", enum) + enum = self._backend._ffi.gc( + enum, self._backend._lib.ASN1_ENUMERATED_free + ) + code = self._backend._lib.ASN1_ENUMERATED_get(enum) + if code == 0: + return x509.ReasonFlags.unspecified + elif code == 1: + return x509.ReasonFlags.key_compromise + elif code == 2: + return x509.ReasonFlags.ca_compromise + elif code == 3: + return x509.ReasonFlags.affiliation_changed + elif code == 4: + return x509.ReasonFlags.superseded + elif code == 5: + return x509.ReasonFlags.cessation_of_operation + elif code == 6: + return x509.ReasonFlags.certificate_hold + elif code == 8: + return x509.ReasonFlags.remove_from_crl + elif code == 9: + return x509.ReasonFlags.privilege_withdrawn + elif code == 10: + return x509.ReasonFlags.aa_compromise + else: + raise ValueError("Unsupported reason code: {0}".format(code)) + + def _build_invalidity_date(self, ext): + generalized_time = self._backend._ffi.cast( + "ASN1_GENERALIZEDTIME *", + self._backend._lib.X509V3_EXT_d2i(ext) + ) + assert generalized_time != self._backend._ffi.NULL + generalized_time = self._backend._ffi.gc( + generalized_time, self._backend._lib.ASN1_GENERALIZEDTIME_free + ) + time = self._backend._ffi.string( + self._backend._lib.ASN1_STRING_data( + self._backend._ffi.cast("ASN1_STRING *", generalized_time) + ) + ).decode("ascii") + return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ") + + def _build_cert_issuer(self, ext): + gns = self._backend._ffi.cast( + "GENERAL_NAMES *", self._backend._lib.X509V3_EXT_d2i(ext) + ) + assert gns != self._backend._ffi.NULL + gns = self._backend._ffi.gc(gns, self._backend._lib.GENERAL_NAMES_free) + return x509.GeneralNames(_decode_general_names(self._backend, gns)) + + +@utils.register_interface(x509.CertificateRevocationList) +class _CertificateRevocationList(object): + def __init__(self, backend, x509_crl): + self._backend = backend + self._x509_crl = x509_crl + + self.__revoked = None + self.__issuer = None + self.__next_update = None + self.__last_update = None + + def __eq__(self, other): + if not isinstance(other, x509.CertificateRevocationList): + return NotImplemented + + res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl) + return res == 0 + + def __ne__(self, other): + return not self == other + + def fingerprint(self, algorithm): + h = hashes.Hash(algorithm, self._backend) + bio = self._backend._create_mem_bio() + res = self._backend._lib.i2d_X509_CRL_bio( + bio, self._x509_crl + ) + assert res == 1 + der = self._backend._read_mem_bio(bio) + h.update(der) + return h.finalize() + + @property + def signature_hash_algorithm(self): + oid = _obj2txt(self._backend, self._x509_crl.sig_alg.algorithm) + try: + return x509._SIG_OIDS_TO_HASH[oid] + except KeyError: + raise UnsupportedAlgorithm( + "Signature algorithm OID:{0} not recognized".format(oid) + ) + + @property + def issuer(self): + if self.__issuer: + return self.__issuer + + issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl) + assert issuer != self._backend._ffi.NULL + self.__issuer = _decode_x509_name(self._backend, issuer) + return self.__issuer + + @property + def next_update(self): + if self.__next_update: + return self.__next_update + + nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl) + assert nu != self._backend._ffi.NULL + self.__next_update = self._backend._parse_asn1_time(nu) + return self.__next_update + + @property + def last_update(self): + if self.__last_update: + return self.__last_update + + lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl) + assert lu != self._backend._ffi.NULL + self.__last_update = self._backend._parse_asn1_time(lu) + return self.__last_update + + @property + def revoked_certificates(self): + if self.__revoked: + return self.__revoked + + revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl) + assert revoked != self._backend._ffi.NULL + + num = self._backend._lib.sk_X509_REVOKED_num(revoked) + revoked_list = [] + for i in range(num): + r = self._backend._lib.sk_X509_REVOKED_value(revoked, i) + assert r != self._backend._ffi.NULL + revoked_list.append(_RevokedCertificate(self._backend, r)) + + self.__revoked = revoked_list + return self.__revoked + + @property + def extensions(self): + raise NotImplementedError() + + @utils.register_interface(x509.CertificateSigningRequest) class _CertificateSigningRequest(object): def __init__(self, backend, x509_req): diff --git a/src/cryptography/x509/__init__.py b/src/cryptography/x509/__init__.py index 1aa2598b..70e1d3da 100644 --- a/src/cryptography/x509/__init__.py +++ b/src/cryptography/x509/__init__.py @@ -8,8 +8,8 @@ from cryptography.x509.base import ( Certificate, CertificateBuilder, CertificateRevocationList, CertificateSigningRequest, CertificateSigningRequestBuilder, InvalidVersion, RevokedCertificate, - Version, load_der_x509_certificate, load_der_x509_csr, - load_pem_x509_certificate, load_pem_x509_csr, + Version, load_der_x509_certificate, load_der_x509_crl, load_der_x509_csr, + load_pem_x509_certificate, load_pem_x509_crl, load_pem_x509_csr, ) from cryptography.x509.extensions import ( AccessDescription, AuthorityInformationAccess, @@ -108,6 +108,8 @@ __all__ = [ "load_der_x509_certificate", "load_pem_x509_csr", "load_der_x509_csr", + "load_pem_x509_crl", + "load_der_x509_crl", "InvalidVersion", "DuplicateExtension", "UnsupportedExtension", diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py index 27eafac6..9dc49a60 100644 --- a/src/cryptography/x509/base.py +++ b/src/cryptography/x509/base.py @@ -40,6 +40,14 @@ def load_der_x509_csr(data, backend): return backend.load_der_x509_csr(data) +def load_pem_x509_crl(data, backend): + return backend.load_pem_x509_crl(data) + + +def load_der_x509_crl(data, backend): + return backend.load_der_x509_crl(data) + + class InvalidVersion(Exception): def __init__(self, msg, parsed_version): super(InvalidVersion, self).__init__(msg) -- cgit v1.2.3