aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorErik Trauschke <erik.trauschke@gmail.com>2015-09-28 08:54:36 -0700
committerErik Trauschke <erik.trauschke@gmail.com>2015-09-28 08:54:36 -0700
commite064f0236cd1a93a7ad434ea8dccb5b476dda90e (patch)
tree393c8a3a9cd2741c89693e1455c303b3a27fb836 /src
parent6efcd60f319920e5b4eae971c1ddf704e56d866d (diff)
parentcd33bcc4b11fef67806dbe387dc1246e9a211797 (diff)
downloadcryptography-e064f0236cd1a93a7ad434ea8dccb5b476dda90e.tar.gz
cryptography-e064f0236cd1a93a7ad434ea8dccb5b476dda90e.tar.bz2
cryptography-e064f0236cd1a93a7ad434ea8dccb5b476dda90e.zip
Merge branch 'crl_ossl_backend' of github.com:etrauschke/cryptography into crl_ossl_backend
Diffstat (limited to 'src')
-rw-r--r--src/cryptography/hazmat/backends/multibackend.py18
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py26
-rw-r--r--src/cryptography/hazmat/backends/openssl/x509.py258
-rw-r--r--src/cryptography/x509/__init__.py6
-rw-r--r--src/cryptography/x509/base.py8
5 files changed, 312 insertions, 4 deletions
diff --git a/src/cryptography/hazmat/backends/multibackend.py b/src/cryptography/hazmat/backends/multibackend.py
index 9db32aa5..cda33145 100644
--- a/src/cryptography/hazmat/backends/multibackend.py
+++ b/src/cryptography/hazmat/backends/multibackend.py
@@ -325,6 +325,24 @@ class MultiBackend(object):
_Reasons.UNSUPPORTED_X509
)
+ def load_pem_x509_crl(self, data):
+ for b in self._filtered_backends(X509Backend):
+ return b.load_pem_x509_crl(data)
+
+ raise UnsupportedAlgorithm(
+ "This backend does not support X.509.",
+ _Reasons.UNSUPPORTED_X509
+ )
+
+ def load_der_x509_crl(self, data):
+ for b in self._filtered_backends(X509Backend):
+ return b.load_der_x509_crl(data)
+
+ raise UnsupportedAlgorithm(
+ "This backend does not support X.509.",
+ _Reasons.UNSUPPORTED_X509
+ )
+
def load_der_x509_csr(self, data):
for b in self._filtered_backends(X509Backend):
return b.load_der_x509_csr(data)
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index a8f639d5..c4cac1a6 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -39,8 +39,8 @@ from cryptography.hazmat.backends.openssl.rsa import (
_RSAPrivateKey, _RSAPublicKey
)
from cryptography.hazmat.backends.openssl.x509 import (
- _Certificate, _CertificateSigningRequest, _DISTPOINT_TYPE_FULLNAME,
- _DISTPOINT_TYPE_RELATIVENAME
+ _Certificate, _CertificateRevocationList, _CertificateSigningRequest,
+ _DISTPOINT_TYPE_FULLNAME, _DISTPOINT_TYPE_RELATIVENAME
)
from cryptography.hazmat.bindings.openssl import binding
from cryptography.hazmat.primitives import hashes, serialization
@@ -1467,6 +1467,28 @@ class Backend(object):
x509 = self._ffi.gc(x509, self._lib.X509_free)
return _Certificate(self, x509)
+ def load_pem_x509_crl(self, data):
+ mem_bio = self._bytes_to_bio(data)
+ x509_crl = self._lib.PEM_read_bio_X509_CRL(
+ mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
+ )
+ if x509_crl == self._ffi.NULL:
+ self._consume_errors()
+ raise ValueError("Unable to load CRL")
+
+ x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
+ return _CertificateRevocationList(self, x509_crl)
+
+ def load_der_x509_crl(self, data):
+ mem_bio = self._bytes_to_bio(data)
+ x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
+ if x509_crl == self._ffi.NULL:
+ self._consume_errors()
+ raise ValueError("Unable to load CRL")
+
+ x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
+ return _CertificateRevocationList(self, x509_crl)
+
def load_pem_x509_csr(self, data):
mem_bio = self._bytes_to_bio(data)
x509_req = self._lib.PEM_read_bio_X509_REQ(
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
index 80f32e29..8a13aa5d 100644
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -4,7 +4,9 @@
from __future__ import absolute_import, division, print_function
+import datetime
import ipaddress
+
from email.utils import parseaddr
import idna
@@ -637,6 +639,262 @@ def _decode_inhibit_any_policy(backend, asn1_int):
return x509.InhibitAnyPolicy(skip_certs)
+@utils.register_interface(x509.RevokedCertificate)
+class _RevokedCertificate(object):
+ def __init__(self, backend, x509_revoked):
+ self._backend = backend
+ self._x509_revoked = x509_revoked
+
+ self.__serial_number = None
+ self.__revocation_date = None
+ self.__extensions = None
+
+ @property
+ def serial_number(self):
+ if self.__serial_number:
+ return self.__serial_number
+
+ bn = self._backend._lib.ASN1_INTEGER_to_BN(
+ self._x509_revoked.serialNumber, self._backend._ffi.NULL
+ )
+ assert bn != self._backend._ffi.NULL
+ bn = self._backend._ffi.gc(bn, self._backend._lib.BN_free)
+ self.__serial_number = self._backend._bn_to_int(bn)
+ return self.__serial_number
+
+ @property
+ def revocation_date(self):
+ if self.__revocation_date:
+ return self.__revocation_date
+
+ self.__revocation_date = self._backend._parse_asn1_time(
+ self._x509_revoked.revocationDate)
+ return self.__revocation_date
+
+ @property
+ def extensions(self):
+ if self.__extensions:
+ return self.__extensions
+
+ extensions = []
+ seen_oids = set()
+ extcount = self._backend._lib.X509_REVOKED_get_ext_count(
+ self._x509_revoked)
+ for i in range(0, extcount):
+ ext = self._backend._lib.X509_REVOKED_get_ext(
+ self._x509_revoked, i)
+ assert ext != self._backend._ffi.NULL
+ crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
+ critical = crit == 1
+ oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object))
+ if oid in seen_oids:
+ raise x509.DuplicateExtension(
+ "Duplicate {0} extension found".format(oid), oid
+ )
+
+ if oid == x509.OID_CRL_REASON:
+ value = self._build_crl_reason(ext)
+ elif oid == x509.OID_INVALIDITY_DATE:
+ value = self._build_invalidity_date(ext)
+ elif oid == x509.OID_CERTIFICATE_ISSUER and \
+ self._backend._lib.OPENSSL_VERSION_NUMBER >= 0x10000000:
+ value = self._build_cert_issuer(ext)
+ elif critical:
+ raise x509.UnsupportedExtension(
+ "{0} is not currently supported".format(oid), oid
+ )
+ else:
+ # Unsupported non-critical extension, silently skipping for now
+ seen_oids.add(oid)
+ continue
+
+ seen_oids.add(oid)
+ extensions.append(x509.Extension(oid, critical, value))
+
+ self.__extensions = x509.Extensions(extensions)
+ return self.__extensions
+
+ def get_reason(self):
+ """
+ Returns the CRLReason extension if it exists.
+ """
+ try:
+ return self.extensions.get_extension_for_oid(
+ x509.OID_CRL_REASON).value
+ except x509.ExtensionNotFound:
+ return None
+
+ def get_invalidity_date(self):
+ """
+ Returns the InvalidityDate extension if it exists.
+ """
+ try:
+ return self.extensions.get_extension_for_oid(
+ x509.OID_INVALIDITY_DATE).value
+ except x509.ExtensionNotFound:
+ return None
+
+ def get_certificate_issuer(self):
+ """
+ Returns the CertificateIssuer extension if it exists.
+ """
+ try:
+ return self.extensions.get_extension_for_oid(
+ x509.OID_CERTIFICATE_ISSUER).value
+ except x509.ExtensionNotFound:
+ return None
+
+ def _build_crl_reason(self, ext):
+ enum = self._backend._lib.X509V3_EXT_d2i(ext)
+ assert enum != self._backend._ffi.NULL
+ enum = self._backend._ffi.cast("ASN1_ENUMERATED *", enum)
+ enum = self._backend._ffi.gc(
+ enum, self._backend._lib.ASN1_ENUMERATED_free
+ )
+ code = self._backend._lib.ASN1_ENUMERATED_get(enum)
+ if code == 0:
+ return x509.ReasonFlags.unspecified
+ elif code == 1:
+ return x509.ReasonFlags.key_compromise
+ elif code == 2:
+ return x509.ReasonFlags.ca_compromise
+ elif code == 3:
+ return x509.ReasonFlags.affiliation_changed
+ elif code == 4:
+ return x509.ReasonFlags.superseded
+ elif code == 5:
+ return x509.ReasonFlags.cessation_of_operation
+ elif code == 6:
+ return x509.ReasonFlags.certificate_hold
+ elif code == 8:
+ return x509.ReasonFlags.remove_from_crl
+ elif code == 9:
+ return x509.ReasonFlags.privilege_withdrawn
+ elif code == 10:
+ return x509.ReasonFlags.aa_compromise
+ else:
+ raise ValueError("Unsupported reason code: {0}".format(code))
+
+ def _build_invalidity_date(self, ext):
+ generalized_time = self._backend._ffi.cast(
+ "ASN1_GENERALIZEDTIME *",
+ self._backend._lib.X509V3_EXT_d2i(ext)
+ )
+ assert generalized_time != self._backend._ffi.NULL
+ generalized_time = self._backend._ffi.gc(
+ generalized_time, self._backend._lib.ASN1_GENERALIZEDTIME_free
+ )
+ time = self._backend._ffi.string(
+ self._backend._lib.ASN1_STRING_data(
+ self._backend._ffi.cast("ASN1_STRING *", generalized_time)
+ )
+ ).decode("ascii")
+ return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
+
+ def _build_cert_issuer(self, ext):
+ gns = self._backend._ffi.cast(
+ "GENERAL_NAMES *", self._backend._lib.X509V3_EXT_d2i(ext)
+ )
+ assert gns != self._backend._ffi.NULL
+ gns = self._backend._ffi.gc(gns, self._backend._lib.GENERAL_NAMES_free)
+ return x509.GeneralNames(_decode_general_names(self._backend, gns))
+
+
+@utils.register_interface(x509.CertificateRevocationList)
+class _CertificateRevocationList(object):
+ def __init__(self, backend, x509_crl):
+ self._backend = backend
+ self._x509_crl = x509_crl
+
+ self.__revoked = None
+ self.__issuer = None
+ self.__next_update = None
+ self.__last_update = None
+
+ def __eq__(self, other):
+ if not isinstance(other, x509.CertificateRevocationList):
+ return NotImplemented
+
+ res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
+ return res == 0
+
+ def __ne__(self, other):
+ return not self == other
+
+ def fingerprint(self, algorithm):
+ h = hashes.Hash(algorithm, self._backend)
+ bio = self._backend._create_mem_bio()
+ res = self._backend._lib.i2d_X509_CRL_bio(
+ bio, self._x509_crl
+ )
+ assert res == 1
+ der = self._backend._read_mem_bio(bio)
+ h.update(der)
+ return h.finalize()
+
+ @property
+ def signature_hash_algorithm(self):
+ oid = _obj2txt(self._backend, self._x509_crl.sig_alg.algorithm)
+ try:
+ return x509._SIG_OIDS_TO_HASH[oid]
+ except KeyError:
+ raise UnsupportedAlgorithm(
+ "Signature algorithm OID:{0} not recognized".format(oid)
+ )
+
+ @property
+ def issuer(self):
+ if self.__issuer:
+ return self.__issuer
+
+ issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
+ assert issuer != self._backend._ffi.NULL
+ self.__issuer = _decode_x509_name(self._backend, issuer)
+ return self.__issuer
+
+ @property
+ def next_update(self):
+ if self.__next_update:
+ return self.__next_update
+
+ nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
+ assert nu != self._backend._ffi.NULL
+ self.__next_update = self._backend._parse_asn1_time(nu)
+ return self.__next_update
+
+ @property
+ def last_update(self):
+ if self.__last_update:
+ return self.__last_update
+
+ lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
+ assert lu != self._backend._ffi.NULL
+ self.__last_update = self._backend._parse_asn1_time(lu)
+ return self.__last_update
+
+ @property
+ def revoked_certificates(self):
+ if self.__revoked:
+ return self.__revoked
+
+ revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
+ assert revoked != self._backend._ffi.NULL
+
+ num = self._backend._lib.sk_X509_REVOKED_num(revoked)
+ revoked_list = []
+ for i in range(num):
+ r = self._backend._lib.sk_X509_REVOKED_value(revoked, i)
+ assert r != self._backend._ffi.NULL
+ revoked_list.append(_RevokedCertificate(self._backend, r))
+
+ self.__revoked = revoked_list
+ return self.__revoked
+
+ @property
+ def extensions(self):
+ raise NotImplementedError()
+
+
@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
def __init__(self, backend, x509_req):
diff --git a/src/cryptography/x509/__init__.py b/src/cryptography/x509/__init__.py
index 1aa2598b..70e1d3da 100644
--- a/src/cryptography/x509/__init__.py
+++ b/src/cryptography/x509/__init__.py
@@ -8,8 +8,8 @@ from cryptography.x509.base import (
Certificate, CertificateBuilder, CertificateRevocationList,
CertificateSigningRequest, CertificateSigningRequestBuilder,
InvalidVersion, RevokedCertificate,
- Version, load_der_x509_certificate, load_der_x509_csr,
- load_pem_x509_certificate, load_pem_x509_csr,
+ Version, load_der_x509_certificate, load_der_x509_crl, load_der_x509_csr,
+ load_pem_x509_certificate, load_pem_x509_crl, load_pem_x509_csr,
)
from cryptography.x509.extensions import (
AccessDescription, AuthorityInformationAccess,
@@ -108,6 +108,8 @@ __all__ = [
"load_der_x509_certificate",
"load_pem_x509_csr",
"load_der_x509_csr",
+ "load_pem_x509_crl",
+ "load_der_x509_crl",
"InvalidVersion",
"DuplicateExtension",
"UnsupportedExtension",
diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py
index 27eafac6..9dc49a60 100644
--- a/src/cryptography/x509/base.py
+++ b/src/cryptography/x509/base.py
@@ -40,6 +40,14 @@ def load_der_x509_csr(data, backend):
return backend.load_der_x509_csr(data)
+def load_pem_x509_crl(data, backend):
+ return backend.load_pem_x509_crl(data)
+
+
+def load_der_x509_crl(data, backend):
+ return backend.load_der_x509_crl(data)
+
+
class InvalidVersion(Exception):
def __init__(self, msg, parsed_version):
super(InvalidVersion, self).__init__(msg)