aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-x.travis/upload_coverage.sh3
-rw-r--r--CHANGELOG.rst7
-rw-r--r--docs/hazmat/primitives/asymmetric/ec.rst8
-rw-r--r--docs/hazmat/primitives/index.rst1
-rw-r--r--docs/hazmat/primitives/keywrap.rst59
-rw-r--r--docs/x509/reference.rst121
-rw-r--r--src/_cffi_src/openssl/asn1.py19
-rw-r--r--src/_cffi_src/openssl/bignum.py19
-rw-r--r--src/_cffi_src/openssl/engine.py2
-rw-r--r--src/cryptography/hazmat/backends/multibackend.py18
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py118
-rw-r--r--src/cryptography/hazmat/backends/openssl/x509.py224
-rw-r--r--src/cryptography/hazmat/bindings/openssl/binding.py21
-rw-r--r--src/cryptography/hazmat/primitives/keywrap.py85
-rw-r--r--src/cryptography/x509/__init__.py6
-rw-r--r--src/cryptography/x509/base.py14
-rw-r--r--src/cryptography/x509/extensions.py5
-rw-r--r--src/cryptography/x509/name.py8
-rw-r--r--tests/hazmat/backends/test_multibackend.py12
-rw-r--r--tests/hazmat/bindings/test_openssl.py4
-rw-r--r--tests/hazmat/primitives/test_keywrap.py116
-rw-r--r--tests/test_x509.py355
-rw-r--r--tests/test_x509_ext.py14
23 files changed, 1165 insertions, 74 deletions
diff --git a/.travis/upload_coverage.sh b/.travis/upload_coverage.sh
index 62489560..113dbef8 100755
--- a/.travis/upload_coverage.sh
+++ b/.travis/upload_coverage.sh
@@ -6,6 +6,5 @@ set -x
NO_COVERAGE_TOXENVS=(pypy pypy3 pep8 py3pep8 docs)
if ! [[ "${NO_COVERAGE_TOXENVS[*]}" =~ "${TOXENV}" ]]; then
source ~/.venv/bin/activate
- wget https://codecov.io/bash -O codecov.sh
- bash codecov.sh -e TRAVIS_OS_NAME,TOXENV,OPENSSL
+ codecov --env TRAVIS_OS_NAME,TOXENV,OPENSSL
fi
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index ec27596c..08ac1093 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -9,6 +9,13 @@ Changelog
* Added support for Elliptic Curve Diffie-Hellman with
:class:`~cryptography.hazmat.primitives.asymmetric.ec.ECDH`.
* Added :class:`~cryptography.hazmat.primitives.kdf.x963kdf.X963KDF`.
+* Added support for parsing certificate revocation lists (CRLs) using
+ :func:`~cryptography.x509.load_pem_x509_crl` and
+ :func:`~cryptography.x509.load_der_x509_crl`.
+* Add support for AES key wrapping with
+ :func:`~cryptography.hazmat.primitives.keywrap.aes_key_wrap` and
+ :func:`~cryptography.hazmat.primitives.keywrap.aes_key_unwrap`.
+* Added an ``__hash__`` method to :class:`~cryptography.x509.Name`.
1.0.2 - 2015-09-27
~~~~~~~~~~~~~~~~~~
diff --git a/docs/hazmat/primitives/asymmetric/ec.rst b/docs/hazmat/primitives/asymmetric/ec.rst
index e4df9b10..90e73711 100644
--- a/docs/hazmat/primitives/asymmetric/ec.rst
+++ b/docs/hazmat/primitives/asymmetric/ec.rst
@@ -147,6 +147,11 @@ Elliptic Curve Key Exchange algorithm
... ).public_key()
>>> shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
+ ECDHE (or EECDH), the ephemeral form of this exchange, is **strongly
+ preferred** over simple ECDH and provides `forward secrecy`_ when used.
+ You must generate a new private key using :func:`generate_private_key` for
+ each :meth:`~EllipticCurvePrivateKey.exchange` when performing an ECDHE key
+ exchange.
Elliptic Curves
---------------
@@ -342,6 +347,8 @@ Key Interfaces
.. method:: exchange(algorithm, peer_public_key)
+ .. versionadded:: 1.1
+
Perform's a key exchange operation using the provided algorithm with
the peer's public key.
@@ -470,3 +477,4 @@ Key Interfaces
.. _`SafeCurves`: http://safecurves.cr.yp.to/
.. _`ECDSA`: https://en.wikipedia.org/wiki/ECDSA
.. _`EdDSA`: https://en.wikipedia.org/wiki/EdDSA
+.. _`forward secrecy`: https://en.wikipedia.org/wiki/Forward_secrecy
diff --git a/docs/hazmat/primitives/index.rst b/docs/hazmat/primitives/index.rst
index a9ab38a0..cf27622a 100644
--- a/docs/hazmat/primitives/index.rst
+++ b/docs/hazmat/primitives/index.rst
@@ -11,6 +11,7 @@ Primitives
symmetric-encryption
padding
key-derivation-functions
+ keywrap
asymmetric/index
constant-time
interfaces
diff --git a/docs/hazmat/primitives/keywrap.rst b/docs/hazmat/primitives/keywrap.rst
new file mode 100644
index 00000000..e4f9ffeb
--- /dev/null
+++ b/docs/hazmat/primitives/keywrap.rst
@@ -0,0 +1,59 @@
+.. hazmat::
+
+.. module:: cryptography.hazmat.primitives.keywrap
+
+Key wrapping
+============
+
+Key wrapping is a cryptographic construct that uses symmetric encryption to
+encapsulate key material. Key wrapping algorithms are occasionally utilized
+to protect keys at rest or transmit them over insecure networks. Many of the
+protections offered by key wrapping are also offered by using authenticated
+:doc:`symmetric encryption </hazmat/primitives/symmetric-encryption>`.
+
+.. function:: aes_key_wrap(wrapping_key, key_to_wrap, backend)
+
+ .. versionadded:: 1.1
+
+ This function performs AES key wrap (without padding) as specified in
+ :rfc:`3394`.
+
+ :param bytes wrapping_key: The wrapping key.
+
+ :param bytes key_to_wrap: The key to wrap.
+
+ :param backend: A
+ :class:`~cryptography.hazmat.backends.interfaces.CipherBackend`
+ provider that supports
+ :class:`~cryptography.hazmat.primitives.ciphers.algorithms.AES`.
+
+ :return bytes: The wrapped key as bytes.
+
+.. function:: aes_key_unwrap(wrapping_key, wrapped_key, backend)
+
+ .. versionadded:: 1.1
+
+ This function performs AES key unwrap (without padding) as specified in
+ :rfc:`3394`.
+
+ :param bytes wrapping_key: The wrapping key.
+
+ :param bytes wrapped_key: The wrapped key.
+
+ :param backend: A
+ :class:`~cryptography.hazmat.backends.interfaces.CipherBackend`
+ provider that supports
+ :class:`~cryptography.hazmat.primitives.ciphers.algorithms.AES`.
+
+ :return bytes: The unwrapped key as bytes.
+
+ :raises cryptography.hazmat.primitives.keywrap.InvalidUnwrap: This is
+ raised if the key is not successfully unwrapped.
+
+Exceptions
+~~~~~~~~~~
+
+.. class:: InvalidUnwrap
+
+ This is raised when a wrapped key fails to unwrap. It can be caused by a
+ corrupted or invalid wrapped key or an invalid wrapping key.
diff --git a/docs/x509/reference.rst b/docs/x509/reference.rst
index 97224c9f..e7e02de3 100644
--- a/docs/x509/reference.rst
+++ b/docs/x509/reference.rst
@@ -5,6 +5,21 @@ X.509 Reference
.. testsetup::
+ pem_crl_data = b"""
+ -----BEGIN X509 CRL-----
+ MIIBtDCBnQIBAjANBgkqhkiG9w0BAQsFADAnMQswCQYDVQQGEwJVUzEYMBYGA1UE
+ AwwPY3J5cHRvZ3JhcGh5LmlvGA8yMDE1MDEwMTAwMDAwMFoYDzIwMTYwMTAxMDAw
+ MDAwWjA+MDwCAQAYDzIwMTUwMTAxMDAwMDAwWjAmMBgGA1UdGAQRGA8yMDE1MDEw
+ MTAwMDAwMFowCgYDVR0VBAMKAQEwDQYJKoZIhvcNAQELBQADggEBABRA4ww50Lz5
+ zk1j2+aluC4HPHqb7o06h4pTDcCGeXUKXIGeP5ntGGmIoxa26sNoLeOr8+5b43Gf
+ yWraHertllOwaOpNFEe+YZFaE9femtoDbf+GLMvRx/0wDfd3KxPoXnXKMXb2d1w4
+ RCLgmkYx6JyvS+5ciuLQVIKC+l7jwIUeZFLJMUJ8msM4pFYoGameeZmtjMbd/TNg
+ cVBfmZxNMHuLladJxvSo2esARo0TYPhYsgrREKoHwhpzSxdynjn4bOVkILfguwsN
+ qtEEMZFEv5Kb0GqRp2+Iagv2S6dg9JGvxVdsoGjaB6EbYSZ3Psx4aODasIn11uwo
+ X4B9vUQNXqc=
+ -----END X509 CRL-----
+ """.strip()
+
pem_req_data = b"""
-----BEGIN CERTIFICATE REQUEST-----
MIIC0zCCAbsCAQAwWTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCElsbGlub2lzMRAw
@@ -129,6 +144,51 @@ Loading Certificates
>>> cert.serial
2
+Loading Certificate Revocation Lists
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. function:: load_pem_x509_crl(data, backend)
+
+ .. versionadded:: 1.1
+
+ Deserialize a certificate revocation list (CRL) from PEM encoded data. PEM
+ requests are base64 decoded and have delimiters that look like
+ ``-----BEGIN X509 CRL-----``.
+
+ :param bytes data: The PEM encoded request data.
+
+ :param backend: A backend supporting the
+ :class:`~cryptography.hazmat.backends.interfaces.X509Backend`
+ interface.
+
+ :returns: An instance of
+ :class:`~cryptography.x509.CertificateRevocationList`.
+
+.. function:: load_der_x509_crl(data, backend)
+
+ .. versionadded:: 1.1
+
+ Deserialize a certificate revocation list (CRL) from DER encoded data. DER
+ is a binary format.
+
+ :param bytes data: The DER encoded request data.
+
+ :param backend: A backend supporting the
+ :class:`~cryptography.hazmat.backends.interfaces.X509Backend`
+ interface.
+
+ :returns: An instance of
+ :class:`~cryptography.x509.CertificateRevocationList`.
+
+.. doctest::
+
+ >>> from cryptography import x509
+ >>> from cryptography.hazmat.backends import default_backend
+ >>> from cryptography.hazmat.primitives import hashes
+ >>> crl = x509.load_pem_x509_crl(pem_crl_data, default_backend())
+ >>> isinstance(crl.signature_hash_algorithm, hashes.SHA256)
+ True
+
Loading Certificate Signing Requests
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -340,6 +400,21 @@ X.509 CRL (Certificate Revocation List) Object
.. versionadded:: 1.0
+ A CertificateRevocationList is an object representing a list of revoked
+ certificates. The object is iterable and will yield the RevokedCertificate
+ objects stored in this CRL.
+
+ .. doctest::
+
+ >>> len(crl)
+ 1
+ >>> revoked_certificate = crl[0]
+ >>> type(revoked_certificate)
+ <class 'cryptography.hazmat.backends.openssl.x509._RevokedCertificate'>
+ >>> for r in crl:
+ ... print(r.serial_number)
+ 0
+
.. method:: fingerprint(algorithm)
:param algorithm: The
@@ -349,6 +424,12 @@ X.509 CRL (Certificate Revocation List) Object
:return bytes: The fingerprint using the supplied hash algorithm, as
bytes.
+ .. doctest::
+
+ >>> from cryptography.hazmat.primitives import hashes
+ >>> crl.fingerprint(hashes.SHA256())
+ 'e\xcf.\xc4:\x83?1\xdc\xf3\xfc\x95\xd7\xb3\x87\xb3\x8e\xf8\xb93!\x87\x07\x9d\x1b\xb4!\xb9\xe4W\xf4\x1f'
+
.. attribute:: signature_hash_algorithm
:type: :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm`
@@ -357,12 +438,23 @@ X.509 CRL (Certificate Revocation List) Object
:class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` which
was used in signing this CRL.
+ .. doctest::
+
+ >>> from cryptography.hazmat.primitives import hashes
+ >>> isinstance(crl.signature_hash_algorithm, hashes.SHA256)
+ True
+
.. attribute:: issuer
:type: :class:`Name`
The :class:`Name` of the issuer.
+ .. doctest::
+
+ >>> crl.issuer
+ <Name([<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>, <NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'cryptography.io')>])>
+
.. attribute:: next_update
:type: :class:`datetime.datetime`
@@ -370,17 +462,21 @@ X.509 CRL (Certificate Revocation List) Object
A naïve datetime representing when the next update to this CRL is
expected.
+ .. doctest::
+
+ >>> crl.next_update
+ datetime.datetime(2016, 1, 1, 0, 0)
+
.. attribute:: last_update
:type: :class:`datetime.datetime`
A naïve datetime representing when the this CRL was last updated.
- .. attribute:: revoked_certificates
-
- :type: list of :class:`RevokedCertificate`
+ .. doctest::
- The revoked certificates listed in this CRL.
+ >>> crl.last_update
+ datetime.datetime(2015, 1, 1, 0, 0)
.. attribute:: extensions
@@ -611,18 +707,35 @@ X.509 Revoked Certificate Object
An integer representing the serial number of the revoked certificate.
+ .. doctest::
+
+ >>> revoked_certificate.serial_number
+ 0
+
.. attribute:: revocation_date
:type: :class:`datetime.datetime`
A naïve datetime representing the date this certificates was revoked.
+ .. doctest::
+
+ >>> revoked_certificate.revocation_date
+ datetime.datetime(2015, 1, 1, 0, 0)
+
.. attribute:: extensions
:type: :class:`Extensions`
The extensions encoded in the revoked certificate.
+ .. doctest::
+
+ >>> for ext in revoked_certificate.extensions:
+ ... print(ext)
+ <Extension(oid=<ObjectIdentifier(oid=2.5.29.24, name=invalidityDate)>, critical=False, value=2015-01-01 00:00:00)>
+ <Extension(oid=<ObjectIdentifier(oid=2.5.29.21, name=cRLReason)>, critical=False, value=ReasonFlags.key_compromise)>
+
X.509 CSR (Certificate Signing Request) Builder Object
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/src/_cffi_src/openssl/asn1.py b/src/_cffi_src/openssl/asn1.py
index bbbffd8f..259adf19 100644
--- a/src/_cffi_src/openssl/asn1.py
+++ b/src/_cffi_src/openssl/asn1.py
@@ -9,24 +9,7 @@ INCLUDES = """
"""
TYPES = """
-/*
- * TODO: This typedef is wrong.
- *
- * This is due to limitations of cffi.
- * See https://bitbucket.org/cffi/cffi/issue/69
- *
- * For another possible work-around (not used here because it involves more
- * complicated use of the cffi API which falls outside the general pattern used
- * by this package), see
- * http://paste.pound-python.org/show/iJcTUMkKeBeS6yXpZWUU/
- *
- * The work-around used here is to just be sure to declare a type that is at
- * least as large as the real type. Maciej explains:
- *
- * <fijal> I think you want to declare your value too large (e.g. long)
- * <fijal> that way you'll never pass garbage
- */
-typedef intptr_t time_t;
+typedef int... time_t;
typedef int ASN1_BOOLEAN;
typedef ... ASN1_INTEGER;
diff --git a/src/_cffi_src/openssl/bignum.py b/src/_cffi_src/openssl/bignum.py
index 843e5119..ae035007 100644
--- a/src/_cffi_src/openssl/bignum.py
+++ b/src/_cffi_src/openssl/bignum.py
@@ -11,24 +11,7 @@ INCLUDES = """
TYPES = """
typedef ... BN_CTX;
typedef ... BIGNUM;
-/*
- * TODO: This typedef is wrong.
- *
- * This is due to limitations of cffi.
- * See https://bitbucket.org/cffi/cffi/issue/69
- *
- * For another possible work-around (not used here because it involves more
- * complicated use of the cffi API which falls outside the general pattern used
- * by this package), see
- * http://paste.pound-python.org/show/iJcTUMkKeBeS6yXpZWUU/
- *
- * The work-around used here is to just be sure to declare a type that is at
- * least as large as the real type. Maciej explains:
- *
- * <fijal> I think you want to declare your value too large (e.g. long)
- * <fijal> that way you'll never pass garbage
- */
-typedef uintptr_t BN_ULONG;
+typedef int... BN_ULONG;
"""
FUNCTIONS = """
diff --git a/src/_cffi_src/openssl/engine.py b/src/_cffi_src/openssl/engine.py
index 011f6692..60c6f3e2 100644
--- a/src/_cffi_src/openssl/engine.py
+++ b/src/_cffi_src/openssl/engine.py
@@ -44,6 +44,8 @@ static const unsigned int ENGINE_METHOD_DIGESTS;
static const unsigned int ENGINE_METHOD_STORE;
static const unsigned int ENGINE_METHOD_ALL;
static const unsigned int ENGINE_METHOD_NONE;
+
+static const int ENGINE_R_CONFLICTING_ENGINE_ID;
"""
FUNCTIONS = """
diff --git a/src/cryptography/hazmat/backends/multibackend.py b/src/cryptography/hazmat/backends/multibackend.py
index c4d2c133..bbaaf424 100644
--- a/src/cryptography/hazmat/backends/multibackend.py
+++ b/src/cryptography/hazmat/backends/multibackend.py
@@ -331,6 +331,24 @@ class MultiBackend(object):
_Reasons.UNSUPPORTED_X509
)
+ def load_pem_x509_crl(self, data):
+ for b in self._filtered_backends(X509Backend):
+ return b.load_pem_x509_crl(data)
+
+ raise UnsupportedAlgorithm(
+ "This backend does not support X.509.",
+ _Reasons.UNSUPPORTED_X509
+ )
+
+ def load_der_x509_crl(self, data):
+ for b in self._filtered_backends(X509Backend):
+ return b.load_der_x509_crl(data)
+
+ raise UnsupportedAlgorithm(
+ "This backend does not support X.509.",
+ _Reasons.UNSUPPORTED_X509
+ )
+
def load_der_x509_csr(self, data):
for b in self._filtered_backends(X509Backend):
return b.load_der_x509_csr(data)
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index 775430d4..8e302a99 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -37,8 +37,8 @@ from cryptography.hazmat.backends.openssl.rsa import (
_RSAPrivateKey, _RSAPublicKey
)
from cryptography.hazmat.backends.openssl.x509 import (
- _Certificate, _CertificateSigningRequest, _DISTPOINT_TYPE_FULLNAME,
- _DISTPOINT_TYPE_RELATIVENAME
+ _Certificate, _CertificateRevocationList, _CertificateSigningRequest,
+ _DISTPOINT_TYPE_FULLNAME, _DISTPOINT_TYPE_RELATIVENAME
)
from cryptography.hazmat.bindings.openssl import binding
from cryptography.hazmat.primitives import hashes, serialization
@@ -94,6 +94,20 @@ def _encode_asn1_str(backend, data, length):
return s
+def _encode_asn1_utf8_str(backend, string):
+ """
+ Create an ASN1_UTF8STRING from a Python unicode string.
+ This object will be a ASN1_STRING with UTF8 type in OpenSSL and
+ can be decoded with ASN1_STRING_to_UTF8.
+ """
+ s = backend._lib.ASN1_UTF8STRING_new()
+ res = backend._lib.ASN1_STRING_set(
+ s, string.encode("utf8"), len(string.encode("utf8"))
+ )
+ backend.openssl_assert(res == 1)
+ return s
+
+
def _encode_asn1_str_gc(backend, data, length):
s = _encode_asn1_str(backend, data, length)
s = backend._ffi.gc(s, backend._lib.ASN1_OCTET_STRING_free)
@@ -138,6 +152,81 @@ def _encode_name_gc(backend, attributes):
return subject
+def _encode_certificate_policies(backend, certificate_policies):
+ cp = backend._lib.sk_POLICYINFO_new_null()
+ backend.openssl_assert(cp != backend._ffi.NULL)
+ cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
+ for policy_info in certificate_policies:
+ pi = backend._lib.POLICYINFO_new()
+ backend.openssl_assert(pi != backend._ffi.NULL)
+ res = backend._lib.sk_POLICYINFO_push(cp, pi)
+ backend.openssl_assert(res >= 1)
+ oid = _txt2obj(backend, policy_info.policy_identifier.dotted_string)
+ pi.policyid = oid
+ if policy_info.policy_qualifiers:
+ pqis = backend._lib.sk_POLICYQUALINFO_new_null()
+ backend.openssl_assert(pqis != backend._ffi.NULL)
+ for qualifier in policy_info.policy_qualifiers:
+ pqi = backend._lib.POLICYQUALINFO_new()
+ backend.openssl_assert(pqi != backend._ffi.NULL)
+ res = backend._lib.sk_POLICYQUALINFO_push(pqis, pqi)
+ backend.openssl_assert(res >= 1)
+ if isinstance(qualifier, six.text_type):
+ pqi.pqualid = _txt2obj(
+ backend, x509.OID_CPS_QUALIFIER.dotted_string
+ )
+ pqi.d.cpsuri = _encode_asn1_str(
+ backend,
+ qualifier.encode("ascii"),
+ len(qualifier.encode("ascii"))
+ )
+ else:
+ assert isinstance(qualifier, x509.UserNotice)
+ pqi.pqualid = _txt2obj(
+ backend, x509.OID_CPS_USER_NOTICE.dotted_string
+ )
+ un = backend._lib.USERNOTICE_new()
+ backend.openssl_assert(un != backend._ffi.NULL)
+ pqi.d.usernotice = un
+ if qualifier.explicit_text:
+ un.exptext = _encode_asn1_utf8_str(
+ backend, qualifier.explicit_text
+ )
+
+ un.noticeref = _encode_notice_reference(
+ backend, qualifier.notice_reference
+ )
+
+ pi.qualifiers = pqis
+
+ pp = backend._ffi.new('unsigned char **')
+ r = backend._lib.i2d_CERTIFICATEPOLICIES(cp, pp)
+ backend.openssl_assert(r > 0)
+ pp = backend._ffi.gc(
+ pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
+ )
+ return pp, r
+
+
+def _encode_notice_reference(backend, notice):
+ if notice is None:
+ return backend._ffi.NULL
+ else:
+ nr = backend._lib.NOTICEREF_new()
+ backend.openssl_assert(nr != backend._ffi.NULL)
+ # organization is a required field
+ nr.organization = _encode_asn1_utf8_str(backend, notice.organization)
+
+ notice_stack = backend._lib.sk_ASN1_INTEGER_new_null()
+ nr.noticenos = notice_stack
+ for number in notice.notice_numbers:
+ num = _encode_asn1_int(backend, number)
+ res = backend._lib.sk_ASN1_INTEGER_push(notice_stack, num)
+ backend.openssl_assert(res >= 1)
+
+ return nr
+
+
def _txt2obj(backend, name):
"""
Converts a Python string with an ASN.1 object ID in dotted form to a
@@ -489,6 +578,7 @@ _EXTENSION_ENCODE_HANDLERS = {
ExtensionOID.ISSUER_ALTERNATIVE_NAME: _encode_alt_name,
ExtensionOID.EXTENDED_KEY_USAGE: _encode_extended_key_usage,
ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _encode_authority_key_identifier,
+ ExtensionOID.CERTIFICATE_POLICIES: _encode_certificate_policies,
ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
_encode_authority_information_access
),
@@ -526,8 +616,6 @@ class Backend(object):
res = self._lib.ASN1_STRING_set_default_mask_asc(b"utf8only")
self.openssl_assert(res == 1)
- self._binding.init_static_locks()
-
self._cipher_registry = {}
self._register_default_ciphers()
self.activate_osrandom_engine()
@@ -1454,6 +1542,28 @@ class Backend(object):
x509 = self._ffi.gc(x509, self._lib.X509_free)
return _Certificate(self, x509)
+ def load_pem_x509_crl(self, data):
+ mem_bio = self._bytes_to_bio(data)
+ x509_crl = self._lib.PEM_read_bio_X509_CRL(
+ mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
+ )
+ if x509_crl == self._ffi.NULL:
+ self._consume_errors()
+ raise ValueError("Unable to load CRL")
+
+ x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
+ return _CertificateRevocationList(self, x509_crl)
+
+ def load_der_x509_crl(self, data):
+ mem_bio = self._bytes_to_bio(data)
+ x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
+ if x509_crl == self._ffi.NULL:
+ self._consume_errors()
+ raise ValueError("Unable to load CRL")
+
+ x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
+ return _CertificateRevocationList(self, x509_crl)
+
def load_pem_x509_csr(self, data):
mem_bio = self._bytes_to_bio(data)
x509_req = self._lib.PEM_read_bio_X509_REQ(
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
index 2de5a8c7..1ba59b68 100644
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -4,7 +4,9 @@
from __future__ import absolute_import, division, print_function
+import datetime
import ipaddress
+
from email.utils import parseaddr
import idna
@@ -16,7 +18,9 @@ from six.moves import urllib_parse
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import hashes, serialization
-from cryptography.x509.oid import CertificatePoliciesOID, ExtensionOID
+from cryptography.x509.oid import (
+ CRLExtensionOID, CertificatePoliciesOID, ExtensionOID
+)
def _obj2txt(backend, obj):
@@ -176,10 +180,11 @@ def _decode_ocsp_no_check(backend, ext):
class _X509ExtensionParser(object):
- def __init__(self, ext_count, get_ext, handlers):
+ def __init__(self, ext_count, get_ext, handlers, unsupported_exts=None):
self.ext_count = ext_count
self.get_ext = get_ext
self.handlers = handlers
+ self.unsupported_exts = unsupported_exts
def parse(self, backend, x509_obj):
extensions = []
@@ -199,18 +204,25 @@ class _X509ExtensionParser(object):
except KeyError:
if critical:
raise x509.UnsupportedExtension(
- "{0} is not currently supported".format(oid), oid
+ "Critical extension {0} is not currently supported"
+ .format(oid), oid
)
else:
- d2i = backend._lib.X509V3_EXT_d2i(ext)
- if d2i == backend._ffi.NULL:
- backend._consume_errors()
- raise ValueError(
- "The {0} extension is invalid and can't be "
- "parsed".format(oid)
- )
-
- value = handler(backend, d2i)
+ # For extensions which are not supported by OpenSSL we pass the
+ # extension object directly to the parsing routine so it can
+ # be decoded manually.
+ if self.unsupported_exts and oid in self.unsupported_exts:
+ ext_data = ext
+ else:
+ ext_data = backend._lib.X509V3_EXT_d2i(ext)
+ if ext_data == backend._ffi.NULL:
+ backend._consume_errors()
+ raise ValueError(
+ "The {0} extension is invalid and can't be "
+ "parsed".format(oid)
+ )
+
+ value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
@@ -646,6 +658,178 @@ def _decode_inhibit_any_policy(backend, asn1_int):
return x509.InhibitAnyPolicy(skip_certs)
+_CRL_REASON_CODE_TO_ENUM = {
+ 0: x509.ReasonFlags.unspecified,
+ 1: x509.ReasonFlags.key_compromise,
+ 2: x509.ReasonFlags.ca_compromise,
+ 3: x509.ReasonFlags.affiliation_changed,
+ 4: x509.ReasonFlags.superseded,
+ 5: x509.ReasonFlags.cessation_of_operation,
+ 6: x509.ReasonFlags.certificate_hold,
+ 8: x509.ReasonFlags.remove_from_crl,
+ 9: x509.ReasonFlags.privilege_withdrawn,
+ 10: x509.ReasonFlags.aa_compromise,
+}
+
+
+def _decode_crl_reason(backend, enum):
+ enum = backend._ffi.cast("ASN1_ENUMERATED *", enum)
+ enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free)
+ code = backend._lib.ASN1_ENUMERATED_get(enum)
+
+ try:
+ return _CRL_REASON_CODE_TO_ENUM[code]
+ except KeyError:
+ raise ValueError("Unsupported reason code: {0}".format(code))
+
+
+def _decode_invalidity_date(backend, inv_date):
+ generalized_time = backend._ffi.cast(
+ "ASN1_GENERALIZEDTIME *", inv_date
+ )
+ generalized_time = backend._ffi.gc(
+ generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
+ )
+ time = backend._ffi.string(
+ backend._lib.ASN1_STRING_data(
+ backend._ffi.cast("ASN1_STRING *", generalized_time)
+ )
+ ).decode("ascii")
+ return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
+
+
+def _decode_cert_issuer(backend, ext):
+ """
+ This handler decodes the CertificateIssuer entry extension directly
+ from the X509_EXTENSION object. This is necessary because this entry
+ extension is not directly supported by OpenSSL 0.9.8.
+ """
+
+ data_ptr_ptr = backend._ffi.new("const unsigned char **")
+ data_ptr_ptr[0] = ext.value.data
+ gns = backend._lib.d2i_GENERAL_NAMES(
+ backend._ffi.NULL, data_ptr_ptr, ext.value.length
+ )
+
+ # Check the result of d2i_GENERAL_NAMES() is valid. Usually this is covered
+ # in _X509ExtensionParser but since we are responsible for decoding this
+ # entry extension ourselves, we have to this here.
+ if gns == backend._ffi.NULL:
+ backend._consume_errors()
+ raise ValueError(
+ "The {0} extension is corrupted and can't be parsed".format(
+ CRLExtensionOID.CERTIFICATE_ISSUER))
+
+ gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
+ return x509.GeneralNames(_decode_general_names(backend, gns))
+
+
+@utils.register_interface(x509.RevokedCertificate)
+class _RevokedCertificate(object):
+ def __init__(self, backend, x509_revoked):
+ self._backend = backend
+ self._x509_revoked = x509_revoked
+
+ @property
+ def serial_number(self):
+ asn1_int = self._x509_revoked.serialNumber
+ self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
+ return self._backend._asn1_integer_to_int(asn1_int)
+
+ @property
+ def revocation_date(self):
+ return self._backend._parse_asn1_time(
+ self._x509_revoked.revocationDate)
+
+ @property
+ def extensions(self):
+ return _REVOKED_CERTIFICATE_EXTENSION_PARSER.parse(
+ self._backend, self._x509_revoked
+ )
+
+
+@utils.register_interface(x509.CertificateRevocationList)
+class _CertificateRevocationList(object):
+ def __init__(self, backend, x509_crl):
+ self._backend = backend
+ self._x509_crl = x509_crl
+
+ def __eq__(self, other):
+ if not isinstance(other, x509.CertificateRevocationList):
+ return NotImplemented
+
+ res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
+ return res == 0
+
+ def __ne__(self, other):
+ return not self == other
+
+ def fingerprint(self, algorithm):
+ h = hashes.Hash(algorithm, self._backend)
+ bio = self._backend._create_mem_bio()
+ res = self._backend._lib.i2d_X509_CRL_bio(
+ bio, self._x509_crl
+ )
+ self._backend.openssl_assert(res == 1)
+ der = self._backend._read_mem_bio(bio)
+ h.update(der)
+ return h.finalize()
+
+ @property
+ def signature_hash_algorithm(self):
+ oid = _obj2txt(self._backend, self._x509_crl.sig_alg.algorithm)
+ try:
+ return x509._SIG_OIDS_TO_HASH[oid]
+ except KeyError:
+ raise UnsupportedAlgorithm(
+ "Signature algorithm OID:{0} not recognized".format(oid)
+ )
+
+ @property
+ def issuer(self):
+ issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
+ self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
+ return _decode_x509_name(self._backend, issuer)
+
+ @property
+ def next_update(self):
+ nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
+ self._backend.openssl_assert(nu != self._backend._ffi.NULL)
+ return self._backend._parse_asn1_time(nu)
+
+ @property
+ def last_update(self):
+ lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
+ self._backend.openssl_assert(lu != self._backend._ffi.NULL)
+ return self._backend._parse_asn1_time(lu)
+
+ def _revoked_certificates(self):
+ revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
+ self._backend.openssl_assert(revoked != self._backend._ffi.NULL)
+
+ num = self._backend._lib.sk_X509_REVOKED_num(revoked)
+ revoked_list = []
+ for i in range(num):
+ r = self._backend._lib.sk_X509_REVOKED_value(revoked, i)
+ self._backend.openssl_assert(r != self._backend._ffi.NULL)
+ revoked_list.append(_RevokedCertificate(self._backend, r))
+
+ return revoked_list
+
+ def __iter__(self):
+ return iter(self._revoked_certificates())
+
+ def __getitem__(self, idx):
+ return self._revoked_certificates()[idx]
+
+ def __len__(self):
+ return len(self._revoked_certificates())
+
+ @property
+ def extensions(self):
+ raise NotImplementedError()
+
+
@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
def __init__(self, backend, x509_req):
@@ -726,6 +910,15 @@ _EXTENSION_HANDLERS = {
ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
}
+_REVOKED_EXTENSION_HANDLERS = {
+ CRLExtensionOID.CRL_REASON: _decode_crl_reason,
+ CRLExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
+ CRLExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
+}
+
+_REVOKED_UNSUPPORTED_EXTENSIONS = set([
+ CRLExtensionOID.CERTIFICATE_ISSUER,
+])
_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
@@ -738,3 +931,10 @@ _CSR_EXTENSION_PARSER = _X509ExtensionParser(
get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i),
handlers=_EXTENSION_HANDLERS
)
+
+_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
+ ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x),
+ get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i),
+ handlers=_REVOKED_EXTENSION_HANDLERS,
+ unsupported_exts=_REVOKED_UNSUPPORTED_EXTENSIONS
+)
diff --git a/src/cryptography/hazmat/bindings/openssl/binding.py b/src/cryptography/hazmat/bindings/openssl/binding.py
index 47b1d6e2..a750cd6b 100644
--- a/src/cryptography/hazmat/bindings/openssl/binding.py
+++ b/src/cryptography/hazmat/bindings/openssl/binding.py
@@ -97,11 +97,6 @@ class Binding(object):
@classmethod
def _register_osrandom_engine(cls):
_openssl_assert(cls.lib, cls.lib.ERR_peek_error() == 0)
- looked_up_engine = cls.lib.ENGINE_by_id(cls._osrandom_engine_id)
- if looked_up_engine != ffi.NULL:
- raise RuntimeError("osrandom engine already registered")
-
- cls.lib.ERR_clear_error()
engine = cls.lib.ENGINE_new()
_openssl_assert(cls.lib, engine != cls.ffi.NULL)
@@ -113,7 +108,13 @@ class Binding(object):
result = cls.lib.ENGINE_set_RAND(engine, cls._osrandom_method)
_openssl_assert(cls.lib, result == 1)
result = cls.lib.ENGINE_add(engine)
- _openssl_assert(cls.lib, result == 1)
+ if result != 1:
+ errors = _consume_errors(cls.lib)
+ _openssl_assert(
+ cls.lib,
+ errors[0].reason == cls.lib.ENGINE_R_CONFLICTING_ENGINE_ID
+ )
+
finally:
result = cls.lib.ENGINE_free(engine)
_openssl_assert(cls.lib, result == 1)
@@ -171,3 +172,11 @@ class Binding(object):
mode, n, file, line
)
)
+
+
+# OpenSSL is not thread safe until the locks are initialized. We call this
+# method in module scope so that it executes with the import lock. On
+# Pythons < 3.4 this import lock is a global lock, which can prevent a race
+# condition registering the OpenSSL locks. On Python 3.4+ the import lock
+# is per module so this approach will not work.
+Binding.init_static_locks()
diff --git a/src/cryptography/hazmat/primitives/keywrap.py b/src/cryptography/hazmat/primitives/keywrap.py
new file mode 100644
index 00000000..6e79ab6b
--- /dev/null
+++ b/src/cryptography/hazmat/primitives/keywrap.py
@@ -0,0 +1,85 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import struct
+
+from cryptography.hazmat.primitives.ciphers import Cipher
+from cryptography.hazmat.primitives.ciphers.algorithms import AES
+from cryptography.hazmat.primitives.ciphers.modes import ECB
+from cryptography.hazmat.primitives.constant_time import bytes_eq
+
+
+def aes_key_wrap(wrapping_key, key_to_wrap, backend):
+ if len(wrapping_key) not in [16, 24, 32]:
+ raise ValueError("The wrapping key must be a valid AES key length")
+
+ if len(key_to_wrap) < 16:
+ raise ValueError("The key to wrap must be at least 16 bytes")
+
+ if len(key_to_wrap) % 8 != 0:
+ raise ValueError("The key to wrap must be a multiple of 8 bytes")
+
+ # RFC 3394 Key Wrap - 2.2.1 (index method)
+ encryptor = Cipher(AES(wrapping_key), ECB(), backend).encryptor()
+ a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
+ r = [key_to_wrap[i:i + 8] for i in range(0, len(key_to_wrap), 8)]
+ n = len(r)
+ for j in range(6):
+ for i in range(n):
+ # every encryption operation is a discrete 16 byte chunk (because
+ # AES has a 128-bit block size) and since we're using ECB it is
+ # safe to reuse the encryptor for the entire operation
+ b = encryptor.update(a + r[i])
+ # pack/unpack are safe as these are always 64-bit chunks
+ a = struct.pack(
+ ">Q", struct.unpack(">Q", b[:8])[0] ^ ((n * j) + i + 1)
+ )
+ r[i] = b[-8:]
+
+ assert encryptor.finalize() == b""
+
+ return a + b"".join(r)
+
+
+def aes_key_unwrap(wrapping_key, wrapped_key, backend):
+ if len(wrapped_key) < 24:
+ raise ValueError("Must be at least 24 bytes")
+
+ if len(wrapped_key) % 8 != 0:
+ raise ValueError("The wrapped key must be a multiple of 8 bytes")
+
+ if len(wrapping_key) not in [16, 24, 32]:
+ raise ValueError("The wrapping key must be a valid AES key length")
+
+ # Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
+ decryptor = Cipher(AES(wrapping_key), ECB(), backend).decryptor()
+ aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
+
+ r = [wrapped_key[i:i + 8] for i in range(0, len(wrapped_key), 8)]
+ a = r.pop(0)
+ n = len(r)
+ for j in reversed(range(6)):
+ for i in reversed(range(n)):
+ # pack/unpack are safe as these are always 64-bit chunks
+ atr = struct.pack(
+ ">Q", struct.unpack(">Q", a)[0] ^ ((n * j) + i + 1)
+ ) + r[i]
+ # every decryption operation is a discrete 16 byte chunk so
+ # it is safe to reuse the decryptor for the entire operation
+ b = decryptor.update(atr)
+ a = b[:8]
+ r[i] = b[-8:]
+
+ assert decryptor.finalize() == b""
+
+ if not bytes_eq(a, aiv):
+ raise InvalidUnwrap()
+
+ return b"".join(r)
+
+
+class InvalidUnwrap(Exception):
+ pass
diff --git a/src/cryptography/x509/__init__.py b/src/cryptography/x509/__init__.py
index 1aa2598b..70e1d3da 100644
--- a/src/cryptography/x509/__init__.py
+++ b/src/cryptography/x509/__init__.py
@@ -8,8 +8,8 @@ from cryptography.x509.base import (
Certificate, CertificateBuilder, CertificateRevocationList,
CertificateSigningRequest, CertificateSigningRequestBuilder,
InvalidVersion, RevokedCertificate,
- Version, load_der_x509_certificate, load_der_x509_csr,
- load_pem_x509_certificate, load_pem_x509_csr,
+ Version, load_der_x509_certificate, load_der_x509_crl, load_der_x509_csr,
+ load_pem_x509_certificate, load_pem_x509_crl, load_pem_x509_csr,
)
from cryptography.x509.extensions import (
AccessDescription, AuthorityInformationAccess,
@@ -108,6 +108,8 @@ __all__ = [
"load_der_x509_certificate",
"load_pem_x509_csr",
"load_der_x509_csr",
+ "load_pem_x509_crl",
+ "load_der_x509_crl",
"InvalidVersion",
"DuplicateExtension",
"UnsupportedExtension",
diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py
index 27eafac6..01eadfcb 100644
--- a/src/cryptography/x509/base.py
+++ b/src/cryptography/x509/base.py
@@ -40,6 +40,14 @@ def load_der_x509_csr(data, backend):
return backend.load_der_x509_csr(data)
+def load_pem_x509_crl(data, backend):
+ return backend.load_pem_x509_crl(data)
+
+
+def load_der_x509_crl(data, backend):
+ return backend.load_der_x509_crl(data)
+
+
class InvalidVersion(Exception):
def __init__(self, msg, parsed_version):
super(InvalidVersion, self).__init__(msg)
@@ -169,12 +177,6 @@ class CertificateRevocationList(object):
"""
@abc.abstractproperty
- def revoked_certificates(self):
- """
- Returns a list of RevokedCertificate objects for this CRL.
- """
-
- @abc.abstractproperty
def extensions(self):
"""
Returns an Extensions object containing a list of CRL extensions.
diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py
index cd75ecdc..46ba5a28 100644
--- a/src/cryptography/x509/extensions.py
+++ b/src/cryptography/x509/extensions.py
@@ -104,6 +104,11 @@ class Extensions(object):
def __len__(self):
return len(self._extensions)
+ def __repr__(self):
+ return (
+ "<Extensions({0})>".format(self._extensions)
+ )
+
@utils.register_interface(ExtensionType)
class AuthorityKeyIdentifier(object):
diff --git a/src/cryptography/x509/name.py b/src/cryptography/x509/name.py
index 992786ef..9d93ece1 100644
--- a/src/cryptography/x509/name.py
+++ b/src/cryptography/x509/name.py
@@ -40,6 +40,9 @@ class NameAttribute(object):
def __ne__(self, other):
return not self == other
+ def __hash__(self):
+ return hash((self.oid, self.value))
+
def __repr__(self):
return "<NameAttribute(oid={0.oid}, value={0.value!r})>".format(self)
@@ -60,6 +63,11 @@ class Name(object):
def __ne__(self, other):
return not self == other
+ def __hash__(self):
+ # TODO: this is relatively expensive, if this looks like a bottleneck
+ # for you, consider optimizing!
+ return hash(tuple(self._attributes))
+
def __iter__(self):
return iter(self._attributes)
diff --git a/tests/hazmat/backends/test_multibackend.py b/tests/hazmat/backends/test_multibackend.py
index 2a533750..81a64ce0 100644
--- a/tests/hazmat/backends/test_multibackend.py
+++ b/tests/hazmat/backends/test_multibackend.py
@@ -200,6 +200,12 @@ class DummyX509Backend(object):
def load_der_x509_certificate(self, data):
pass
+ def load_pem_x509_crl(self, data):
+ pass
+
+ def load_der_x509_crl(self, data):
+ pass
+
def load_pem_x509_csr(self, data):
pass
@@ -502,6 +508,8 @@ class TestMultiBackend(object):
backend.load_pem_x509_certificate(b"certdata")
backend.load_der_x509_certificate(b"certdata")
+ backend.load_pem_x509_crl(b"crldata")
+ backend.load_der_x509_crl(b"crldata")
backend.load_pem_x509_csr(b"reqdata")
backend.load_der_x509_csr(b"reqdata")
backend.create_x509_csr(object(), b"privatekey", hashes.SHA1())
@@ -513,6 +521,10 @@ class TestMultiBackend(object):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
backend.load_der_x509_certificate(b"certdata")
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
+ backend.load_pem_x509_crl(b"crldata")
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
+ backend.load_der_x509_crl(b"crldata")
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
backend.load_pem_x509_csr(b"reqdata")
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
backend.load_der_x509_csr(b"reqdata")
diff --git a/tests/hazmat/bindings/test_openssl.py b/tests/hazmat/bindings/test_openssl.py
index 20171fa7..76a9218b 100644
--- a/tests/hazmat/bindings/test_openssl.py
+++ b/tests/hazmat/bindings/test_openssl.py
@@ -89,8 +89,8 @@ class TestOpenSSL(object):
def test_add_engine_more_than_once(self):
b = Binding()
- with pytest.raises(RuntimeError):
- b._register_osrandom_engine()
+ b._register_osrandom_engine()
+ assert b.lib.ERR_get_error() == 0
def test_ssl_ctx_options(self):
# Test that we're properly handling 32-bit unsigned on all platforms.
diff --git a/tests/hazmat/primitives/test_keywrap.py b/tests/hazmat/primitives/test_keywrap.py
new file mode 100644
index 00000000..f1238c9a
--- /dev/null
+++ b/tests/hazmat/primitives/test_keywrap.py
@@ -0,0 +1,116 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import binascii
+import os
+
+import pytest
+
+from cryptography.hazmat.backends.interfaces import CipherBackend
+from cryptography.hazmat.primitives import keywrap
+from cryptography.hazmat.primitives.ciphers import algorithms, modes
+
+from .utils import _load_all_params
+from ...utils import load_nist_vectors
+
+
+@pytest.mark.requires_backend_interface(interface=CipherBackend)
+class TestAESKeyWrap(object):
+ @pytest.mark.parametrize(
+ "params",
+ _load_all_params(
+ os.path.join("keywrap", "kwtestvectors"),
+ ["KW_AE_128.txt", "KW_AE_192.txt", "KW_AE_256.txt"],
+ load_nist_vectors
+ )
+ )
+ @pytest.mark.supported(
+ only_if=lambda backend: backend.cipher_supported(
+ algorithms.AES("\x00" * 16), modes.ECB()
+ ),
+ skip_message="Does not support AES key wrap (RFC 3394) because AES-ECB"
+ " is unsupported",
+ )
+ def test_wrap(self, backend, params):
+ wrapping_key = binascii.unhexlify(params["k"])
+ key_to_wrap = binascii.unhexlify(params["p"])
+ wrapped_key = keywrap.aes_key_wrap(wrapping_key, key_to_wrap, backend)
+ assert params["c"] == binascii.hexlify(wrapped_key)
+
+ @pytest.mark.parametrize(
+ "params",
+ _load_all_params(
+ os.path.join("keywrap", "kwtestvectors"),
+ ["KW_AD_128.txt", "KW_AD_192.txt", "KW_AD_256.txt"],
+ load_nist_vectors
+ )
+ )
+ @pytest.mark.supported(
+ only_if=lambda backend: backend.cipher_supported(
+ algorithms.AES("\x00" * 16), modes.ECB()
+ ),
+ skip_message="Does not support AES key wrap (RFC 3394) because AES-ECB"
+ " is unsupported",
+ )
+ def test_unwrap(self, backend, params):
+ wrapping_key = binascii.unhexlify(params["k"])
+ wrapped_key = binascii.unhexlify(params["c"])
+ if params.get("fail") is True:
+ with pytest.raises(keywrap.InvalidUnwrap):
+ keywrap.aes_key_unwrap(wrapping_key, wrapped_key, backend)
+ else:
+ unwrapped_key = keywrap.aes_key_unwrap(
+ wrapping_key, wrapped_key, backend
+ )
+ assert params["p"] == binascii.hexlify(unwrapped_key)
+
+ @pytest.mark.supported(
+ only_if=lambda backend: backend.cipher_supported(
+ algorithms.AES("\x00" * 16), modes.ECB()
+ ),
+ skip_message="Does not support AES key wrap (RFC 3394) because AES-ECB"
+ " is unsupported",
+ )
+ def test_wrap_invalid_key_length(self, backend):
+ # The wrapping key must be of length [16, 24, 32]
+ with pytest.raises(ValueError):
+ keywrap.aes_key_wrap(b"badkey", b"sixteen_byte_key", backend)
+
+ @pytest.mark.supported(
+ only_if=lambda backend: backend.cipher_supported(
+ algorithms.AES("\x00" * 16), modes.ECB()
+ ),
+ skip_message="Does not support AES key wrap (RFC 3394) because AES-ECB"
+ " is unsupported",
+ )
+ def test_unwrap_invalid_key_length(self, backend):
+ with pytest.raises(ValueError):
+ keywrap.aes_key_unwrap(b"badkey", b"\x00" * 24, backend)
+
+ @pytest.mark.supported(
+ only_if=lambda backend: backend.cipher_supported(
+ algorithms.AES("\x00" * 16), modes.ECB()
+ ),
+ skip_message="Does not support AES key wrap (RFC 3394) because AES-ECB"
+ " is unsupported",
+ )
+ def test_wrap_invalid_key_to_wrap_length(self, backend):
+ # Keys to wrap must be at least 16 bytes long
+ with pytest.raises(ValueError):
+ keywrap.aes_key_wrap(b"sixteen_byte_key", b"\x00" * 15, backend)
+
+ # Keys to wrap must be a multiple of 8 bytes
+ with pytest.raises(ValueError):
+ keywrap.aes_key_wrap(b"sixteen_byte_key", b"\x00" * 23, backend)
+
+ def test_unwrap_invalid_wrapped_key_length(self, backend):
+ # Keys to unwrap must be at least 24 bytes
+ with pytest.raises(ValueError):
+ keywrap.aes_key_unwrap(b"sixteen_byte_key", b"\x00" * 16, backend)
+
+ # Keys to unwrap must be a multiple of 8 bytes
+ with pytest.raises(ValueError):
+ keywrap.aes_key_unwrap(b"sixteen_byte_key", b"\x00" * 27, backend)
diff --git a/tests/test_x509.py b/tests/test_x509.py
index 43eca472..4072ef15 100644
--- a/tests/test_x509.py
+++ b/tests/test_x509.py
@@ -56,6 +56,255 @@ def _load_cert(filename, loader, backend):
return cert
+@pytest.mark.requires_backend_interface(interface=X509Backend)
+class TestCertificateRevocationList(object):
+ def test_load_pem_crl(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ assert isinstance(crl, x509.CertificateRevocationList)
+ fingerprint = binascii.hexlify(crl.fingerprint(hashes.SHA1()))
+ assert fingerprint == b"3234b0cb4c0cedf6423724b736729dcfc9e441ef"
+ assert isinstance(crl.signature_hash_algorithm, hashes.SHA256)
+
+ def test_load_der_crl(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "PKITS_data", "crls", "GoodCACRL.crl"),
+ x509.load_der_x509_crl,
+ backend
+ )
+
+ assert isinstance(crl, x509.CertificateRevocationList)
+ fingerprint = binascii.hexlify(crl.fingerprint(hashes.SHA1()))
+ assert fingerprint == b"dd3db63c50f4c4a13e090f14053227cb1011a5ad"
+ assert isinstance(crl.signature_hash_algorithm, hashes.SHA256)
+
+ def test_invalid_pem(self, backend):
+ with pytest.raises(ValueError):
+ x509.load_pem_x509_crl(b"notacrl", backend)
+
+ def test_invalid_der(self, backend):
+ with pytest.raises(ValueError):
+ x509.load_der_x509_crl(b"notacrl", backend)
+
+ def test_unknown_signature_algorithm(self, backend):
+ crl = _load_cert(
+ os.path.join(
+ "x509", "custom", "crl_md2_unknown_crit_entry_ext.pem"
+ ),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ with pytest.raises(UnsupportedAlgorithm):
+ crl.signature_hash_algorithm()
+
+ def test_issuer(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "PKITS_data", "crls", "GoodCACRL.crl"),
+ x509.load_der_x509_crl,
+ backend
+ )
+
+ assert isinstance(crl.issuer, x509.Name)
+ assert list(crl.issuer) == [
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, u'US'),
+ x509.NameAttribute(
+ x509.OID_ORGANIZATION_NAME, u'Test Certificates 2011'
+ ),
+ x509.NameAttribute(x509.OID_COMMON_NAME, u'Good CA')
+ ]
+ assert crl.issuer.get_attributes_for_oid(x509.OID_COMMON_NAME) == [
+ x509.NameAttribute(x509.OID_COMMON_NAME, u'Good CA')
+ ]
+
+ def test_equality(self, backend):
+ crl1 = _load_cert(
+ os.path.join("x509", "PKITS_data", "crls", "GoodCACRL.crl"),
+ x509.load_der_x509_crl,
+ backend
+ )
+
+ crl2 = _load_cert(
+ os.path.join("x509", "PKITS_data", "crls", "GoodCACRL.crl"),
+ x509.load_der_x509_crl,
+ backend
+ )
+
+ crl3 = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ assert crl1 == crl2
+ assert crl1 != crl3
+ assert crl1 != object()
+
+ def test_update_dates(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ assert isinstance(crl.next_update, datetime.datetime)
+ assert isinstance(crl.last_update, datetime.datetime)
+
+ assert crl.next_update.isoformat() == "2016-01-01T00:00:00"
+ assert crl.last_update.isoformat() == "2015-01-01T00:00:00"
+
+ def test_revoked_cert_retrieval(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ for r in crl:
+ assert isinstance(r, x509.RevokedCertificate)
+
+ # Check that len() works for CRLs.
+ assert len(crl) == 12
+
+ def test_extensions(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ # CRL extensions are currently not supported in the OpenSSL backend.
+ with pytest.raises(NotImplementedError):
+ crl.extensions
+
+
+@pytest.mark.requires_backend_interface(interface=X509Backend)
+class TestRevokedCertificate(object):
+
+ def test_revoked_basics(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ for i, rev in enumerate(crl):
+ assert isinstance(rev, x509.RevokedCertificate)
+ assert isinstance(rev.serial_number, int)
+ assert isinstance(rev.revocation_date, datetime.datetime)
+ assert isinstance(rev.extensions, x509.Extensions)
+
+ assert rev.serial_number == i
+ assert rev.revocation_date.isoformat() == "2015-01-01T00:00:00"
+
+ def test_revoked_extensions(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_all_reasons.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ exp_issuer = x509.GeneralNames([
+ x509.DirectoryName(x509.Name([
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, u"US"),
+ x509.NameAttribute(x509.OID_COMMON_NAME, u"cryptography.io"),
+ ]))
+ ])
+
+ # First revoked cert doesn't have extensions, test if it is handled
+ # correctly.
+ rev0 = crl[0]
+ # It should return an empty Extensions object.
+ assert isinstance(rev0.extensions, x509.Extensions)
+ assert len(rev0.extensions) == 0
+ with pytest.raises(x509.ExtensionNotFound):
+ rev0.extensions.get_extension_for_oid(x509.OID_CRL_REASON)
+ with pytest.raises(x509.ExtensionNotFound):
+ rev0.extensions.get_extension_for_oid(x509.OID_CERTIFICATE_ISSUER)
+ with pytest.raises(x509.ExtensionNotFound):
+ rev0.extensions.get_extension_for_oid(x509.OID_INVALIDITY_DATE)
+
+ # Test manual retrieval of extension values.
+ rev1 = crl[1]
+ assert isinstance(rev1.extensions, x509.Extensions)
+
+ reason = rev1.extensions.get_extension_for_oid(
+ x509.OID_CRL_REASON).value
+ assert reason == x509.ReasonFlags.unspecified
+
+ issuer = rev1.extensions.get_extension_for_oid(
+ x509.OID_CERTIFICATE_ISSUER).value
+ assert issuer == exp_issuer
+
+ date = rev1.extensions.get_extension_for_oid(
+ x509.OID_INVALIDITY_DATE).value
+ assert isinstance(date, datetime.datetime)
+ assert date.isoformat() == "2015-01-01T00:00:00"
+
+ # Check if all reason flags can be found in the CRL.
+ flags = set(x509.ReasonFlags)
+ for rev in crl:
+ try:
+ r = rev.extensions.get_extension_for_oid(x509.OID_CRL_REASON)
+ except x509.ExtensionNotFound:
+ # Not all revoked certs have a reason extension.
+ pass
+ else:
+ flags.discard(r.value)
+
+ assert len(flags) == 0
+
+ def test_duplicate_entry_ext(self, backend):
+ crl = _load_cert(
+ os.path.join("x509", "custom", "crl_dup_entry_ext.pem"),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ with pytest.raises(x509.DuplicateExtension):
+ crl[0].extensions
+
+ def test_unsupported_crit_entry_ext(self, backend):
+ crl = _load_cert(
+ os.path.join(
+ "x509", "custom", "crl_md2_unknown_crit_entry_ext.pem"
+ ),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ with pytest.raises(x509.UnsupportedExtension):
+ crl[0].extensions
+
+ def test_unsupported_reason(self, backend):
+ crl = _load_cert(
+ os.path.join(
+ "x509", "custom", "crl_unsupported_reason.pem"
+ ),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ with pytest.raises(ValueError):
+ crl[0].extensions
+
+ def test_invalid_cert_issuer_ext(self, backend):
+ crl = _load_cert(
+ os.path.join(
+ "x509", "custom", "crl_inval_cert_issuer_entry_ext.pem"
+ ),
+ x509.load_pem_x509_crl,
+ backend
+ )
+
+ with pytest.raises(ValueError):
+ crl[0].extensions
+
+
@pytest.mark.requires_backend_interface(interface=RSABackend)
@pytest.mark.requires_backend_interface(interface=X509Backend)
class TestRSACertificate(object):
@@ -1486,6 +1735,95 @@ class TestCertificateBuilder(object):
with pytest.raises(ValueError):
builder.sign(issuer_private_key, hashes.SHA512(), backend)
+ @pytest.mark.parametrize(
+ "cp",
+ [
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ [u"http://other.com/cps"]
+ )
+ ]),
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ None
+ )
+ ]),
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ [
+ u"http://example.com/cps",
+ u"http://other.com/cps",
+ x509.UserNotice(
+ x509.NoticeReference(u"my org", [1, 2, 3, 4]),
+ u"thing"
+ )
+ ]
+ )
+ ]),
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ [
+ u"http://example.com/cps",
+ x509.UserNotice(
+ x509.NoticeReference(u"UTF8\u2122'", [1, 2, 3, 4]),
+ u"We heart UTF8!\u2122"
+ )
+ ]
+ )
+ ]),
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ [x509.UserNotice(None, u"thing")]
+ )
+ ]),
+ x509.CertificatePolicies([
+ x509.PolicyInformation(
+ x509.ObjectIdentifier("2.16.840.1.12345.1.2.3.4.1"),
+ [
+ x509.UserNotice(
+ x509.NoticeReference(u"my org", [1, 2, 3, 4]),
+ None
+ )
+ ]
+ )
+ ])
+ ]
+ )
+ @pytest.mark.requires_backend_interface(interface=RSABackend)
+ @pytest.mark.requires_backend_interface(interface=X509Backend)
+ def test_certificate_policies(self, cp, backend):
+ issuer_private_key = RSA_KEY_2048.private_key(backend)
+ subject_private_key = RSA_KEY_2048.private_key(backend)
+
+ not_valid_before = datetime.datetime(2002, 1, 1, 12, 1)
+ not_valid_after = datetime.datetime(2030, 12, 31, 8, 30)
+
+ cert = x509.CertificateBuilder().subject_name(
+ x509.Name([x509.NameAttribute(x509.OID_COUNTRY_NAME, u'US')])
+ ).issuer_name(
+ x509.Name([x509.NameAttribute(x509.OID_COUNTRY_NAME, u'US')])
+ ).not_valid_before(
+ not_valid_before
+ ).not_valid_after(
+ not_valid_after
+ ).public_key(
+ subject_private_key.public_key()
+ ).serial_number(
+ 123
+ ).add_extension(
+ cp, critical=False
+ ).sign(issuer_private_key, hashes.SHA256(), backend)
+
+ ext = cert.extensions.get_extension_for_oid(
+ x509.OID_CERTIFICATE_POLICIES
+ )
+ assert ext.value == cp
+
@pytest.mark.requires_backend_interface(interface=RSABackend)
@pytest.mark.requires_backend_interface(interface=X509Backend)
def test_issuer_alt_name(self, backend):
@@ -2547,6 +2885,23 @@ class TestName(object):
assert name1 != name2
assert name1 != object()
+ def test_hash(self):
+ name1 = x509.Name([
+ x509.NameAttribute(x509.ObjectIdentifier('oid'), u'value1'),
+ x509.NameAttribute(x509.ObjectIdentifier('oid2'), u'value2'),
+ ])
+ name2 = x509.Name([
+ x509.NameAttribute(x509.ObjectIdentifier('oid'), u'value1'),
+ x509.NameAttribute(x509.ObjectIdentifier('oid2'), u'value2'),
+ ])
+ name3 = x509.Name([
+ x509.NameAttribute(x509.ObjectIdentifier('oid2'), u'value2'),
+ x509.NameAttribute(x509.ObjectIdentifier('oid'), u'value1'),
+ ])
+
+ assert hash(name1) == hash(name2)
+ assert hash(name1) != hash(name3)
+
def test_repr(self):
name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
diff --git a/tests/test_x509_ext.py b/tests/test_x509_ext.py
index 1bc14620..8f469366 100644
--- a/tests/test_x509_ext.py
+++ b/tests/test_x509_ext.py
@@ -857,6 +857,20 @@ class TestExtensions(object):
assert ext is not None
assert isinstance(ext.value, x509.BasicConstraints)
+ def test_repr(self, backend):
+ cert = _load_cert(
+ os.path.join(
+ "x509", "custom", "basic_constraints_not_critical.pem"
+ ),
+ x509.load_pem_x509_certificate,
+ backend
+ )
+ assert repr(cert.extensions) == (
+ "<Extensions([<Extension(oid=<ObjectIdentifier(oid=2.5.29.19, name"
+ "=basicConstraints)>, critical=False, value=<BasicConstraints(ca=F"
+ "alse, path_length=None)>)>])>"
+ )
+
@pytest.mark.requires_backend_interface(interface=RSABackend)
@pytest.mark.requires_backend_interface(interface=X509Backend)