diff options
author | Paul Kehrer <paul.l.kehrer@gmail.com> | 2018-10-29 05:36:34 +0800 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2018-10-28 17:36:34 -0400 |
commit | e617c5a047b60204ab049a1ffe432310bb406055 (patch) | |
tree | e75b6af74c6c26beae2d96c0e69011cc52dfd179 | |
parent | 6e756aec9c91a8350208050b0a3775ea63891cfd (diff) | |
download | cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.gz cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.bz2 cryptography-e617c5a047b60204ab049a1ffe432310bb406055.zip |
OCSP response builder (#4485)
* ocsp response builder
* better prose
* review changes
-rw-r--r-- | docs/x509/ocsp.rst | 185 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 104 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/encode_asn1.py | 4 | ||||
-rw-r--r-- | src/cryptography/x509/ocsp.py | 181 | ||||
-rw-r--r-- | tests/x509/test_ocsp.py | 382 |
5 files changed, 846 insertions, 10 deletions
diff --git a/docs/x509/ocsp.rst b/docs/x509/ocsp.rst index 499e7a91..535ffdda 100644 --- a/docs/x509/ocsp.rst +++ b/docs/x509/ocsp.rst @@ -68,6 +68,24 @@ OCSP gP8L8mJMcCaY -----END CERTIFICATE----- """ + pem_responder_cert = b""" + -----BEGIN CERTIFICATE----- + MIIBPjCB5KADAgECAgQHW80VMAoGCCqGSM49BAMCMCcxCzAJBgNVBAYTAlVTMRgw + FgYDVQQDDA9DcnlwdG9ncmFwaHkgQ0EwHhcNMTgxMDA3MTIzNTEwWhcNMjgxMDA0 + MTIzNTEwWjAnMQswCQYDVQQGEwJVUzEYMBYGA1UEAwwPQ3J5cHRvZ3JhcGh5IENB + MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEbQ2E0N/E3R0zEG+qa+yAFXBY6Fte + QzyvFdq7EZHDktlyUllaVJBrbX1ItV0MlayFwwQPhZmuLPpQBzuVKyrUfTAKBggq + hkjOPQQDAgNJADBGAiEAo0NQRmfPvhWQpSvJzV+2Ag441Zeckk+bib7swduQIjIC + IQCqYD9pArB2SWfmhQCSZkNEATlsPIML8lvlSkbNcrmrqQ== + -----END CERTIFICATE----- + """ + pem_responder_key = b""" + -----BEGIN PRIVATE KEY----- + MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgO+vsRu8xDIVZE+xh + s8ESqJqcpJlwmj8CtF8HPHxrDSGhRANCAARtDYTQ38TdHTMQb6pr7IAVcFjoW15D + PK8V2rsRkcOS2XJSWVpUkGttfUi1XQyVrIXDBA+Fma4s+lAHO5UrKtR9 + -----END PRIVATE KEY----- + """ der_ocsp_req = ( b"0V0T0R0P0N0\t\x06\x05+\x0e\x03\x02\x1a\x05\x00\x04\x148\xcaF\x8c" b"\x07D\x8d\xf4\x81\x96\xc7mmLpQ\x9e`\xa7\xbd\x04\x14yu\xbb\x84:\xcb" @@ -183,6 +201,156 @@ Loading Responses OCSPResponseStatus.UNAUTHORIZED +Creating Responses +~~~~~~~~~~~~~~~~~~ + +.. class:: OCSPResponseBuilder + + .. versionadded:: 2.4 + + This class is used to create :class:`~cryptography.x509.ocsp.OCSPResponse` + objects. You cannot set ``produced_at`` on OCSP responses at this time. + Instead the field is set to current UTC time when calling ``sign``. For + unsuccessful statuses call the class method + :meth:`~cryptography.x509.ocsp.OCSPResponseBuilder.build_unsuccessful`. + + .. method:: add_response(cert, issuer, algorithm, cert_status, this_update, next_update, revocation_time, revocation_reason) + + This method adds status information about the certificate that was + requested to the response. + + :param cert: The :class:`~cryptography.x509.Certificate` whose validity + is being checked. + + :param issuer: The issuer :class:`~cryptography.x509.Certificate` of + the certificate that is being checked. + + :param algorithm: A + :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` + instance. For OCSP only + :class:`~cryptography.hazmat.primitives.hashes.SHA1`, + :class:`~cryptography.hazmat.primitives.hashes.SHA224`, + :class:`~cryptography.hazmat.primitives.hashes.SHA256`, + :class:`~cryptography.hazmat.primitives.hashes.SHA384`, and + :class:`~cryptography.hazmat.primitives.hashes.SHA512` are allowed. + + :param cert_status: An item from the + :class:`~cryptography.x509.ocsp.OCSPCertStatus` enumeration. + + :param this_update: A naïve :class:`datetime.datetime` object + representing the most recent time in UTC at which the status being + indicated is known by the responder to be correct. + + :param next_update: A naïve :class:`datetime.datetime` object or + ``None``. The time in UTC at or before which newer information will + be available about the status of the certificate. + + :param revocation_time: A naïve :class:`datetime.datetime` object or + ``None`` if the ``cert`` is not revoked. The time in UTC at which + the certificate was revoked. + + :param revocation_reason: An item from the + :class:`~cryptography.x509.ReasonFlags` enumeration or ``None`` if + the ``cert`` is not revoked. + + .. method:: certificates(certs) + + Add additional certificates that should be used to verify the + signature on the response. This is typically used when the responder + utilizes an OCSP delegate. + + :param list certs: A list of :class:`~cryptography.x509.Certificate` + objects. + + .. method:: responder_id(encoding, responder_cert) + + Set the ``responderID`` on the OCSP response. This is the data a + client will use to determine what certificate signed the response. + + :param responder_cert: The :class:`~cryptography.x509.Certificate` + object for the certificate whose private key will sign the + OCSP response. If the certificate and key do not match an + error will be raised when calling ``sign``. + :param encoding: Either + :attr:`~cryptography.x509.ocsp.OCSPResponderEncoding.HASH` or + :attr:`~cryptography.x509.ocsp.OCSPResponderEncoding.NAME`. + + .. method:: add_extension(extension, critical) + + Adds an extension to the response. + + :param extension: An extension conforming to the + :class:`~cryptography.x509.ExtensionType` interface. + + :param critical: Set to ``True`` if the extension must be understood and + handled. + + .. method:: sign(private_key, algorithm) + + Creates the OCSP response that can then be serialized and sent to + clients. This method will create a + :attr:`~cryptography.x509.ocsp.OCSPResponseStatus.SUCCESSFUL` response. + + :param private_key: The + :class:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey` + or + :class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey` + that will be used to sign the certificate. + + :param algorithm: The + :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm` that + will be used to generate the signature. + + :returns: A new :class:`~cryptography.x509.ocsp.OCSPResponse`. + + .. doctest:: + + >>> import datetime + >>> from cryptography.hazmat.backends import default_backend + >>> from cryptography.hazmat.primitives import hashes, serialization + >>> from cryptography.x509 import load_pem_x509_certificate, ocsp + >>> cert = load_pem_x509_certificate(pem_cert, default_backend()) + >>> issuer = load_pem_x509_certificate(pem_issuer, default_backend()) + >>> responder_cert = load_pem_x509_certificate(pem_responder_cert, default_backend()) + >>> responder_key = serialization.load_pem_private_key(pem_responder_key, None, default_backend()) + >>> builder = ocsp.OCSPResponseBuilder() + >>> # SHA1 is in this example because RFC 5019 mandates its use. + >>> builder = builder.add_response( + ... cert=cert, issuer=issuer, algorithm=hashes.SHA1(), + ... cert_status=ocsp.OCSPCertStatus.GOOD, + ... this_update=datetime.datetime.now(), + ... next_update=datetime.datetime.now(), + ... revocation_time=None, revocation_reason=None + ... ).responder_id( + ... ocsp.OCSPResponderEncoding.HASH, responder_cert + ... ) + >>> response = builder.sign(responder_key, hashes.SHA256()) + >>> response.certificate_status + <OCSPCertStatus.GOOD: 0> + + .. classmethod:: build_unsuccessful(response_status) + + Creates an unsigned OCSP response which can then be serialized and + sent to clients. ``build_unsuccessful`` may only be called with a + :class:`~cryptography.x509.ocsp.OCSPResponseStatus` that is not + ``SUCCESSFUL``. Since this is a class method note that no other + methods can or should be called as unsuccessful statuses do not + encode additional data. + + :returns: A new :class:`~cryptography.x509.ocsp.OCSPResponse`. + + .. doctest:: + + >>> from cryptography.hazmat.backends import default_backend + >>> from cryptography.hazmat.primitives import hashes, serialization + >>> from cryptography.x509 import load_pem_x509_certificate, ocsp + >>> response = ocsp.OCSPResponseBuilder.build_unsuccessful( + ... ocsp.OCSPResponseStatus.UNAUTHORIZED + ... ) + >>> response.response_status + <OCSPResponseStatus.UNAUTHORIZED: 6> + + Interfaces ~~~~~~~~~~ @@ -472,3 +640,20 @@ Interfaces .. attribute:: UNKNOWN The certificate being checked is not known to the OCSP responder. + +.. class:: OCSPResponderEncoding + + .. versionadded:: 2.4 + + An enumeration of ``responderID`` encodings that can be passed to + :meth:`~cryptography.x509.ocsp.OCSPResponseBuilder.responder_id`. + + .. attribute:: HASH + + Encode the hash of the public key whose corresponding private key + signed the response. + + .. attribute:: NAME + + Encode the X.509 ``Name`` of the certificate whose private key signed + the response. diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 11d24b12..bd414fde 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -25,7 +25,9 @@ from cryptography.hazmat.backends.interfaces import ( from cryptography.hazmat.backends.openssl import aead from cryptography.hazmat.backends.openssl.ciphers import _CipherContext from cryptography.hazmat.backends.openssl.cmac import _CMACContext -from cryptography.hazmat.backends.openssl.decode_asn1 import _Integers +from cryptography.hazmat.backends.openssl.decode_asn1 import ( + _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers +) from cryptography.hazmat.backends.openssl.dh import ( _DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup ) @@ -38,6 +40,7 @@ from cryptography.hazmat.backends.openssl.ec import ( from cryptography.hazmat.backends.openssl.encode_asn1 import ( _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS, _CRL_EXTENSION_ENCODE_HANDLERS, _EXTENSION_ENCODE_HANDLERS, + _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS, _encode_asn1_int_gc, _encode_asn1_str_gc, _encode_name_gc, _txt2obj_gc, ) @@ -69,6 +72,7 @@ from cryptography.hazmat.primitives.ciphers.modes import ( CBC, CFB, CFB8, CTR, ECB, GCM, OFB, XTS ) from cryptography.hazmat.primitives.kdf import scrypt +from cryptography.x509 import ocsp _MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) @@ -1466,6 +1470,104 @@ class Backend(object): ) return _OCSPRequest(self, ocsp_req) + def _create_ocsp_basic_response(self, builder, private_key, algorithm): + basic = self._lib.OCSP_BASICRESP_new() + self.openssl_assert(basic != self._ffi.NULL) + basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free) + evp_md = self._lib.EVP_get_digestbyname( + builder._response._algorithm.name.encode("ascii") + ) + self.openssl_assert(evp_md != self._ffi.NULL) + certid = self._lib.OCSP_cert_to_id( + evp_md, builder._response._cert._x509, + builder._response._issuer._x509 + ) + self.openssl_assert(certid != self._ffi.NULL) + certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free) + if builder._response._revocation_reason is None: + reason = -1 + else: + reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[ + builder._response._revocation_reason + ] + if builder._response._revocation_time is None: + rev_time = self._ffi.NULL + else: + rev_time = self._create_asn1_time( + builder._response._revocation_time + ) + + next_update = self._ffi.NULL + if builder._response._next_update is not None: + next_update = self._create_asn1_time( + builder._response._next_update + ) + + this_update = self._create_asn1_time(builder._response._this_update) + + res = self._lib.OCSP_basic_add1_status( + basic, + certid, + builder._response._cert_status.value, + reason, + rev_time, + this_update, + next_update + ) + self.openssl_assert(res != self._ffi.NULL) + # okay, now sign the basic structure + evp_md = self._lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) + self.openssl_assert(evp_md != self._ffi.NULL) + responder_cert, responder_encoding = builder._responder_id + flags = self._lib.OCSP_NOCERTS + if responder_encoding is ocsp.OCSPResponderEncoding.HASH: + flags |= self._lib.OCSP_RESPID_KEY + + if builder._certs is not None: + for cert in builder._certs: + res = self._lib.OCSP_basic_add1_cert(basic, cert._x509) + self.openssl_assert(res == 1) + + self._create_x509_extensions( + extensions=builder._extensions, + handlers=_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, + x509_obj=basic, + add_func=self._lib.OCSP_BASICRESP_add_ext, + gc=True, + ) + + res = self._lib.OCSP_basic_sign( + basic, responder_cert._x509, private_key._evp_pkey, + evp_md, self._ffi.NULL, flags + ) + if res != 1: + errors = self._consume_errors() + self.openssl_assert( + errors[0]._lib_reason_match( + self._lib.ERR_LIB_X509, + self._lib.X509_R_KEY_VALUES_MISMATCH + ) + ) + raise ValueError("responder_cert must be signed by private_key") + + return basic + + def create_ocsp_response(self, response_status, builder, private_key, + algorithm): + if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL: + basic = self._create_ocsp_basic_response( + builder, private_key, algorithm + ) + else: + basic = self._ffi.NULL + + ocsp_resp = self._lib.OCSP_response_create( + response_status.value, basic + ) + self.openssl_assert(ocsp_resp != self._ffi.NULL) + ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free) + return _OCSPResponse(self, ocsp_resp) + def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve): return ( self.elliptic_curve_supported(curve) and diff --git a/src/cryptography/hazmat/backends/openssl/encode_asn1.py b/src/cryptography/hazmat/backends/openssl/encode_asn1.py index c8b41a81..6ff1a9a4 100644 --- a/src/cryptography/hazmat/backends/openssl/encode_asn1.py +++ b/src/cryptography/hazmat/backends/openssl/encode_asn1.py @@ -614,3 +614,7 @@ _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS = { _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS = { OCSPExtensionOID.NONCE: _encode_nonce, } + +_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS = { + OCSPExtensionOID.NONCE: _encode_nonce, +} diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py index c89f12ce..2b0b1dc3 100644 --- a/src/cryptography/x509/ocsp.py +++ b/src/cryptography/x509/ocsp.py @@ -5,13 +5,16 @@ from __future__ import absolute_import, division, print_function import abc +import datetime from enum import Enum import six from cryptography import x509 from cryptography.hazmat.primitives import hashes -from cryptography.x509.base import _reject_duplicate_extension +from cryptography.x509.base import ( + _UNIX_EPOCH, _convert_to_naive_utc_time, _reject_duplicate_extension +) _OIDS_TO_HASH = { @@ -23,6 +26,11 @@ _OIDS_TO_HASH = { } +class OCSPResponderEncoding(Enum): + HASH = "By Hash" + NAME = "By Name" + + class OCSPResponseStatus(Enum): SUCCESSFUL = 0 MALFORMED_REQUEST = 1 @@ -33,6 +41,17 @@ class OCSPResponseStatus(Enum): _RESPONSE_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPResponseStatus) +_ALLOWED_HASHES = ( + hashes.SHA1, hashes.SHA224, hashes.SHA256, + hashes.SHA384, hashes.SHA512 +) + + +def _verify_algorithm(algorithm): + if not isinstance(algorithm, _ALLOWED_HASHES): + raise ValueError( + "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" + ) class OCSPCertStatus(Enum): @@ -63,14 +82,7 @@ class OCSPRequestBuilder(object): if self._request is not None: raise ValueError("Only one certificate can be added to a request") - allowed_hashes = ( - hashes.SHA1, hashes.SHA224, hashes.SHA256, - hashes.SHA384, hashes.SHA512 - ) - if not isinstance(algorithm, allowed_hashes): - raise ValueError( - "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" - ) + _verify_algorithm(algorithm) if ( not isinstance(cert, x509.Certificate) or not isinstance(issuer, x509.Certificate) @@ -98,6 +110,157 @@ class OCSPRequestBuilder(object): return backend.create_ocsp_request(self) +class _SingleResponse(object): + def __init__(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if ( + not isinstance(cert, x509.Certificate) or + not isinstance(issuer, x509.Certificate) + ): + raise TypeError("cert and issuer must be a Certificate") + + _verify_algorithm(algorithm) + if not isinstance(this_update, datetime.datetime): + raise TypeError("this_update must be a datetime object") + if ( + next_update is not None and + not isinstance(next_update, datetime.datetime) + ): + raise TypeError("next_update must be a datetime object or None") + + self._cert = cert + self._issuer = issuer + self._algorithm = algorithm + self._this_update = this_update + self._next_update = next_update + + if not isinstance(cert_status, OCSPCertStatus): + raise TypeError( + "cert_status must be an item from the OCSPCertStatus enum" + ) + if cert_status is not OCSPCertStatus.REVOKED: + if revocation_time is not None: + raise ValueError( + "revocation_time can only be provided if the certificate " + "is revoked" + ) + if revocation_reason is not None: + raise ValueError( + "revocation_reason can only be provided if the certificate" + " is revoked" + ) + else: + if not isinstance(revocation_time, datetime.datetime): + raise TypeError("revocation_time must be a datetime object") + + revocation_time = _convert_to_naive_utc_time(revocation_time) + if revocation_time <= _UNIX_EPOCH: + raise ValueError('The revocation_time must be after the unix' + ' epoch (1970 January 1).') + + if ( + revocation_reason is not None and + not isinstance(revocation_reason, x509.ReasonFlags) + ): + raise TypeError( + "revocation_reason must be an item from the ReasonFlags " + "enum or None" + ) + + self._cert_status = cert_status + self._revocation_time = revocation_time + self._revocation_reason = revocation_reason + + +class OCSPResponseBuilder(object): + def __init__(self, response=None, responder_id=None, certs=None, + extensions=[]): + self._response = response + self._responder_id = responder_id + self._certs = certs + self._extensions = extensions + + def add_response(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if self._response is not None: + raise ValueError("Only one response per OCSPResponse.") + + singleresp = _SingleResponse( + cert, issuer, algorithm, cert_status, this_update, next_update, + revocation_time, revocation_reason + ) + return OCSPResponseBuilder( + singleresp, self._responder_id, + self._certs, self._extensions, + ) + + def responder_id(self, encoding, responder_cert): + if self._responder_id is not None: + raise ValueError("responder_id can only be set once") + if not isinstance(responder_cert, x509.Certificate): + raise TypeError("responder_cert must be a Certificate") + if not isinstance(encoding, OCSPResponderEncoding): + raise TypeError( + "encoding must be an element from OCSPResponderEncoding" + ) + + return OCSPResponseBuilder( + self._response, (responder_cert, encoding), + self._certs, self._extensions, + ) + + def certificates(self, certs): + if self._certs is not None: + raise ValueError("certificates may only be set once") + certs = list(certs) + if len(certs) == 0: + raise ValueError("certs must not be an empty list") + if not all(isinstance(x, x509.Certificate) for x in certs): + raise TypeError("certs must be a list of Certificates") + return OCSPResponseBuilder( + self._response, self._responder_id, + certs, self._extensions, + ) + + def add_extension(self, extension, critical): + if not isinstance(extension, x509.ExtensionType): + raise TypeError("extension must be an ExtensionType") + + extension = x509.Extension(extension.oid, critical, extension) + _reject_duplicate_extension(extension, self._extensions) + + return OCSPResponseBuilder( + self._response, self._responder_id, + self._certs, self._extensions + [extension], + ) + + def sign(self, private_key, algorithm): + from cryptography.hazmat.backends.openssl.backend import backend + if self._response is None: + raise ValueError("You must add a response before signing") + if self._responder_id is None: + raise ValueError("You must add a responder_id before signing") + + if not isinstance(algorithm, hashes.HashAlgorithm): + raise TypeError("Algorithm must be a registered hash algorithm.") + + return backend.create_ocsp_response( + OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm + ) + + @classmethod + def build_unsuccessful(cls, response_status): + from cryptography.hazmat.backends.openssl.backend import backend + if not isinstance(response_status, OCSPResponseStatus): + raise TypeError( + "response_status must be an item from OCSPResponseStatus" + ) + if response_status is OCSPResponseStatus.SUCCESSFUL: + raise ValueError("response_status cannot be SUCCESSFUL") + + return backend.create_ocsp_response(response_status, None, None, None) + + @six.add_metaclass(abc.ABCMeta) class OCSPRequest(object): @abc.abstractproperty diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py index 3ee6a26e..fad48dab 100644 --- a/tests/x509/test_ocsp.py +++ b/tests/x509/test_ocsp.py @@ -13,10 +13,12 @@ import pytest from cryptography import x509 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.x509 import ocsp from .test_x509 import _load_cert +from ..hazmat.primitives.fixtures_ec import EC_KEY_SECP256R1 from ..utils import load_vectors_from_file @@ -43,6 +45,33 @@ def _cert_and_issuer(): return cert, issuer +def _generate_root(): + from cryptography.hazmat.backends.openssl.backend import backend + + private_key = EC_KEY_SECP256R1.private_key(backend) + subject = x509.Name([ + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(x509.NameOID.COMMON_NAME, u'Cryptography CA'), + ]) + + builder = x509.CertificateBuilder().serial_number( + 123456789 + ).issuer_name( + subject + ).subject_name( + subject + ).public_key( + private_key.public_key() + ).not_valid_before( + datetime.datetime.now() + ).not_valid_after( + datetime.datetime.now() + datetime.timedelta(days=3650) + ) + + cert = builder.sign(private_key, hashes.SHA256(), backend) + return cert, private_key + + class TestOCSPRequest(object): def test_bad_request(self): with pytest.raises(ValueError): @@ -182,6 +211,359 @@ class TestOCSPRequestBuilder(object): assert req.extensions[0].critical is critical +class TestOCSPResponseBuilder(object): + def test_add_response_twice(self): + cert, issuer = _cert_and_issuer() + time = datetime.datetime.now() + builder = ocsp.OCSPResponseBuilder() + builder = builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, time, + time, None, None + ) + with pytest.raises(ValueError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, time, + time, None, None + ) + + def test_invalid_add_response(self): + cert, issuer = _cert_and_issuer() + time = datetime.datetime.utcnow() + reason = x509.ReasonFlags.cessation_of_operation + builder = ocsp.OCSPResponseBuilder() + with pytest.raises(TypeError): + builder.add_response( + 'bad', issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + time, time, None, None + ) + with pytest.raises(TypeError): + builder.add_response( + cert, 'bad', hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + time, time, None, None + ) + with pytest.raises(ValueError): + builder.add_response( + cert, issuer, 'notahash', ocsp.OCSPCertStatus.GOOD, + time, time, None, None + ) + with pytest.raises(TypeError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + 'bad', time, None, None + ) + with pytest.raises(TypeError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + time, 'bad', None, None + ) + + with pytest.raises(TypeError): + builder.add_response( + cert, issuer, hashes.SHA256(), 0, time, time, None, None + ) + with pytest.raises(ValueError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + time, time, time, None + ) + with pytest.raises(ValueError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, + time, time, None, reason + ) + with pytest.raises(TypeError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED, + time, time, None, reason + ) + with pytest.raises(TypeError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED, + time, time, time, 0 + ) + with pytest.raises(ValueError): + builder.add_response( + cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED, + time, time, time - datetime.timedelta(days=36500), None + ) + + def test_invalid_certificates(self): + builder = ocsp.OCSPResponseBuilder() + with pytest.raises(ValueError): + builder.certificates([]) + with pytest.raises(TypeError): + builder.certificates(['notacert']) + with pytest.raises(TypeError): + builder.certificates('invalid') + + _, issuer = _cert_and_issuer() + builder = builder.certificates([issuer]) + with pytest.raises(ValueError): + builder.certificates([issuer]) + + def test_invalid_responder_id(self): + builder = ocsp.OCSPResponseBuilder() + cert, _ = _cert_and_issuer() + with pytest.raises(TypeError): + builder.responder_id(ocsp.OCSPResponderEncoding.HASH, 'invalid') + with pytest.raises(TypeError): + builder.responder_id('notanenum', cert) + + builder = builder.responder_id(ocsp.OCSPResponderEncoding.NAME, cert) + with pytest.raises(ValueError): + builder.responder_id(ocsp.OCSPResponderEncoding.NAME, cert) + + def test_invalid_extension(self): + builder = ocsp.OCSPResponseBuilder() + with pytest.raises(TypeError): + builder.add_extension("notanextension", True) + + def test_sign_no_response(self): + builder = ocsp.OCSPResponseBuilder() + root_cert, private_key = _generate_root() + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ) + with pytest.raises(ValueError): + builder.sign(private_key, hashes.SHA256()) + + def test_sign_no_responder_id(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + _, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ) + with pytest.raises(ValueError): + builder.sign(private_key, hashes.SHA256()) + + def test_sign_invalid_hash_algorithm(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ) + with pytest.raises(TypeError): + builder.sign(private_key, 'notahash') + + def test_sign_good_cert(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.responder_name == root_cert.subject + assert resp.responder_key_hash is None + assert (current_time - resp.produced_at).total_seconds() < 10 + assert (resp.signature_algorithm_oid == + x509.SignatureAlgorithmOID.ECDSA_WITH_SHA256) + assert resp.certificate_status == ocsp.OCSPCertStatus.GOOD + assert resp.revocation_time is None + assert resp.revocation_reason is None + assert resp.this_update == this_update + assert resp.next_update == next_update + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + def test_sign_revoked_cert(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + revoked_date = this_update - datetime.timedelta(days=300) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED, + this_update, next_update, revoked_date, None + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED + assert resp.revocation_time == revoked_date + assert resp.revocation_reason is None + assert resp.this_update == this_update + assert resp.next_update == next_update + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + def test_sign_with_appended_certs(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ).certificates( + [root_cert] + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.certificates == [root_cert] + + def test_sign_revoked_no_next_update(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + revoked_date = this_update - datetime.timedelta(days=300) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED, + this_update, None, revoked_date, None + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED + assert resp.revocation_time == revoked_date + assert resp.revocation_reason is None + assert resp.this_update == this_update + assert resp.next_update is None + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + def test_sign_revoked_with_reason(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + revoked_date = this_update - datetime.timedelta(days=300) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.NAME, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED, + this_update, next_update, revoked_date, + x509.ReasonFlags.key_compromise + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED + assert resp.revocation_time == revoked_date + assert resp.revocation_reason is x509.ReasonFlags.key_compromise + assert resp.this_update == this_update + assert resp.next_update == next_update + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + def test_sign_responder_id_key_hash(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.HASH, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert resp.responder_name is None + assert resp.responder_key_hash == ( + b'\x8ca\x94\xe0\x948\xed\x89\xd8\xd4N\x89p\t\xd6\xf9^_\xec}' + ) + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + def test_invalid_sign_responder_cert_does_not_match_private_key(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.HASH, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ) + from cryptography.hazmat.backends.openssl.backend import backend + diff_key = ec.generate_private_key(ec.SECP256R1(), backend) + with pytest.raises(ValueError): + builder.sign(diff_key, hashes.SHA256()) + + def test_sign_with_extension(self): + builder = ocsp.OCSPResponseBuilder() + cert, issuer = _cert_and_issuer() + root_cert, private_key = _generate_root() + current_time = datetime.datetime.utcnow().replace(microsecond=0) + this_update = current_time - datetime.timedelta(days=1) + next_update = this_update + datetime.timedelta(days=7) + builder = builder.responder_id( + ocsp.OCSPResponderEncoding.HASH, root_cert + ).add_response( + cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update, + next_update, None, None + ).add_extension( + x509.OCSPNonce(b"012345"), False + ) + resp = builder.sign(private_key, hashes.SHA256()) + assert len(resp.extensions) == 1 + assert resp.extensions[0].value == x509.OCSPNonce(b"012345") + assert resp.extensions[0].critical is False + private_key.public_key().verify( + resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256()) + ) + + @pytest.mark.parametrize( + ("status", "der"), + [ + (ocsp.OCSPResponseStatus.MALFORMED_REQUEST, b"0\x03\n\x01\x01"), + (ocsp.OCSPResponseStatus.INTERNAL_ERROR, b"0\x03\n\x01\x02"), + (ocsp.OCSPResponseStatus.TRY_LATER, b"0\x03\n\x01\x03"), + (ocsp.OCSPResponseStatus.SIG_REQUIRED, b"0\x03\n\x01\x05"), + (ocsp.OCSPResponseStatus.UNAUTHORIZED, b"0\x03\n\x01\x06"), + ] + ) + def test_build_non_successful_statuses(self, status, der): + resp = ocsp.OCSPResponseBuilder.build_unsuccessful(status) + assert resp.response_status is status + assert resp.public_bytes(serialization.Encoding.DER) == der + + def test_invalid_build_not_a_status(self): + with pytest.raises(TypeError): + ocsp.OCSPResponseBuilder.build_unsuccessful("notastatus") + + def test_invalid_build_successful_status(self): + with pytest.raises(ValueError): + ocsp.OCSPResponseBuilder.build_unsuccessful( + ocsp.OCSPResponseStatus.SUCCESSFUL + ) + + class TestOCSPResponse(object): def test_bad_response(self): with pytest.raises(ValueError): |