diff options
author | Paul Kehrer <paul.l.kehrer@gmail.com> | 2018-10-02 07:54:31 +0800 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2018-10-01 19:54:31 -0400 |
commit | a07de31096767abd3b4529ae29c0487c8f21310b (patch) | |
tree | 28eb5577ebb97209332c50b17c92e29e981ac644 | |
parent | 1717f8c998b22fbbebec4b5514aee42fb3a2f68d (diff) | |
download | cryptography-a07de31096767abd3b4529ae29c0487c8f21310b.tar.gz cryptography-a07de31096767abd3b4529ae29c0487c8f21310b.tar.bz2 cryptography-a07de31096767abd3b4529ae29c0487c8f21310b.zip |
support OCSP response parsing (#4452)
* support OCSP response parsing
* move the decorator to make pep8 happy
* add some missing docs
* review feedback
* more review feedback
-rw-r--r-- | docs/x509/ocsp.rst | 21 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 14 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/ocsp.py | 244 | ||||
-rw-r--r-- | src/cryptography/x509/ocsp.py | 8 | ||||
-rw-r--r-- | tests/x509/test_ocsp.py | 137 |
5 files changed, 420 insertions, 4 deletions
diff --git a/docs/x509/ocsp.rst b/docs/x509/ocsp.rst index 14d9bb84..b2030226 100644 --- a/docs/x509/ocsp.rst +++ b/docs/x509/ocsp.rst @@ -74,6 +74,7 @@ OCSP b",\xdez\t\xbe1\x1bC\xbc\x1c*MSX\x02\x15\x00\x98\xd9\xe5\xc0\xb4\xc3" b"sU-\xf7|]\x0f\x1e\xb5\x12\x8eIE\xf9" ) + der_ocsp_resp_unauth = b"0\x03\n\x01\x06" OCSP (Online Certificate Status Protocol) is a method of checking the revocation status of certificates. It is specified in :rfc:`6960`, as well @@ -151,6 +152,26 @@ Creating Requests >>> base64.b64encode(req.public_bytes(serialization.Encoding.DER)) b'MEMwQTA/MD0wOzAJBgUrDgMCGgUABBRAC0Z68eay0wmDug1gfn5ZN0gkxAQUw5zz/NNGCDS7zkZ/oHxb8+IIy1kCAj8g' +Loading Responses +~~~~~~~~~~~~~~~~~ + +.. function:: load_der_ocsp_response(data) + + .. versionadded:: 2.4 + + Deserialize an OCSP response from DER encoded data. + + :param bytes data: The DER encoded OCSP response data. + + :returns: An instance of :class:`~cryptography.x509.ocsp.OCSPResponse`. + + .. doctest:: + + >>> from cryptography.x509 import ocsp + >>> ocsp_resp = ocsp.load_der_ocsp_response(der_ocsp_resp_unauth) + >>> print(ocsp_resp.response_status) + OCSPResponseStatus.UNAUTHORIZED + Interfaces ~~~~~~~~~~ diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 8b4792b4..f374a8e3 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -42,7 +42,9 @@ from cryptography.hazmat.backends.openssl.encode_asn1 import ( ) from cryptography.hazmat.backends.openssl.hashes import _HashContext from cryptography.hazmat.backends.openssl.hmac import _HMACContext -from cryptography.hazmat.backends.openssl.ocsp import _OCSPRequest +from cryptography.hazmat.backends.openssl.ocsp import ( + _OCSPRequest, _OCSPResponse +) from cryptography.hazmat.backends.openssl.rsa import ( _RSAPrivateKey, _RSAPublicKey ) @@ -1441,6 +1443,16 @@ class Backend(object): request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free) return _OCSPRequest(self, request) + def load_der_ocsp_response(self, data): + mem_bio = self._bytes_to_bio(data) + response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL) + if response == self._ffi.NULL: + self._consume_errors() + raise ValueError("Unable to load OCSP response") + + response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free) + return _OCSPResponse(self, response) + def create_ocsp_request(self, builder): ocsp_req = self._lib.OCSP_REQUEST_new() self.openssl_assert(ocsp_req != self._ffi.NULL) diff --git a/src/cryptography/hazmat/backends/openssl/ocsp.py b/src/cryptography/hazmat/backends/openssl/ocsp.py index 420d7eb6..f3f18cb0 100644 --- a/src/cryptography/hazmat/backends/openssl/ocsp.py +++ b/src/cryptography/hazmat/backends/openssl/ocsp.py @@ -4,13 +4,35 @@ from __future__ import absolute_import, division, print_function -from cryptography import utils +import functools + +from cryptography import utils, x509 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.backends.openssl.decode_asn1 import ( - _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int, _asn1_string_to_bytes, _obj2txt + _CRL_ENTRY_REASON_CODE_TO_ENUM, _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int, + _asn1_string_to_bytes, _decode_x509_name, _obj2txt, + _parse_asn1_generalized_time, ) +from cryptography.hazmat.backends.openssl.x509 import _Certificate from cryptography.hazmat.primitives import serialization -from cryptography.x509.ocsp import OCSPRequest, _OIDS_TO_HASH +from cryptography.x509.ocsp import ( + OCSPCertStatus, OCSPRequest, OCSPResponse, OCSPResponseStatus, + _CERT_STATUS_TO_ENUM, _OIDS_TO_HASH, _RESPONSE_STATUS_TO_ENUM, +) + + +def _requires_successful_response(func): + @functools.wraps(func) + def wrapper(self, *args): + if self.response_status != OCSPResponseStatus.SUCCESSFUL: + raise ValueError( + "OCSP response status is not successful so the property " + "has no value" + ) + else: + return func(self, *args) + + return wrapper def _issuer_key_hash(backend, cert_id): @@ -63,6 +85,222 @@ def _hash_algorithm(backend, cert_id): ) +@utils.register_interface(OCSPResponse) +class _OCSPResponse(object): + def __init__(self, backend, ocsp_response): + self._backend = backend + self._ocsp_response = ocsp_response + status = self._backend._lib.OCSP_response_status(self._ocsp_response) + self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM) + self._status = _RESPONSE_STATUS_TO_ENUM[status] + if self._status is OCSPResponseStatus.SUCCESSFUL: + basic = self._backend._lib.OCSP_response_get1_basic( + self._ocsp_response + ) + self._backend.openssl_assert(basic != self._backend._ffi.NULL) + self._basic = self._backend._ffi.gc( + basic, self._backend._lib.OCSP_BASICRESP_free + ) + self._backend.openssl_assert( + self._backend._lib.OCSP_resp_count(self._basic) == 1 + ) + self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0) + self._backend.openssl_assert( + self._single != self._backend._ffi.NULL + ) + self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id( + self._single + ) + self._backend.openssl_assert( + self._cert_id != self._backend._ffi.NULL + ) + + response_status = utils.read_only_property("_status") + + @property + @_requires_successful_response + def signature_algorithm_oid(self): + alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic) + self._backend.openssl_assert(alg != self._backend._ffi.NULL) + oid = _obj2txt(self._backend, alg.algorithm) + return x509.ObjectIdentifier(oid) + + @property + @_requires_successful_response + def signature(self): + sig = self._backend._lib.OCSP_resp_get0_signature(self._basic) + self._backend.openssl_assert(sig != self._backend._ffi.NULL) + return _asn1_string_to_bytes(self._backend, sig) + + @property + @_requires_successful_response + def tbs_response_bytes(self): + respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic) + self._backend.openssl_assert(respdata != self._backend._ffi.NULL) + pp = self._backend._ffi.new("unsigned char **") + res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp) + self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL) + pp = self._backend._ffi.gc( + pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0]) + ) + self._backend.openssl_assert(res > 0) + return self._backend._ffi.buffer(pp[0], res)[:] + + @property + @_requires_successful_response + def certificates(self): + sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic) + num = self._backend._lib.sk_X509_num(sk_x509) + certs = [] + for i in range(num): + x509 = self._backend._lib.sk_X509_value(sk_x509, i) + self._backend.openssl_assert(x509 != self._backend._ffi.NULL) + cert = _Certificate(self._backend, x509) + # We need to keep the OCSP response that the certificate came from + # alive until the Certificate object itself goes out of scope, so + # we give it a private reference. + cert._ocsp_resp = self + certs.append(cert) + + return certs + + @property + @_requires_successful_response + def responder_key_hash(self): + _, asn1_string = self._responder_key_name() + if asn1_string == self._backend._ffi.NULL: + return None + else: + return _asn1_string_to_bytes(self._backend, asn1_string) + + @property + @_requires_successful_response + def responder_name(self): + x509_name, _ = self._responder_key_name() + if x509_name == self._backend._ffi.NULL: + return None + else: + return _decode_x509_name(self._backend, x509_name) + + def _responder_key_name(self): + asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **") + x509_name = self._backend._ffi.new("X509_NAME **") + res = self._backend._lib.OCSP_resp_get0_id( + self._basic, asn1_string, x509_name + ) + self._backend.openssl_assert(res == 1) + return x509_name[0], asn1_string[0] + + @property + @_requires_successful_response + def produced_at(self): + produced_at = self._backend._lib.OCSP_resp_get0_produced_at( + self._basic + ) + return _parse_asn1_generalized_time(self._backend, produced_at) + + @property + @_requires_successful_response + def certificate_status(self): + status = self._backend._lib.OCSP_single_get0_status( + self._single, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + ) + self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM) + return _CERT_STATUS_TO_ENUM[status] + + @property + @_requires_successful_response + def revocation_time(self): + if self.certificate_status is not OCSPCertStatus.REVOKED: + return None + + asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") + self._backend._lib.OCSP_single_get0_status( + self._single, + self._backend._ffi.NULL, + asn1_time, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + ) + self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) + return _parse_asn1_generalized_time(self._backend, asn1_time[0]) + + @property + @_requires_successful_response + def revocation_reason(self): + if self.certificate_status is not OCSPCertStatus.REVOKED: + return None + + reason_ptr = self._backend._ffi.new("int *") + self._backend._lib.OCSP_single_get0_status( + self._single, + reason_ptr, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + ) + # If no reason is encoded OpenSSL returns -1 + if reason_ptr[0] == -1: + return None + else: + self._backend.openssl_assert( + reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM + ) + return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]] + + @property + @_requires_successful_response + def this_update(self): + asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") + self._backend._lib.OCSP_single_get0_status( + self._single, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + asn1_time, + self._backend._ffi.NULL, + ) + self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) + return _parse_asn1_generalized_time(self._backend, asn1_time[0]) + + @property + @_requires_successful_response + def next_update(self): + asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") + self._backend._lib.OCSP_single_get0_status( + self._single, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + asn1_time, + ) + self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) + return _parse_asn1_generalized_time(self._backend, asn1_time[0]) + + @property + @_requires_successful_response + def issuer_key_hash(self): + return _issuer_key_hash(self._backend, self._cert_id) + + @property + @_requires_successful_response + def issuer_name_hash(self): + return _issuer_name_hash(self._backend, self._cert_id) + + @property + @_requires_successful_response + def hash_algorithm(self): + return _hash_algorithm(self._backend, self._cert_id) + + @property + @_requires_successful_response + def serial_number(self): + return _serial_number(self._backend, self._cert_id) + + @utils.register_interface(OCSPRequest) class _OCSPRequest(object): def __init__(self, backend, ocsp_request): diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py index 7535a0b3..7907bcae 100644 --- a/src/cryptography/x509/ocsp.py +++ b/src/cryptography/x509/ocsp.py @@ -40,11 +40,19 @@ class OCSPCertStatus(Enum): UNKNOWN = 2 +_CERT_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPCertStatus) + + def load_der_ocsp_request(data): from cryptography.hazmat.backends.openssl.backend import backend return backend.load_der_ocsp_request(data) +def load_der_ocsp_response(data): + from cryptography.hazmat.backends.openssl.backend import backend + return backend.load_der_ocsp_response(data) + + class OCSPRequestBuilder(object): def __init__(self, request=None): self._request = request diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py index a646f4b7..aeaa6e6c 100644 --- a/tests/x509/test_ocsp.py +++ b/tests/x509/test_ocsp.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function import base64 +import datetime import os import pytest @@ -12,6 +13,7 @@ import pytest from cryptography import x509 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.x509 import ocsp from .test_x509 import _load_cert @@ -146,3 +148,138 @@ class TestOCSPRequestBuilder(object): b"MEMwQTA/MD0wOzAJBgUrDgMCGgUABBRAC0Z68eay0wmDug1gfn5ZN0gkxAQUw5zz" b"/NNGCDS7zkZ/oHxb8+IIy1kCAj8g" ) + + +class TestOCSPResponse(object): + def test_bad_response(self): + with pytest.raises(ValueError): + ocsp.load_der_ocsp_response(b"invalid") + + def test_load_response(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-sha256.der"), + ocsp.load_der_ocsp_response, + ) + from cryptography.hazmat.backends.openssl.backend import backend + issuer = _load_cert( + os.path.join("x509", "letsencryptx3.pem"), + x509.load_pem_x509_certificate, + backend + ) + assert resp.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL + assert (resp.signature_algorithm_oid == + x509.SignatureAlgorithmOID.RSA_WITH_SHA256) + assert resp.signature == base64.b64decode( + b"I9KUlyLV/2LbNCVu1BQphxdNlU/jBzXsPYVscPjW5E93pCrSO84GkIWoOJtqsnt" + b"78DLcQPnF3W24NXGzSGKlSWfXIsyoXCxnBm0mIbD5ZMnKyXEnqSR33Z9He/A+ML" + b"A8gbrDUipGNPosesenkKUnOtFIzEGv29hV5E6AMP2ORPVsVlTAZegPJFbbVIWc0" + b"rZGFCXKxijDxtUtgWzBhpBAI50JbPHi+IVuaOe4aDJLYgZ0BIBNa6bDI+rScyoy" + b"5U0DToV7SZn6CoJ3U19X7BHdYn6TLX0xi43eXuzBGzdHnSzmsc7r/DvkAKJm3vb" + b"dVECXqe/gFlXJUBcZ25jhs70MUA==" + ) + assert resp.tbs_response_bytes == base64.b64decode( + b"MIHWoUwwSjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzA" + b"hBgNVBAMTGkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzGA8yMDE4MDgzMDExMT" + b"UwMFowdTBzMEswCQYFKw4DAhoFAAQUfuZq53Kas/z4oiBkbBahLWBxCF0EFKhKa" + b"mMEfd265tE5t6ZFZe/zqOyhAhIDHHh6fckClQB7xfIiCztSevCAABgPMjAxODA4" + b"MzAxMTAwMDBaoBEYDzIwMTgwOTA2MTEwMDAwWg==" + ) + issuer.public_key().verify( + resp.signature, + resp.tbs_response_bytes, + PKCS1v15(), + hashes.SHA256() + ) + assert resp.certificates == [] + assert resp.responder_key_hash is None + assert resp.responder_name == issuer.subject + assert resp.produced_at == datetime.datetime(2018, 8, 30, 11, 15) + assert resp.certificate_status == ocsp.OCSPCertStatus.GOOD + assert resp.revocation_time is None + assert resp.revocation_reason is None + assert resp.this_update == datetime.datetime(2018, 8, 30, 11, 0) + assert resp.next_update == datetime.datetime(2018, 9, 6, 11, 0) + assert resp.issuer_key_hash == ( + b'\xa8Jjc\x04}\xdd\xba\xe6\xd19\xb7\xa6Ee\xef\xf3\xa8\xec\xa1' + ) + assert resp.issuer_name_hash == ( + b'~\xe6j\xe7r\x9a\xb3\xfc\xf8\xa2 dl\x16\xa1-`q\x08]' + ) + assert isinstance(resp.hash_algorithm, hashes.SHA1) + assert resp.serial_number == 271024907440004808294641238224534273948400 + + def test_load_unauthorized(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-unauthorized.der"), + ocsp.load_der_ocsp_response, + ) + assert resp.response_status == ocsp.OCSPResponseStatus.UNAUTHORIZED + with pytest.raises(ValueError): + assert resp.signature_algorithm_oid + with pytest.raises(ValueError): + assert resp.signature + with pytest.raises(ValueError): + assert resp.tbs_response_bytes + with pytest.raises(ValueError): + assert resp.certificates + with pytest.raises(ValueError): + assert resp.responder_key_hash + with pytest.raises(ValueError): + assert resp.responder_name + with pytest.raises(ValueError): + assert resp.produced_at + with pytest.raises(ValueError): + assert resp.certificate_status + with pytest.raises(ValueError): + assert resp.revocation_time + with pytest.raises(ValueError): + assert resp.revocation_reason + with pytest.raises(ValueError): + assert resp.this_update + with pytest.raises(ValueError): + assert resp.next_update + with pytest.raises(ValueError): + assert resp.issuer_key_hash + with pytest.raises(ValueError): + assert resp.issuer_name_hash + with pytest.raises(ValueError): + assert resp.hash_algorithm + with pytest.raises(ValueError): + assert resp.serial_number + + def test_load_revoked(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-revoked.der"), + ocsp.load_der_ocsp_response, + ) + assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED + assert resp.revocation_time == datetime.datetime( + 2016, 9, 2, 21, 28, 48 + ) + assert resp.revocation_reason is None + + def test_load_delegate_unknown_cert(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-delegate-unknown-cert.der"), + ocsp.load_der_ocsp_response, + ) + assert len(resp.certificates) == 1 + assert isinstance(resp.certificates[0], x509.Certificate) + assert resp.certificate_status == ocsp.OCSPCertStatus.UNKNOWN + + def test_load_responder_key_hash(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-responder-key-hash.der"), + ocsp.load_der_ocsp_response, + ) + assert resp.responder_name is None + assert resp.responder_key_hash == ( + b'\x0f\x80a\x1c\x821a\xd5/(\xe7\x8dF8\xb4,\xe1\xc6\xd9\xe2' + ) + + def test_load_revoked_reason(self): + resp = _load_data( + os.path.join("x509", "ocsp", "resp-revoked-reason.der"), + ocsp.load_der_ocsp_response, + ) + assert resp.revocation_reason is x509.ReasonFlags.superseded |