diff options
-rw-r--r-- | docs/x509/index.rst | 1 | ||||
-rw-r--r-- | docs/x509/ocsp.rst | 94 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 11 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/ocsp.py | 119 | ||||
-rw-r--r-- | src/cryptography/x509/ocsp.py | 79 | ||||
-rw-r--r-- | tests/x509/test_ocsp.py | 115 |
6 files changed, 419 insertions, 0 deletions
diff --git a/docs/x509/index.rst b/docs/x509/index.rst index ec47fe62..ef51fbf6 100644 --- a/docs/x509/index.rst +++ b/docs/x509/index.rst @@ -10,6 +10,7 @@ certificates are commonly used in protocols like `TLS`_. tutorial certificate-transparency + ocsp reference .. _`public key infrastructure`: https://en.wikipedia.org/wiki/Public_key_infrastructure diff --git a/docs/x509/ocsp.rst b/docs/x509/ocsp.rst new file mode 100644 index 00000000..72227f07 --- /dev/null +++ b/docs/x509/ocsp.rst @@ -0,0 +1,94 @@ +OCSP +==== + +.. currentmodule:: cryptography.x509.ocsp + +.. testsetup:: + + der_ocsp_req = ( + b"0V0T0R0P0N0\t\x06\x05+\x0e\x03\x02\x1a\x05\x00\x04\x148\xcaF\x8c" + b"\x07D\x8d\xf4\x81\x96\xc7mmLpQ\x9e`\xa7\xbd\x04\x14yu\xbb\x84:\xcb" + b",\xdez\t\xbe1\x1bC\xbc\x1c*MSX\x02\x15\x00\x98\xd9\xe5\xc0\xb4\xc3" + b"sU-\xf7|]\x0f\x1e\xb5\x12\x8eIE\xf9" + ) + +OCSP (Online Certificate Status Protocol) is a method of checking the +revocation status of certificates. It is specified in :rfc:`6960`, as well +as other obsoleted RFCs. + + +Loading Requests +~~~~~~~~~~~~~~~~ + +.. function:: load_der_ocsp_request(data) + + .. versionadded:: 2.4 + + Deserialize an OCSP request from DER encoded data. + + :param bytes data: The DER encoded OCSP request data. + + :returns: An instance of :class:`~cryptography.x509.ocsp.OCSPRequest`. + + .. doctest:: + + >>> from cryptography.x509 import ocsp + >>> ocsp_req = ocsp.load_der_ocsp_request(der_ocsp_req) + >>> for request in ocsp_req: + ... print(request.serial_number) + 872625873161273451176241581705670534707360122361 + + +Interfaces +~~~~~~~~~~ + +.. class:: OCSPRequest + + .. versionadded:: 2.4 + + An ``OCSPRequest`` is an iterable containing one or more + :class:`~cryptography.x509.ocsp.Request` objects. + + .. method:: public_bytes(encoding) + + :param encoding: The encoding to use. Only + :attr:`~cryptography.hazmat.primitives.serialization.Encoding.DER` + is supported. + + :return bytes: The serialized OCSP request. + +.. class:: Request + + .. versionadded:: 2.4 + + A ``Request`` contains several attributes that create a unique identifier + for a certificate whose status is being checked. It may also contain + additional extensions (currently unsupported). + + .. attribute:: issuer_key_hash + + :type: bytes + + The hash of the certificate issuer's key. The hash algorithm used + is defined by the ``hash_algorithm`` property. + + .. attribute:: issuer_name_hash + + :type: bytes + + The hash of the certificate issuer's name. The hash algorithm used + is defined by the ``hash_algorithm`` property. + + .. attribute:: hash_algorithm + + :type: An instance of a + :class:`~cryptography.hazmat.primitives.hashes.Hash` + + The algorithm used to generate the ``issuer_key_hash`` and + ``issuer_name_hash``. + + .. attribute:: serial_number + + :type: int + + The serial number of the certificate to check. diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index af14bfaa..6a0446bc 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -40,6 +40,7 @@ from cryptography.hazmat.backends.openssl.encode_asn1 import ( ) from cryptography.hazmat.backends.openssl.hashes import _HashContext from cryptography.hazmat.backends.openssl.hmac import _HMACContext +from cryptography.hazmat.backends.openssl.ocsp import _OCSPRequest from cryptography.hazmat.backends.openssl.rsa import ( _RSAPrivateKey, _RSAPublicKey ) @@ -1419,6 +1420,16 @@ class Backend(object): return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) + def load_der_ocsp_request(self, data): + mem_bio = self._bytes_to_bio(data) + request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL) + if request == self._ffi.NULL: + self._consume_errors() + raise ValueError("Unable to load OCSP request") + + request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free) + return _OCSPRequest(self, request) + def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve): return ( self.elliptic_curve_supported(curve) and diff --git a/src/cryptography/hazmat/backends/openssl/ocsp.py b/src/cryptography/hazmat/backends/openssl/ocsp.py new file mode 100644 index 00000000..38e871ec --- /dev/null +++ b/src/cryptography/hazmat/backends/openssl/ocsp.py @@ -0,0 +1,119 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import operator + +from cryptography import utils +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.backends.openssl.decode_asn1 import ( + _asn1_integer_to_int, _asn1_string_to_bytes, _obj2txt +) +from cryptography.hazmat.primitives import serialization +from cryptography.x509.ocsp import OCSPRequest, Request, _OIDS_TO_HASH + + +@utils.register_interface(Request) +class _Request(object): + def __init__(self, backend, ocsp_request, request): + self._backend = backend + self._ocsp_request = ocsp_request + self._request = request + self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request) + self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL) + + @property + def issuer_key_hash(self): + key_hash = self._backend._ffi.new("ASN1_OCTET_STRING **") + res = self._backend._lib.OCSP_id_get0_info( + self._backend._ffi.NULL, self._backend._ffi.NULL, + key_hash, self._backend._ffi.NULL, self._cert_id + ) + self._backend.openssl_assert(res == 1) + self._backend.openssl_assert(key_hash[0] != self._backend._ffi.NULL) + return _asn1_string_to_bytes(self._backend, key_hash[0]) + + @property + def issuer_name_hash(self): + name_hash = self._backend._ffi.new("ASN1_OCTET_STRING **") + res = self._backend._lib.OCSP_id_get0_info( + name_hash, self._backend._ffi.NULL, + self._backend._ffi.NULL, self._backend._ffi.NULL, self._cert_id + ) + self._backend.openssl_assert(res == 1) + self._backend.openssl_assert(name_hash[0] != self._backend._ffi.NULL) + return _asn1_string_to_bytes(self._backend, name_hash[0]) + + @property + def serial_number(self): + num = self._backend._ffi.new("ASN1_INTEGER **") + res = self._backend._lib.OCSP_id_get0_info( + self._backend._ffi.NULL, self._backend._ffi.NULL, + self._backend._ffi.NULL, num, self._cert_id + ) + self._backend.openssl_assert(res == 1) + self._backend.openssl_assert(num[0] != self._backend._ffi.NULL) + return _asn1_integer_to_int(self._backend, num[0]) + + @property + def hash_algorithm(self): + asn1obj = self._backend._ffi.new("ASN1_OBJECT **") + res = self._backend._lib.OCSP_id_get0_info( + self._backend._ffi.NULL, asn1obj, + self._backend._ffi.NULL, self._backend._ffi.NULL, self._cert_id + ) + self._backend.openssl_assert(res == 1) + self._backend.openssl_assert(asn1obj[0] != self._backend._ffi.NULL) + oid = _obj2txt(self._backend, asn1obj[0]) + try: + return _OIDS_TO_HASH[oid] + except KeyError: + raise UnsupportedAlgorithm( + "Signature algorithm OID: {0} not recognized".format(oid) + ) + + +@utils.register_interface(OCSPRequest) +class _OCSPRequest(object): + def __init__(self, backend, ocsp_request): + self._backend = backend + self._ocsp_request = ocsp_request + + def __len__(self): + return self._backend._lib.OCSP_request_onereq_count(self._ocsp_request) + + def _request(self, idx): + request = self._backend._lib.OCSP_request_onereq_get0( + self._ocsp_request, idx + ) + self._backend.openssl_assert(request != self._backend._ffi.NULL) + return _Request(self._backend, self._ocsp_request, request) + + def public_bytes(self, encoding): + if encoding is not serialization.Encoding.DER: + raise ValueError( + "The only allowed encoding value is Encoding.DER" + ) + + bio = self._backend._create_mem_bio_gc() + res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request) + self._backend.openssl_assert(res > 0) + return self._backend._read_mem_bio(bio) + + def __iter__(self): + for i in range(len(self)): + yield self._request(i) + + def __getitem__(self, idx): + if isinstance(idx, slice): + start, stop, step = idx.indices(len(self)) + return [self._request(i) for i in range(start, stop, step)] + else: + idx = operator.index(idx) + if idx < 0: + idx += len(self) + if not 0 <= idx < len(self): + raise IndexError + return self._request(idx) diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py new file mode 100644 index 00000000..22894dde --- /dev/null +++ b/src/cryptography/x509/ocsp.py @@ -0,0 +1,79 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import abc + +import six + +from cryptography.hazmat.primitives import hashes + + +_OIDS_TO_HASH = { + "1.3.14.3.2.26": hashes.SHA1(), + "2.16.840.1.101.3.4.2.4": hashes.SHA224(), + "2.16.840.1.101.3.4.2.1": hashes.SHA256(), + "2.16.840.1.101.3.4.2.2": hashes.SHA384(), + "2.16.840.1.101.3.4.2.3": hashes.SHA512(), +} + + +def load_der_ocsp_request(data): + from cryptography.hazmat.backends.openssl.backend import backend + return backend.load_der_ocsp_request(data) + + +@six.add_metaclass(abc.ABCMeta) +class OCSPRequest(object): + @abc.abstractmethod + def __iter__(self): + """ + Iteration of Requests + """ + + @abc.abstractmethod + def __len__(self): + """ + Number of Requests inside the OCSPRequest object + """ + + @abc.abstractmethod + def __getitem__(self, idx): + """ + Returns a Request or range of Requests + """ + + @abc.abstractmethod + def public_bytes(self, encoding): + """ + Serializes the request to DER + """ + + +@six.add_metaclass(abc.ABCMeta) +class Request(object): + @abc.abstractproperty + def issuer_key_hash(self): + """ + The hash of the issuer public key + """ + + @abc.abstractproperty + def issuer_name_hash(self): + """ + The hash of the issuer name + """ + + @abc.abstractproperty + def hash_algorithm(self): + """ + The hash algorithm used in the issuer name and key hashes + """ + + @abc.abstractproperty + def serial_number(self): + """ + The serial number of the cert whose status is being checked + """ diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py new file mode 100644 index 00000000..22f34df2 --- /dev/null +++ b/tests/x509/test_ocsp.py @@ -0,0 +1,115 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import os + +import pytest + +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.x509 import ocsp + +from ..utils import load_vectors_from_file + + +def _load_data(filename, loader): + return load_vectors_from_file( + filename=filename, + loader=lambda data: loader(data.read()), + mode="rb" + ) + + +class TestOCSPRequest(object): + def test_bad_request(self): + with pytest.raises(ValueError): + ocsp.load_der_ocsp_request(b"invalid") + + def test_load_request_one_item(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-sha1.der"), + ocsp.load_der_ocsp_request, + ) + assert len(req) == 1 + assert req[0].issuer_name_hash == (b"8\xcaF\x8c\x07D\x8d\xf4\x81\x96" + b"\xc7mmLpQ\x9e`\xa7\xbd") + assert req[0].issuer_key_hash == (b"yu\xbb\x84:\xcb,\xdez\t\xbe1" + b"\x1bC\xbc\x1c*MSX") + assert isinstance(req[0].hash_algorithm, hashes.SHA1) + assert req[0].serial_number == int( + "98D9E5C0B4C373552DF77C5D0F1EB5128E4945F9", 16 + ) + + def test_load_request_multiple_items(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-multi-sha1.der"), + ocsp.load_der_ocsp_request, + ) + assert len(req) == 2 + assert req[0].issuer_name_hash == (b"8\xcaF\x8c\x07D\x8d\xf4\x81\x96" + b"\xc7mmLpQ\x9e`\xa7\xbd") + assert req[0].issuer_key_hash == (b"yu\xbb\x84:\xcb,\xdez\t\xbe1" + b"\x1bC\xbc\x1c*MSX") + assert isinstance(req[0].hash_algorithm, hashes.SHA1) + assert req[0].serial_number == int( + "98D9E5C0B4C373552DF77C5D0F1EB5128E4945F9", 16 + ) + assert req[1].issuer_name_hash == (b"8\xcaF\x8c\x07D\x8d\xf4\x81\x96" + b"\xc7mmLpQ\x9e`\xa7\xbd") + assert req[1].issuer_key_hash == (b"yu\xbb\x84:\xcb,\xdez\t\xbe1" + b"\x1bC\xbc\x1c*MSX") + assert isinstance(req[1].hash_algorithm, hashes.SHA1) + assert req[1].serial_number == int( + "98D9E5C0B4C373552DF77C5D0F1EB5128E4945F0", 16 + ) + + def test_iter(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-multi-sha1.der"), + ocsp.load_der_ocsp_request, + ) + for request in req: + assert isinstance(request, ocsp.Request) + + def test_indexing_ocsp_request(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-multi-sha1.der"), + ocsp.load_der_ocsp_request, + ) + assert req[1].serial_number == req[-1].serial_number + assert len(req[0:2]) == 2 + assert req[1:2][0].serial_number == int( + "98D9E5C0B4C373552DF77C5D0F1EB5128E4945F0", 16 + ) + with pytest.raises(IndexError): + req[10] + + def test_invalid_hash_algorithm(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-invalid-hash-alg.der"), + ocsp.load_der_ocsp_request, + ) + with pytest.raises(UnsupportedAlgorithm): + req[0].hash_algorithm + + def test_serialize_request(self): + req_bytes = load_vectors_from_file( + filename=os.path.join("x509", "ocsp", "req-sha1.der"), + loader=lambda data: data.read(), + mode="rb" + ) + req = ocsp.load_der_ocsp_request(req_bytes) + assert req.public_bytes(serialization.Encoding.DER) == req_bytes + + def test_invalid_serialize_encoding(self): + req = _load_data( + os.path.join("x509", "ocsp", "req-sha1.der"), + ocsp.load_der_ocsp_request, + ) + with pytest.raises(ValueError): + req.public_bytes("invalid") + with pytest.raises(ValueError): + req.public_bytes(serialization.Encoding.PEM) |