diff options
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/x509.py | 154 |
1 files changed, 75 insertions, 79 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py index 8e361fa2..b7693bc1 100644 --- a/src/cryptography/hazmat/backends/openssl/x509.py +++ b/src/cryptography/hazmat/backends/openssl/x509.py @@ -163,6 +163,44 @@ def _decode_general_name(backend, gn): ) +def _decode_ocsp_no_check(backend, ext): + return x509.OCSPNoCheck() + + +class _X509ExtensionParser(object): + def __init__(self, ext_count, get_ext, handlers): + self.ext_count = ext_count + self.get_ext = get_ext + self.handlers = handlers + + def parse(self, backend, x509_obj): + extensions = [] + seen_oids = set() + for i in range(self.ext_count(x509_obj)): + ext = self.get_ext(x509_obj, i) + assert ext != backend._ffi.NULL + crit = backend._lib.X509_EXTENSION_get_critical(ext) + critical = crit == 1 + oid = x509.ObjectIdentifier(_obj2txt(backend, ext.object)) + if oid in seen_oids: + raise x509.DuplicateExtension( + "Duplicate {0} extension found".format(oid), oid + ) + for handler_oid, f in self.handlers: + if handler_oid == oid: + value = f(backend, ext) + extensions.append(x509.Extension(oid, critical, value)) + break + else: + if critical: + raise x509.UnsupportedExtension( + "{0} is not currently supported".format(oid), oid + ) + seen_oids.add(oid) + + return x509.Extensions(extensions) + + @utils.register_interface(x509.Certificate) class _Certificate(object): def __init__(self, backend, x509): @@ -268,58 +306,36 @@ class _Certificate(object): @property def extensions(self): - extensions = [] - seen_oids = set() - extcount = self._backend._lib.X509_get_ext_count(self._x509) - for i in range(0, extcount): - ext = self._backend._lib.X509_get_ext(self._x509, i) - assert ext != self._backend._ffi.NULL - crit = self._backend._lib.X509_EXTENSION_get_critical(ext) - critical = crit == 1 - oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object)) - if oid in seen_oids: - raise x509.DuplicateExtension( - "Duplicate {0} extension found".format(oid), oid - ) - elif oid == x509.OID_BASIC_CONSTRAINTS: - value = _decode_basic_constraints(self._backend, ext) - elif oid == x509.OID_SUBJECT_KEY_IDENTIFIER: - value = _decode_subject_key_identifier(self._backend, ext) - elif oid == x509.OID_KEY_USAGE: - value = _decode_key_usage(self._backend, ext) - elif oid == x509.OID_SUBJECT_ALTERNATIVE_NAME: - value = _decode_subject_alt_name(self._backend, ext) - elif oid == x509.OID_EXTENDED_KEY_USAGE: - value = _decode_extended_key_usage(self._backend, ext) - elif oid == x509.OID_AUTHORITY_KEY_IDENTIFIER: - value = _decode_authority_key_identifier(self._backend, ext) - elif oid == x509.OID_AUTHORITY_INFORMATION_ACCESS: - value = _decode_authority_information_access( - self._backend, ext - ) - elif oid == x509.OID_CERTIFICATE_POLICIES: - value = _decode_certificate_policies(self._backend, ext) - elif oid == x509.OID_CRL_DISTRIBUTION_POINTS: - value = _decode_crl_distribution_points(self._backend, ext) - elif oid == x509.OID_OCSP_NO_CHECK: - value = x509.OCSPNoCheck() - elif oid == x509.OID_INHIBIT_ANY_POLICY: - value = _decode_inhibit_any_policy(self._backend, ext) - elif oid == x509.OID_ISSUER_ALTERNATIVE_NAME: - value = _decode_issuer_alt_name(self._backend, ext) - elif critical: - raise x509.UnsupportedExtension( - "{0} is not currently supported".format(oid), oid - ) - else: - # Unsupported non-critical extension, silently skipping for now - seen_oids.add(oid) - continue - - seen_oids.add(oid) - extensions.append(x509.Extension(oid, critical, value)) - - return x509.Extensions(extensions) + return _X509ExtensionParser( + ext_count=self._backend._lib.X509_get_ext_count, + get_ext=self._backend._lib.X509_get_ext, + handlers=[ + (x509.OID_BASIC_CONSTRAINTS, _decode_basic_constraints), + ( + x509.OID_SUBJECT_KEY_IDENTIFIER, + _decode_subject_key_identifier + ), + (x509.OID_KEY_USAGE, _decode_key_usage), + (x509.OID_SUBJECT_ALTERNATIVE_NAME, _decode_subject_alt_name), + (x509.OID_EXTENDED_KEY_USAGE, _decode_extended_key_usage), + ( + x509.OID_AUTHORITY_KEY_IDENTIFIER, + _decode_authority_key_identifier + ), + ( + x509.OID_AUTHORITY_INFORMATION_ACCESS, + _decode_authority_information_access + ), + (x509.OID_CERTIFICATE_POLICIES, _decode_certificate_policies), + ( + x509.OID_CRL_DISTRIBUTION_POINTS, + _decode_crl_distribution_points + ), + (x509.OID_OCSP_NO_CHECK, _decode_ocsp_no_check), + (x509.OID_INHIBIT_ANY_POLICY, _decode_inhibit_any_policy), + (x509.OID_ISSUER_ALTERNATIVE_NAME, _decode_issuer_alt_name), + ] + ).parse(self._backend, self._x509) def public_bytes(self, encoding): bio = self._backend._create_mem_bio() @@ -704,35 +720,15 @@ class _CertificateSigningRequest(object): @property def extensions(self): - extensions = [] - seen_oids = set() x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req) - extcount = self._backend._lib.sk_X509_EXTENSION_num(x509_exts) - for i in range(0, extcount): - ext = self._backend._lib.sk_X509_EXTENSION_value(x509_exts, i) - assert ext != self._backend._ffi.NULL - crit = self._backend._lib.X509_EXTENSION_get_critical(ext) - critical = crit == 1 - oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object)) - if oid in seen_oids: - raise x509.DuplicateExtension( - "Duplicate {0} extension found".format(oid), oid - ) - elif oid == x509.OID_BASIC_CONSTRAINTS: - value = _decode_basic_constraints(self._backend, ext) - elif critical: - raise x509.UnsupportedExtension( - "{0} is not currently supported".format(oid), oid - ) - else: - # Unsupported non-critical extension, silently skipping for now - seen_oids.add(oid) - continue - seen_oids.add(oid) - extensions.append(x509.Extension(oid, critical, value)) - - return x509.Extensions(extensions) + return _X509ExtensionParser( + ext_count=self._backend._lib.sk_X509_EXTENSION_num, + get_ext=self._backend._lib.sk_X509_EXTENSION_value, + handlers=[ + (x509.OID_BASIC_CONSTRAINTS, _decode_basic_constraints), + ] + ).parse(self._backend, x509_exts) def public_bytes(self, encoding): bio = self._backend._create_mem_bio() |