diff options
author | Paul Kehrer <paul.l.kehrer@gmail.com> | 2018-10-29 05:36:34 +0800 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2018-10-28 17:36:34 -0400 |
commit | e617c5a047b60204ab049a1ffe432310bb406055 (patch) | |
tree | e75b6af74c6c26beae2d96c0e69011cc52dfd179 /src | |
parent | 6e756aec9c91a8350208050b0a3775ea63891cfd (diff) | |
download | cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.gz cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.bz2 cryptography-e617c5a047b60204ab049a1ffe432310bb406055.zip |
OCSP response builder (#4485)
* ocsp response builder
* better prose
* review changes
Diffstat (limited to 'src')
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 104 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/encode_asn1.py | 4 | ||||
-rw-r--r-- | src/cryptography/x509/ocsp.py | 181 |
3 files changed, 279 insertions, 10 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 11d24b12..bd414fde 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -25,7 +25,9 @@ from cryptography.hazmat.backends.interfaces import ( from cryptography.hazmat.backends.openssl import aead from cryptography.hazmat.backends.openssl.ciphers import _CipherContext from cryptography.hazmat.backends.openssl.cmac import _CMACContext -from cryptography.hazmat.backends.openssl.decode_asn1 import _Integers +from cryptography.hazmat.backends.openssl.decode_asn1 import ( + _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers +) from cryptography.hazmat.backends.openssl.dh import ( _DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup ) @@ -38,6 +40,7 @@ from cryptography.hazmat.backends.openssl.ec import ( from cryptography.hazmat.backends.openssl.encode_asn1 import ( _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS, _CRL_EXTENSION_ENCODE_HANDLERS, _EXTENSION_ENCODE_HANDLERS, + _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS, _encode_asn1_int_gc, _encode_asn1_str_gc, _encode_name_gc, _txt2obj_gc, ) @@ -69,6 +72,7 @@ from cryptography.hazmat.primitives.ciphers.modes import ( CBC, CFB, CFB8, CTR, ECB, GCM, OFB, XTS ) from cryptography.hazmat.primitives.kdf import scrypt +from cryptography.x509 import ocsp _MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) @@ -1466,6 +1470,104 @@ class Backend(object): ) return _OCSPRequest(self, ocsp_req) + def _create_ocsp_basic_response(self, builder, private_key, algorithm): + basic = self._lib.OCSP_BASICRESP_new() + self.openssl_assert(basic != self._ffi.NULL) + basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free) + evp_md = self._lib.EVP_get_digestbyname( + builder._response._algorithm.name.encode("ascii") + ) + self.openssl_assert(evp_md != self._ffi.NULL) + certid = self._lib.OCSP_cert_to_id( + evp_md, builder._response._cert._x509, + builder._response._issuer._x509 + ) + self.openssl_assert(certid != self._ffi.NULL) + certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free) + if builder._response._revocation_reason is None: + reason = -1 + else: + reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[ + builder._response._revocation_reason + ] + if builder._response._revocation_time is None: + rev_time = self._ffi.NULL + else: + rev_time = self._create_asn1_time( + builder._response._revocation_time + ) + + next_update = self._ffi.NULL + if builder._response._next_update is not None: + next_update = self._create_asn1_time( + builder._response._next_update + ) + + this_update = self._create_asn1_time(builder._response._this_update) + + res = self._lib.OCSP_basic_add1_status( + basic, + certid, + builder._response._cert_status.value, + reason, + rev_time, + this_update, + next_update + ) + self.openssl_assert(res != self._ffi.NULL) + # okay, now sign the basic structure + evp_md = self._lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) + self.openssl_assert(evp_md != self._ffi.NULL) + responder_cert, responder_encoding = builder._responder_id + flags = self._lib.OCSP_NOCERTS + if responder_encoding is ocsp.OCSPResponderEncoding.HASH: + flags |= self._lib.OCSP_RESPID_KEY + + if builder._certs is not None: + for cert in builder._certs: + res = self._lib.OCSP_basic_add1_cert(basic, cert._x509) + self.openssl_assert(res == 1) + + self._create_x509_extensions( + extensions=builder._extensions, + handlers=_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, + x509_obj=basic, + add_func=self._lib.OCSP_BASICRESP_add_ext, + gc=True, + ) + + res = self._lib.OCSP_basic_sign( + basic, responder_cert._x509, private_key._evp_pkey, + evp_md, self._ffi.NULL, flags + ) + if res != 1: + errors = self._consume_errors() + self.openssl_assert( + errors[0]._lib_reason_match( + self._lib.ERR_LIB_X509, + self._lib.X509_R_KEY_VALUES_MISMATCH + ) + ) + raise ValueError("responder_cert must be signed by private_key") + + return basic + + def create_ocsp_response(self, response_status, builder, private_key, + algorithm): + if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL: + basic = self._create_ocsp_basic_response( + builder, private_key, algorithm + ) + else: + basic = self._ffi.NULL + + ocsp_resp = self._lib.OCSP_response_create( + response_status.value, basic + ) + self.openssl_assert(ocsp_resp != self._ffi.NULL) + ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free) + return _OCSPResponse(self, ocsp_resp) + def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve): return ( self.elliptic_curve_supported(curve) and diff --git a/src/cryptography/hazmat/backends/openssl/encode_asn1.py b/src/cryptography/hazmat/backends/openssl/encode_asn1.py index c8b41a81..6ff1a9a4 100644 --- a/src/cryptography/hazmat/backends/openssl/encode_asn1.py +++ b/src/cryptography/hazmat/backends/openssl/encode_asn1.py @@ -614,3 +614,7 @@ _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS = { _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS = { OCSPExtensionOID.NONCE: _encode_nonce, } + +_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS = { + OCSPExtensionOID.NONCE: _encode_nonce, +} diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py index c89f12ce..2b0b1dc3 100644 --- a/src/cryptography/x509/ocsp.py +++ b/src/cryptography/x509/ocsp.py @@ -5,13 +5,16 @@ from __future__ import absolute_import, division, print_function import abc +import datetime from enum import Enum import six from cryptography import x509 from cryptography.hazmat.primitives import hashes -from cryptography.x509.base import _reject_duplicate_extension +from cryptography.x509.base import ( + _UNIX_EPOCH, _convert_to_naive_utc_time, _reject_duplicate_extension +) _OIDS_TO_HASH = { @@ -23,6 +26,11 @@ _OIDS_TO_HASH = { } +class OCSPResponderEncoding(Enum): + HASH = "By Hash" + NAME = "By Name" + + class OCSPResponseStatus(Enum): SUCCESSFUL = 0 MALFORMED_REQUEST = 1 @@ -33,6 +41,17 @@ class OCSPResponseStatus(Enum): _RESPONSE_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPResponseStatus) +_ALLOWED_HASHES = ( + hashes.SHA1, hashes.SHA224, hashes.SHA256, + hashes.SHA384, hashes.SHA512 +) + + +def _verify_algorithm(algorithm): + if not isinstance(algorithm, _ALLOWED_HASHES): + raise ValueError( + "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" + ) class OCSPCertStatus(Enum): @@ -63,14 +82,7 @@ class OCSPRequestBuilder(object): if self._request is not None: raise ValueError("Only one certificate can be added to a request") - allowed_hashes = ( - hashes.SHA1, hashes.SHA224, hashes.SHA256, - hashes.SHA384, hashes.SHA512 - ) - if not isinstance(algorithm, allowed_hashes): - raise ValueError( - "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" - ) + _verify_algorithm(algorithm) if ( not isinstance(cert, x509.Certificate) or not isinstance(issuer, x509.Certificate) @@ -98,6 +110,157 @@ class OCSPRequestBuilder(object): return backend.create_ocsp_request(self) +class _SingleResponse(object): + def __init__(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if ( + not isinstance(cert, x509.Certificate) or + not isinstance(issuer, x509.Certificate) + ): + raise TypeError("cert and issuer must be a Certificate") + + _verify_algorithm(algorithm) + if not isinstance(this_update, datetime.datetime): + raise TypeError("this_update must be a datetime object") + if ( + next_update is not None and + not isinstance(next_update, datetime.datetime) + ): + raise TypeError("next_update must be a datetime object or None") + + self._cert = cert + self._issuer = issuer + self._algorithm = algorithm + self._this_update = this_update + self._next_update = next_update + + if not isinstance(cert_status, OCSPCertStatus): + raise TypeError( + "cert_status must be an item from the OCSPCertStatus enum" + ) + if cert_status is not OCSPCertStatus.REVOKED: + if revocation_time is not None: + raise ValueError( + "revocation_time can only be provided if the certificate " + "is revoked" + ) + if revocation_reason is not None: + raise ValueError( + "revocation_reason can only be provided if the certificate" + " is revoked" + ) + else: + if not isinstance(revocation_time, datetime.datetime): + raise TypeError("revocation_time must be a datetime object") + + revocation_time = _convert_to_naive_utc_time(revocation_time) + if revocation_time <= _UNIX_EPOCH: + raise ValueError('The revocation_time must be after the unix' + ' epoch (1970 January 1).') + + if ( + revocation_reason is not None and + not isinstance(revocation_reason, x509.ReasonFlags) + ): + raise TypeError( + "revocation_reason must be an item from the ReasonFlags " + "enum or None" + ) + + self._cert_status = cert_status + self._revocation_time = revocation_time + self._revocation_reason = revocation_reason + + +class OCSPResponseBuilder(object): + def __init__(self, response=None, responder_id=None, certs=None, + extensions=[]): + self._response = response + self._responder_id = responder_id + self._certs = certs + self._extensions = extensions + + def add_response(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if self._response is not None: + raise ValueError("Only one response per OCSPResponse.") + + singleresp = _SingleResponse( + cert, issuer, algorithm, cert_status, this_update, next_update, + revocation_time, revocation_reason + ) + return OCSPResponseBuilder( + singleresp, self._responder_id, + self._certs, self._extensions, + ) + + def responder_id(self, encoding, responder_cert): + if self._responder_id is not None: + raise ValueError("responder_id can only be set once") + if not isinstance(responder_cert, x509.Certificate): + raise TypeError("responder_cert must be a Certificate") + if not isinstance(encoding, OCSPResponderEncoding): + raise TypeError( + "encoding must be an element from OCSPResponderEncoding" + ) + + return OCSPResponseBuilder( + self._response, (responder_cert, encoding), + self._certs, self._extensions, + ) + + def certificates(self, certs): + if self._certs is not None: + raise ValueError("certificates may only be set once") + certs = list(certs) + if len(certs) == 0: + raise ValueError("certs must not be an empty list") + if not all(isinstance(x, x509.Certificate) for x in certs): + raise TypeError("certs must be a list of Certificates") + return OCSPResponseBuilder( + self._response, self._responder_id, + certs, self._extensions, + ) + + def add_extension(self, extension, critical): + if not isinstance(extension, x509.ExtensionType): + raise TypeError("extension must be an ExtensionType") + + extension = x509.Extension(extension.oid, critical, extension) + _reject_duplicate_extension(extension, self._extensions) + + return OCSPResponseBuilder( + self._response, self._responder_id, + self._certs, self._extensions + [extension], + ) + + def sign(self, private_key, algorithm): + from cryptography.hazmat.backends.openssl.backend import backend + if self._response is None: + raise ValueError("You must add a response before signing") + if self._responder_id is None: + raise ValueError("You must add a responder_id before signing") + + if not isinstance(algorithm, hashes.HashAlgorithm): + raise TypeError("Algorithm must be a registered hash algorithm.") + + return backend.create_ocsp_response( + OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm + ) + + @classmethod + def build_unsuccessful(cls, response_status): + from cryptography.hazmat.backends.openssl.backend import backend + if not isinstance(response_status, OCSPResponseStatus): + raise TypeError( + "response_status must be an item from OCSPResponseStatus" + ) + if response_status is OCSPResponseStatus.SUCCESSFUL: + raise ValueError("response_status cannot be SUCCESSFUL") + + return backend.create_ocsp_response(response_status, None, None, None) + + @six.add_metaclass(abc.ABCMeta) class OCSPRequest(object): @abc.abstractproperty |