aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndre Caron <andre.l.caron@gmail.com>2015-05-18 13:53:43 -0400
committerIan Cordasco <graffatcolmingov@gmail.com>2015-06-24 13:35:48 -0500
commit0ef595f1d9b5336872dc24d7d67c8cd127b31cea (patch)
tree70f497d2583b45840811f16f2c2a7f1667489e73
parent77c98e3c4ef69d0cfee665cd0835670f4ac44242 (diff)
downloadcryptography-0ef595f1d9b5336872dc24d7d67c8cd127b31cea.tar.gz
cryptography-0ef595f1d9b5336872dc24d7d67c8cd127b31cea.tar.bz2
cryptography-0ef595f1d9b5336872dc24d7d67c8cd127b31cea.zip
Adds CSR builder.
-rw-r--r--CHANGELOG.rst2
-rw-r--r--docs/x509.rst41
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py145
-rw-r--r--src/cryptography/x509.py43
-rw-r--r--tests/test_x509.py147
5 files changed, 377 insertions, 1 deletions
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 5c9d08ea..bc95cf74 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -24,6 +24,8 @@ Changelog
and :class:`~cryptography.hazmat.primitives.kdf.concatkdf.ConcatKDFHMAC`.
* Raise a ``TypeError`` when passing objects that are not text as the value to
:class:`~cryptography.x509.NameAttribute`.
+* Add support for creating certificate signing requests with
+ :class:`~cryptography.x509.CertificateSigningRequestBuilder`.
0.9.1 - 2015-06-06
~~~~~~~~~~~~~~~~~~
diff --git a/docs/x509.rst b/docs/x509.rst
index b8e3c8ee..8507edc1 100644
--- a/docs/x509.rst
+++ b/docs/x509.rst
@@ -468,6 +468,47 @@ X.509 Revoked Certificate Object
The extensions encoded in the revoked certificate.
+X.509 CSR (Certificate Signing Request) Builder Object
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. class:: CertificateSigningRequestBuilder
+
+ .. method:: __init__()
+
+ Creates an empty certificate signing request.
+
+ .. method:: set_version(version)
+
+ :param version: The :class:`Version` of the X.509 protocol.
+
+ .. method:: set_subject_name(name)
+
+ :param name: The :class:`Name` of the certificate subject.
+
+ .. method:: add_extension(extension)
+
+ :param extension: The :class:`Extension` to add to the request.
+
+ .. method:: sign(backend, private_key, algorithm)
+
+ :param backend: Backend that will be used to sign the request.
+ Must support the
+ :class:`~cryptography.hazmat.backends.interfaces.X509Backend`
+ interface.
+
+ :param private_key: The
+ :class:`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey`
+ that will be used to sign the request. When the request is
+ signed by a certificate authority, the private key's associated
+ public key will be stored in the resulting certificate.
+
+ :param algorithm: The
+ :class:`~cryptography.hazmat.primitives.hashes.HashAlgorithm`
+ that will be used to generate the request signature.
+
+ :type: :class:`CertificateSigningRequest`
+
+
.. class:: Name
.. versionadded:: 0.8
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index 4d469c40..f03ca077 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -10,7 +10,7 @@ from contextlib import contextmanager
import six
-from cryptography import utils
+from cryptography import utils, x509
from cryptography.exceptions import (
InternalError, UnsupportedAlgorithm, _Reasons
)
@@ -56,6 +56,88 @@ _OpenSSLError = collections.namedtuple("_OpenSSLError",
["code", "lib", "func", "reason"])
+def _encode_asn1_int(backend, x):
+ i = backend._lib.ASN1_INTEGER_new()
+ # i = backend._ffi.gc(i, backend._lib.ASN1_INTEGER_free)
+ backend._lib.ASN1_INTEGER_set(i, x)
+ return i
+
+
+def _encode_asn1_str(backend, x, n):
+ s = backend._lib.ASN1_OCTET_STRING_new()
+ # s = backend._ffi.gc(s, backend._lib.ASN1_OCTET_STRING_free)
+ backend._lib.ASN1_OCTET_STRING_set(s, x, n)
+ return s
+
+
+def _encode_name(backend, attributes):
+ resolve = {
+ x509.OID_COMMON_NAME: b'CN',
+ x509.OID_COUNTRY_NAME: b'C',
+ x509.OID_STATE_OR_PROVINCE_NAME: b'ST',
+ x509.OID_LOCALITY_NAME: b'L',
+ x509.OID_ORGANIZATION_NAME: b'O',
+ x509.OID_ORGANIZATIONAL_UNIT_NAME: b'OU',
+ }
+ subject = backend._lib.X509_NAME_new()
+ subject = backend._ffi.gc(subject, backend._lib.X509_NAME_free)
+ for attribute in attributes:
+ value = attribute.value
+ if isinstance(value, six.text_type):
+ value = value.encode('ascii')
+ res = backend._lib.X509_NAME_add_entry_by_txt(
+ subject,
+ resolve[attribute.oid],
+ backend._lib.MBSTRING_ASC,
+ value,
+ -1, -1, 0,
+ )
+ assert res == 1
+ return subject
+
+
+def _txt2obj(backend, name):
+ if isinstance(name, six.text_type):
+ name = name.encode('ascii')
+ obj = backend._lib.OBJ_txt2obj(name, 1)
+ assert obj != backend._ffi.NULL
+ # NOTE: not sure if we should GC...
+ return obj
+
+
+def _encode_basic_constraints(backend, ca=False, pathlen=0, critical=False):
+ obj = _txt2obj(backend, x509.OID_BASIC_CONSTRAINTS.dotted_string)
+ assert obj is not None
+ constraints = backend._lib.BASIC_CONSTRAINTS_new()
+ constraints.ca = 255 if ca else 0
+ if ca:
+ constraints.pathlen = _encode_asn1_int(backend, pathlen)
+
+ # Allocate a buffer for encoded payload.
+ cdata = backend._ffi.new(
+ 'unsigned char[]',
+ 2048, # TODO: shrink to fit!
+ )
+ assert cdata != backend._ffi.NULL
+
+ # Fetch the encoded payload.
+ p = backend._ffi.new('unsigned char*[1]')
+ assert p != backend._ffi.NULL
+ p[0] = cdata
+ r = backend._lib.i2d_BASIC_CONSTRAINTS(constraints, p)
+ assert r > 0
+
+ # Wrap that in an X509 extension object.
+ extension = backend._lib.X509_EXTENSION_create_by_OBJ(
+ backend._ffi.NULL,
+ obj,
+ 1 if critical else 0,
+ _encode_asn1_str(backend, cdata, r),
+ )
+ assert extension != backend._ffi.NULL
+ return extension
+
+
@utils.register_interface(CipherBackend)
@utils.register_interface(CMACBackend)
@utils.register_interface(DERSerializationBackend)
@@ -710,6 +792,67 @@ class Backend(object):
def create_cmac_ctx(self, algorithm):
return _CMACContext(self, algorithm)
+ def sign_x509_request(self, builder, private_key, algorithm):
+ # TODO: check type of private key parameter.
+ if not isinstance(algorithm, hashes.HashAlgorithm):
+ raise TypeError('Algorithm must be a registered hash algorithm.')
+
+ # Resolve the signature algorithm.
+ evp_md = self._lib.EVP_get_digestbyname(
+ algorithm.name.encode('ascii')
+ )
+ assert evp_md != self._ffi.NULL
+
+ # Create an empty request.
+ x509_req = self._lib.X509_REQ_new()
+ assert x509_req != self._ffi.NULL
+
+ # Set x509 version.
+ res = self._lib.X509_REQ_set_version(x509_req, builder._version.value)
+ assert res == 1
+
+ # Set subject name.
+ res = self._lib.X509_REQ_set_subject_name(
+ x509_req, _encode_name(self, list(builder._subject_name))
+ )
+ assert res == 1
+
+ # Set subject public key.
+ public_key = private_key.public_key()
+ res = self._lib.X509_REQ_set_pubkey(
+ x509_req, public_key._evp_pkey
+ )
+ assert res == 1
+
+ # Add extensions.
+ extensions = self._lib.sk_X509_EXTENSION_new_null()
+ extensions = self._ffi.gc(
+ extensions,
+ self._lib.sk_X509_EXTENSION_free,
+ )
+ for extension in builder._extensions:
+ if isinstance(extension.value, x509.BasicConstraints):
+ extension = _encode_basic_constraints(
+ self,
+ extension.value.ca,
+ extension.value.path_length,
+ extension.critical
+ )
+ else:
+ raise ValueError('Extension not yet supported.')
+ res = self._lib.sk_X509_EXTENSION_push(extensions, extension)
+ assert res == 1
+ res = self._lib.X509_REQ_add_extensions(x509_req, extensions)
+ assert res == 1
+
+ # Sign the request using the requester's private key.
+ res = self._lib.X509_REQ_sign(
+ x509_req, private_key._evp_pkey, evp_md
+ )
+ assert res > 0
+
+ return _CertificateSigningRequest(self, x509_req)
+
def load_pem_private_key(self, data, password):
return self._load_key(
self._lib.PEM_read_bio_PrivateKey,
diff --git a/src/cryptography/x509.py b/src/cryptography/x509.py
index 4b030ca9..012c13ba 100644
--- a/src/cryptography/x509.py
+++ b/src/cryptography/x509.py
@@ -1442,3 +1442,46 @@ class RevokedCertificate(object):
"""
Returns an Extensions object containing a list of Revoked extensions.
"""
+
+
+class CertificateSigningRequestBuilder(object):
+ def __init__(self):
+ """
+ Creates an empty X.509 certificate request (v1).
+ """
+ self._version = Version.v1
+ self._subject_name = None
+ self._extensions = []
+
+ def set_version(self, version):
+ """
+ Sets the X.509 version.
+ """
+ if not isinstance(version, Version):
+ raise TypeError('Expecting x509.Version object.')
+ self._version = version
+
+ def set_subject_name(self, name):
+ """
+ Sets the certificate requestor's distinguished name.
+ """
+ if not isinstance(name, Name):
+ raise TypeError('Expecting x509.Name object.')
+ self._subject_name = name
+
+ def add_extension(self, extension):
+ """
+ Adds an X.509 extension to the certificate request.
+ """
+ if not isinstance(extension, Extension):
+ raise TypeError('Expecting x509.Extension object.')
+ for e in self._extensions:
+ if e.oid == extension.oid:
+ raise ValueError('This extension has already been set.')
+ self._extensions.append(extension)
+
+ def sign(self, backend, private_key, algorithm):
+ """
+ Signs the request using the requestor's private key.
+ """
+ return backend.sign_x509_request(self, private_key, algorithm)
diff --git a/tests/test_x509.py b/tests/test_x509.py
index cf3499bf..85ef4b5c 100644
--- a/tests/test_x509.py
+++ b/tests/test_x509.py
@@ -679,6 +679,153 @@ class TestRSACertificateRequest(object):
assert serialized == request_bytes
+@pytest.mark.requires_backend_interface(interface=RSABackend)
+@pytest.mark.requires_backend_interface(interface=X509Backend)
+class TestCertificateSigningRequestBuilder(object):
+ def test_sign_invalid_hash_algorithm(self, backend):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=backend,
+ )
+ builder = x509.CertificateSigningRequestBuilder()
+ with pytest.raises(TypeError):
+ builder.sign(backend, private_key, 'NotAHash')
+
+ def test_build_ca_request(self, backend):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=backend,
+ )
+
+ builder = x509.CertificateSigningRequestBuilder()
+ builder.set_version(x509.Version.v3)
+ builder.set_subject_name(x509.Name([
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, 'US'),
+ x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, 'Texas'),
+ x509.NameAttribute(x509.OID_LOCALITY_NAME, 'Austin'),
+ x509.NameAttribute(x509.OID_ORGANIZATION_NAME, 'PyCA'),
+ x509.NameAttribute(x509.OID_COMMON_NAME, 'cryptography.io'),
+ ]))
+ builder.add_extension(x509.Extension(
+ x509.OID_BASIC_CONSTRAINTS,
+ True,
+ x509.BasicConstraints(True, 2),
+ ))
+ request = builder.sign(backend, private_key, hashes.SHA1())
+
+ assert isinstance(request.signature_hash_algorithm, hashes.SHA1)
+ public_key = request.public_key()
+ assert isinstance(public_key, rsa.RSAPublicKey)
+ subject = request.subject
+ assert isinstance(subject, x509.Name)
+ assert list(subject) == [
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, 'US'),
+ x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, 'Texas'),
+ x509.NameAttribute(x509.OID_LOCALITY_NAME, 'Austin'),
+ x509.NameAttribute(x509.OID_ORGANIZATION_NAME, 'PyCA'),
+ x509.NameAttribute(x509.OID_COMMON_NAME, 'cryptography.io'),
+ ]
+ basic_constraints = request.extensions.get_extension_for_oid(
+ x509.OID_BASIC_CONSTRAINTS
+ )
+ assert basic_constraints.value.ca is True
+ assert basic_constraints.value.path_length == 2
+
+ def test_build_nonca_request(self, backend):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=backend,
+ )
+
+ builder = x509.CertificateSigningRequestBuilder()
+ builder.set_version(x509.Version.v3)
+ builder.set_subject_name(x509.Name([
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, 'US'),
+ x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, 'Texas'),
+ x509.NameAttribute(x509.OID_LOCALITY_NAME, 'Austin'),
+ x509.NameAttribute(x509.OID_ORGANIZATION_NAME, 'PyCA'),
+ x509.NameAttribute(x509.OID_COMMON_NAME, 'cryptography.io'),
+ ]))
+ builder.add_extension(x509.Extension(
+ x509.OID_BASIC_CONSTRAINTS,
+ True,
+ x509.BasicConstraints(False, None),
+ ))
+ request = builder.sign(backend, private_key, hashes.SHA1())
+
+ assert isinstance(request.signature_hash_algorithm, hashes.SHA1)
+ public_key = request.public_key()
+ assert isinstance(public_key, rsa.RSAPublicKey)
+ subject = request.subject
+ assert isinstance(subject, x509.Name)
+ assert list(subject) == [
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, 'US'),
+ x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, 'Texas'),
+ x509.NameAttribute(x509.OID_LOCALITY_NAME, 'Austin'),
+ x509.NameAttribute(x509.OID_ORGANIZATION_NAME, 'PyCA'),
+ x509.NameAttribute(x509.OID_COMMON_NAME, 'cryptography.io'),
+ ]
+ basic_constraints = request.extensions.get_extension_for_oid(
+ x509.OID_BASIC_CONSTRAINTS
+ )
+ assert basic_constraints.value.ca is False
+ assert basic_constraints.value.path_length is None
+
+ def test_add_duplicate_extension(self, backend):
+ builder = x509.CertificateSigningRequestBuilder()
+ builder.add_extension(x509.Extension(
+ x509.OID_BASIC_CONSTRAINTS,
+ True,
+ x509.BasicConstraints(True, 2),
+ ))
+ with pytest.raises(ValueError):
+ builder.add_extension(x509.Extension(
+ x509.OID_BASIC_CONSTRAINTS,
+ True,
+ x509.BasicConstraints(True, 2),
+ ))
+
+ def test_add_invalid_extension(self, backend):
+ builder = x509.CertificateSigningRequestBuilder()
+ with pytest.raises(TypeError):
+ builder.add_extension('NotAnExtension')
+
+ def test_set_invalid_subject(self, backend):
+ builder = x509.CertificateSigningRequestBuilder()
+ with pytest.raises(TypeError):
+ builder.set_subject_name('NotAName')
+
+ def test_set_invalid_version(self, backend):
+ builder = x509.CertificateSigningRequestBuilder()
+ with pytest.raises(TypeError):
+ builder.set_version('NotAVersion')
+
+ def test_add_unsupported_extension(self, backend):
+ private_key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=backend,
+ )
+ builder = x509.CertificateSigningRequestBuilder()
+ builder.set_subject_name(x509.Name([
+ x509.NameAttribute(x509.OID_COUNTRY_NAME, u'US'),
+ x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, u'Texas'),
+ x509.NameAttribute(x509.OID_LOCALITY_NAME, u'Austin'),
+ x509.NameAttribute(x509.OID_ORGANIZATION_NAME, u'PyCA'),
+ x509.NameAttribute(x509.OID_COMMON_NAME, u'cryptography.io'),
+ ]))
+ builder.add_extension(x509.Extension(
+ x509.ObjectIdentifier('1.2.3.4'),
+ False,
+ 'value',
+ ))
+ with pytest.raises(ValueError):
+ builder.sign(backend, private_key, hashes.SHA1())
+
+
@pytest.mark.requires_backend_interface(interface=DSABackend)
@pytest.mark.requires_backend_interface(interface=X509Backend)
class TestDSACertificate(object):