aboutsummaryrefslogtreecommitdiffstats
path: root/src/cryptography/hazmat/backends/openssl/backend.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl/backend.py')
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py1036
1 files changed, 1036 insertions, 0 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
new file mode 100644
index 00000000..bb1a3f3d
--- /dev/null
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -0,0 +1,1036 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import collections
+import itertools
+import warnings
+from contextlib import contextmanager
+
+import six
+
+from cryptography import utils
+from cryptography.exceptions import (
+ InternalError, UnsupportedAlgorithm, _Reasons
+)
+from cryptography.hazmat.backends.interfaces import (
+ CMACBackend, CipherBackend, DSABackend, EllipticCurveBackend, HMACBackend,
+ HashBackend, PBKDF2HMACBackend, PEMSerializationBackend,
+ PKCS8SerializationBackend, RSABackend,
+ TraditionalOpenSSLSerializationBackend
+)
+from cryptography.hazmat.backends.openssl.ciphers import (
+ _AESCTRCipherContext, _CipherContext
+)
+from cryptography.hazmat.backends.openssl.cmac import _CMACContext
+from cryptography.hazmat.backends.openssl.dsa import (
+ _DSAParameters, _DSAPrivateKey, _DSAPublicKey
+)
+from cryptography.hazmat.backends.openssl.ec import (
+ _EllipticCurvePrivateKey, _EllipticCurvePublicKey
+)
+from cryptography.hazmat.backends.openssl.hashes import _HashContext
+from cryptography.hazmat.backends.openssl.hmac import _HMACContext
+from cryptography.hazmat.backends.openssl.rsa import (
+ _RSAPrivateKey, _RSAPublicKey
+)
+from cryptography.hazmat.bindings.openssl.binding import Binding
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
+from cryptography.hazmat.primitives.asymmetric.padding import (
+ MGF1, OAEP, PKCS1v15, PSS
+)
+from cryptography.hazmat.primitives.ciphers.algorithms import (
+ AES, ARC4, Blowfish, CAST5, Camellia, IDEA, SEED, TripleDES
+)
+from cryptography.hazmat.primitives.ciphers.modes import (
+ CBC, CFB, CFB8, CTR, ECB, GCM, OFB
+)
+
+
+_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
+_OpenSSLError = collections.namedtuple("_OpenSSLError",
+ ["code", "lib", "func", "reason"])
+
+
+@utils.register_interface(CipherBackend)
+@utils.register_interface(CMACBackend)
+@utils.register_interface(DSABackend)
+@utils.register_interface(EllipticCurveBackend)
+@utils.register_interface(HashBackend)
+@utils.register_interface(HMACBackend)
+@utils.register_interface(PBKDF2HMACBackend)
+@utils.register_interface(PKCS8SerializationBackend)
+@utils.register_interface(RSABackend)
+@utils.register_interface(TraditionalOpenSSLSerializationBackend)
+@utils.register_interface(PEMSerializationBackend)
+class Backend(object):
+ """
+ OpenSSL API binding interfaces.
+ """
+ name = "openssl"
+
+ def __init__(self):
+ self._binding = Binding()
+ self._ffi = self._binding.ffi
+ self._lib = self._binding.lib
+
+ self._binding.init_static_locks()
+
+ # adds all ciphers/digests for EVP
+ self._lib.OpenSSL_add_all_algorithms()
+ # registers available SSL/TLS ciphers and digests
+ self._lib.SSL_library_init()
+ # loads error strings for libcrypto and libssl functions
+ self._lib.SSL_load_error_strings()
+
+ self._cipher_registry = {}
+ self._register_default_ciphers()
+ self.activate_osrandom_engine()
+
+ def activate_builtin_random(self):
+ # Obtain a new structural reference.
+ e = self._lib.ENGINE_get_default_RAND()
+ if e != self._ffi.NULL:
+ self._lib.ENGINE_unregister_RAND(e)
+ # Reset the RNG to use the new engine.
+ self._lib.RAND_cleanup()
+ # decrement the structural reference from get_default_RAND
+ res = self._lib.ENGINE_finish(e)
+ assert res == 1
+
+ def activate_osrandom_engine(self):
+ # Unregister and free the current engine.
+ self.activate_builtin_random()
+ # Fetches an engine by id and returns it. This creates a structural
+ # reference.
+ e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
+ assert e != self._ffi.NULL
+ # Initialize the engine for use. This adds a functional reference.
+ res = self._lib.ENGINE_init(e)
+ assert res == 1
+ # Set the engine as the default RAND provider.
+ res = self._lib.ENGINE_set_default_RAND(e)
+ assert res == 1
+ # Decrement the structural ref incremented by ENGINE_by_id.
+ res = self._lib.ENGINE_free(e)
+ assert res == 1
+ # Decrement the functional ref incremented by ENGINE_init.
+ res = self._lib.ENGINE_finish(e)
+ assert res == 1
+ # Reset the RNG to use the new engine.
+ self._lib.RAND_cleanup()
+
+ def openssl_version_text(self):
+ """
+ Friendly string name of the loaded OpenSSL library. This is not
+ necessarily the same version as it was compiled against.
+
+ Example: OpenSSL 1.0.1e 11 Feb 2013
+ """
+ return self._ffi.string(
+ self._lib.SSLeay_version(self._lib.SSLEAY_VERSION)
+ ).decode("ascii")
+
+ def create_hmac_ctx(self, key, algorithm):
+ return _HMACContext(self, key, algorithm)
+
+ def hash_supported(self, algorithm):
+ digest = self._lib.EVP_get_digestbyname(algorithm.name.encode("ascii"))
+ return digest != self._ffi.NULL
+
+ def hmac_supported(self, algorithm):
+ return self.hash_supported(algorithm)
+
+ def create_hash_ctx(self, algorithm):
+ return _HashContext(self, algorithm)
+
+ def cipher_supported(self, cipher, mode):
+ if self._evp_cipher_supported(cipher, mode):
+ return True
+ elif isinstance(mode, CTR) and isinstance(cipher, AES):
+ return True
+ else:
+ return False
+
+ def _evp_cipher_supported(self, cipher, mode):
+ try:
+ adapter = self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ evp_cipher = adapter(self, cipher, mode)
+ return self._ffi.NULL != evp_cipher
+
+ def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}.".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = adapter
+
+ def _register_default_ciphers(self):
+ for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8]:
+ self.register_cipher_adapter(
+ AES,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+ for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
+ self.register_cipher_adapter(
+ Camellia,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, CFB8, OFB]:
+ self.register_cipher_adapter(
+ TripleDES,
+ mode_cls,
+ GetCipherByName("des-ede3-{mode.name}")
+ )
+ self.register_cipher_adapter(
+ TripleDES,
+ ECB,
+ GetCipherByName("des-ede3")
+ )
+ for mode_cls in [CBC, CFB, OFB, ECB]:
+ self.register_cipher_adapter(
+ Blowfish,
+ mode_cls,
+ GetCipherByName("bf-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB, ECB]:
+ self.register_cipher_adapter(
+ SEED,
+ mode_cls,
+ GetCipherByName("seed-{mode.name}")
+ )
+ for cipher_cls, mode_cls in itertools.product(
+ [CAST5, IDEA],
+ [CBC, OFB, CFB, ECB],
+ ):
+ self.register_cipher_adapter(
+ cipher_cls,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{mode.name}")
+ )
+ self.register_cipher_adapter(
+ ARC4,
+ type(None),
+ GetCipherByName("rc4")
+ )
+ self.register_cipher_adapter(
+ AES,
+ GCM,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+
+ def create_symmetric_encryption_ctx(self, cipher, mode):
+ if (isinstance(mode, CTR) and isinstance(cipher, AES)
+ and not self._evp_cipher_supported(cipher, mode)):
+ # This is needed to provide support for AES CTR mode in OpenSSL
+ # 0.9.8. It can be removed when we drop 0.9.8 support (RHEL 5
+ # extended life ends 2020).
+ return _AESCTRCipherContext(self, cipher, mode)
+ else:
+ return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
+
+ def create_symmetric_decryption_ctx(self, cipher, mode):
+ if (isinstance(mode, CTR) and isinstance(cipher, AES)
+ and not self._evp_cipher_supported(cipher, mode)):
+ # This is needed to provide support for AES CTR mode in OpenSSL
+ # 0.9.8. It can be removed when we drop 0.9.8 support (RHEL 5
+ # extended life ends 2020).
+ return _AESCTRCipherContext(self, cipher, mode)
+ else:
+ return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
+
+ def pbkdf2_hmac_supported(self, algorithm):
+ if self._lib.Cryptography_HAS_PBKDF2_HMAC:
+ return self.hmac_supported(algorithm)
+ else:
+ # OpenSSL < 1.0.0 has an explicit PBKDF2-HMAC-SHA1 function,
+ # so if the PBKDF2_HMAC function is missing we only support
+ # SHA1 via PBKDF2_HMAC_SHA1.
+ return isinstance(algorithm, hashes.SHA1)
+
+ def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
+ key_material):
+ buf = self._ffi.new("char[]", length)
+ if self._lib.Cryptography_HAS_PBKDF2_HMAC:
+ evp_md = self._lib.EVP_get_digestbyname(
+ algorithm.name.encode("ascii"))
+ assert evp_md != self._ffi.NULL
+ res = self._lib.PKCS5_PBKDF2_HMAC(
+ key_material,
+ len(key_material),
+ salt,
+ len(salt),
+ iterations,
+ evp_md,
+ length,
+ buf
+ )
+ assert res == 1
+ else:
+ if not isinstance(algorithm, hashes.SHA1):
+ raise UnsupportedAlgorithm(
+ "This version of OpenSSL only supports PBKDF2HMAC with "
+ "SHA1.",
+ _Reasons.UNSUPPORTED_HASH
+ )
+ res = self._lib.PKCS5_PBKDF2_HMAC_SHA1(
+ key_material,
+ len(key_material),
+ salt,
+ len(salt),
+ iterations,
+ length,
+ buf
+ )
+ assert res == 1
+
+ return self._ffi.buffer(buf)[:]
+
+ def _err_string(self, code):
+ err_buf = self._ffi.new("char[]", 256)
+ self._lib.ERR_error_string_n(code, err_buf, 256)
+ return self._ffi.string(err_buf, 256)[:]
+
+ def _consume_errors(self):
+ errors = []
+ while True:
+ code = self._lib.ERR_get_error()
+ if code == 0:
+ break
+
+ lib = self._lib.ERR_GET_LIB(code)
+ func = self._lib.ERR_GET_FUNC(code)
+ reason = self._lib.ERR_GET_REASON(code)
+
+ errors.append(_OpenSSLError(code, lib, func, reason))
+ return errors
+
+ def _unknown_error(self, error):
+ return InternalError(
+ "Unknown error code {0} from OpenSSL, "
+ "you should probably file a bug. {1}.".format(
+ error.code, self._err_string(error.code)
+ )
+ )
+
+ def _bn_to_int(self, bn):
+ if six.PY3:
+ # Python 3 has constant time from_bytes, so use that.
+
+ bn_num_bytes = (self._lib.BN_num_bits(bn) + 7) // 8
+ bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
+ bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
+ assert bin_len > 0
+ assert bin_ptr != self._ffi.NULL
+ return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
+
+ else:
+ # Under Python 2 the best we can do is hex()
+
+ hex_cdata = self._lib.BN_bn2hex(bn)
+ assert hex_cdata != self._ffi.NULL
+ hex_str = self._ffi.string(hex_cdata)
+ self._lib.OPENSSL_free(hex_cdata)
+ return int(hex_str, 16)
+
+ def _int_to_bn(self, num, bn=None):
+ """
+ Converts a python integer to a BIGNUM. The returned BIGNUM will not
+ be garbage collected (to support adding them to structs that take
+ ownership of the object). Be sure to register it for GC if it will
+ be discarded after use.
+ """
+
+ if bn is None:
+ bn = self._ffi.NULL
+
+ if six.PY3:
+ # Python 3 has constant time to_bytes, so use that.
+
+ binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
+ bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
+ assert bn_ptr != self._ffi.NULL
+ return bn_ptr
+
+ else:
+ # Under Python 2 the best we can do is hex()
+
+ hex_num = hex(num).rstrip("L").lstrip("0x").encode("ascii") or b"0"
+ bn_ptr = self._ffi.new("BIGNUM **")
+ bn_ptr[0] = bn
+ res = self._lib.BN_hex2bn(bn_ptr, hex_num)
+ assert res != 0
+ assert bn_ptr[0] != self._ffi.NULL
+ return bn_ptr[0]
+
+ def generate_rsa_private_key(self, public_exponent, key_size):
+ rsa._verify_rsa_parameters(public_exponent, key_size)
+
+ rsa_cdata = self._lib.RSA_new()
+ assert rsa_cdata != self._ffi.NULL
+ rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
+
+ bn = self._int_to_bn(public_exponent)
+ bn = self._ffi.gc(bn, self._lib.BN_free)
+
+ res = self._lib.RSA_generate_key_ex(
+ rsa_cdata, key_size, bn, self._ffi.NULL
+ )
+ assert res == 1
+
+ return _RSAPrivateKey(self, rsa_cdata)
+
+ def generate_rsa_parameters_supported(self, public_exponent, key_size):
+ return (public_exponent >= 3 and public_exponent & 1 != 0 and
+ key_size >= 512)
+
+ def load_rsa_private_numbers(self, numbers):
+ rsa._check_private_key_components(
+ numbers.p,
+ numbers.q,
+ numbers.d,
+ numbers.dmp1,
+ numbers.dmq1,
+ numbers.iqmp,
+ numbers.public_numbers.e,
+ numbers.public_numbers.n
+ )
+ rsa_cdata = self._lib.RSA_new()
+ assert rsa_cdata != self._ffi.NULL
+ rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
+ rsa_cdata.p = self._int_to_bn(numbers.p)
+ rsa_cdata.q = self._int_to_bn(numbers.q)
+ rsa_cdata.d = self._int_to_bn(numbers.d)
+ rsa_cdata.dmp1 = self._int_to_bn(numbers.dmp1)
+ rsa_cdata.dmq1 = self._int_to_bn(numbers.dmq1)
+ rsa_cdata.iqmp = self._int_to_bn(numbers.iqmp)
+ rsa_cdata.e = self._int_to_bn(numbers.public_numbers.e)
+ rsa_cdata.n = self._int_to_bn(numbers.public_numbers.n)
+ res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL)
+ assert res == 1
+
+ return _RSAPrivateKey(self, rsa_cdata)
+
+ def load_rsa_public_numbers(self, numbers):
+ rsa._check_public_key_components(numbers.e, numbers.n)
+ rsa_cdata = self._lib.RSA_new()
+ assert rsa_cdata != self._ffi.NULL
+ rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
+ rsa_cdata.e = self._int_to_bn(numbers.e)
+ rsa_cdata.n = self._int_to_bn(numbers.n)
+ res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL)
+ assert res == 1
+
+ return _RSAPublicKey(self, rsa_cdata)
+
+ def _bytes_to_bio(self, data):
+ """
+ Return a _MemoryBIO namedtuple of (BIO, char*).
+
+ The char* is the storage for the BIO and it must stay alive until the
+ BIO is finished with.
+ """
+ data_char_p = self._ffi.new("char[]", data)
+ bio = self._lib.BIO_new_mem_buf(
+ data_char_p, len(data)
+ )
+ assert bio != self._ffi.NULL
+
+ return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p)
+
+ def _evp_pkey_to_private_key(self, evp_pkey):
+ """
+ Return the appropriate type of PrivateKey given an evp_pkey cdata
+ pointer.
+ """
+
+ type = evp_pkey.type
+
+ if type == self._lib.EVP_PKEY_RSA:
+ rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
+ assert rsa_cdata != self._ffi.NULL
+ rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
+ return _RSAPrivateKey(self, rsa_cdata)
+ elif type == self._lib.EVP_PKEY_DSA:
+ dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
+ assert dsa_cdata != self._ffi.NULL
+ dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
+ return _DSAPrivateKey(self, dsa_cdata)
+ elif (self._lib.Cryptography_HAS_EC == 1 and
+ type == self._lib.EVP_PKEY_EC):
+ ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
+ assert ec_cdata != self._ffi.NULL
+ ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
+ return _EllipticCurvePrivateKey(self, ec_cdata)
+ else:
+ raise UnsupportedAlgorithm("Unsupported key type.")
+
+ def _evp_pkey_to_public_key(self, evp_pkey):
+ """
+ Return the appropriate type of PublicKey given an evp_pkey cdata
+ pointer.
+ """
+
+ type = evp_pkey.type
+
+ if type == self._lib.EVP_PKEY_RSA:
+ rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
+ assert rsa_cdata != self._ffi.NULL
+ rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
+ return _RSAPublicKey(self, rsa_cdata)
+ elif type == self._lib.EVP_PKEY_DSA:
+ dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
+ assert dsa_cdata != self._ffi.NULL
+ dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
+ return _DSAPublicKey(self, dsa_cdata)
+ elif (self._lib.Cryptography_HAS_EC == 1 and
+ type == self._lib.EVP_PKEY_EC):
+ ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
+ assert ec_cdata != self._ffi.NULL
+ ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
+ return _EllipticCurvePublicKey(self, ec_cdata)
+ else:
+ raise UnsupportedAlgorithm("Unsupported key type.")
+
+ def _pem_password_cb(self, password):
+ """
+ Generate a pem_password_cb function pointer that copied the password to
+ OpenSSL as required and returns the number of bytes copied.
+
+ typedef int pem_password_cb(char *buf, int size,
+ int rwflag, void *userdata);
+
+ Useful for decrypting PKCS8 files and so on.
+
+ Returns a tuple of (cdata function pointer, callback function).
+ """
+
+ def pem_password_cb(buf, size, writing, userdata):
+ pem_password_cb.called += 1
+
+ if not password:
+ pem_password_cb.exception = TypeError(
+ "Password was not given but private key is encrypted."
+ )
+ return 0
+ elif len(password) < size:
+ pw_buf = self._ffi.buffer(buf, size)
+ pw_buf[:len(password)] = password
+ return len(password)
+ else:
+ pem_password_cb.exception = ValueError(
+ "Passwords longer than {0} bytes are not supported "
+ "by this backend.".format(size - 1)
+ )
+ return 0
+
+ pem_password_cb.called = 0
+ pem_password_cb.exception = None
+
+ return (
+ self._ffi.callback("int (char *, int, int, void *)",
+ pem_password_cb),
+ pem_password_cb
+ )
+
+ def _mgf1_hash_supported(self, algorithm):
+ if self._lib.Cryptography_HAS_MGF1_MD:
+ return self.hash_supported(algorithm)
+ else:
+ return isinstance(algorithm, hashes.SHA1)
+
+ def rsa_padding_supported(self, padding):
+ if isinstance(padding, PKCS1v15):
+ return True
+ elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
+ return self._mgf1_hash_supported(padding._mgf._algorithm)
+ elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
+ return isinstance(padding._mgf._algorithm, hashes.SHA1)
+ else:
+ return False
+
+ def generate_dsa_parameters(self, key_size):
+ if key_size not in (1024, 2048, 3072):
+ raise ValueError(
+ "Key size must be 1024 or 2048 or 3072 bits.")
+
+ if (self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f and
+ key_size > 1024):
+ raise ValueError(
+ "Key size must be 1024 because OpenSSL < 1.0.0 doesn't "
+ "support larger key sizes.")
+
+ ctx = self._lib.DSA_new()
+ assert ctx != self._ffi.NULL
+ ctx = self._ffi.gc(ctx, self._lib.DSA_free)
+
+ res = self._lib.DSA_generate_parameters_ex(
+ ctx, key_size, self._ffi.NULL, 0,
+ self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
+ )
+
+ assert res == 1
+
+ return _DSAParameters(self, ctx)
+
+ def generate_dsa_private_key(self, parameters):
+ ctx = self._lib.DSA_new()
+ assert ctx != self._ffi.NULL
+ ctx = self._ffi.gc(ctx, self._lib.DSA_free)
+ ctx.p = self._lib.BN_dup(parameters._dsa_cdata.p)
+ ctx.q = self._lib.BN_dup(parameters._dsa_cdata.q)
+ ctx.g = self._lib.BN_dup(parameters._dsa_cdata.g)
+
+ self._lib.DSA_generate_key(ctx)
+
+ return _DSAPrivateKey(self, ctx)
+
+ def generate_dsa_private_key_and_parameters(self, key_size):
+ parameters = self.generate_dsa_parameters(key_size)
+ return self.generate_dsa_private_key(parameters)
+
+ def load_dsa_private_numbers(self, numbers):
+ dsa._check_dsa_private_numbers(numbers)
+ parameter_numbers = numbers.public_numbers.parameter_numbers
+
+ dsa_cdata = self._lib.DSA_new()
+ assert dsa_cdata != self._ffi.NULL
+ dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
+
+ dsa_cdata.p = self._int_to_bn(parameter_numbers.p)
+ dsa_cdata.q = self._int_to_bn(parameter_numbers.q)
+ dsa_cdata.g = self._int_to_bn(parameter_numbers.g)
+ dsa_cdata.pub_key = self._int_to_bn(numbers.public_numbers.y)
+ dsa_cdata.priv_key = self._int_to_bn(numbers.x)
+
+ return _DSAPrivateKey(self, dsa_cdata)
+
+ def load_dsa_public_numbers(self, numbers):
+ dsa._check_dsa_parameters(numbers.parameter_numbers)
+ dsa_cdata = self._lib.DSA_new()
+ assert dsa_cdata != self._ffi.NULL
+ dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
+
+ dsa_cdata.p = self._int_to_bn(numbers.parameter_numbers.p)
+ dsa_cdata.q = self._int_to_bn(numbers.parameter_numbers.q)
+ dsa_cdata.g = self._int_to_bn(numbers.parameter_numbers.g)
+ dsa_cdata.pub_key = self._int_to_bn(numbers.y)
+
+ return _DSAPublicKey(self, dsa_cdata)
+
+ def load_dsa_parameter_numbers(self, numbers):
+ dsa._check_dsa_parameters(numbers)
+ dsa_cdata = self._lib.DSA_new()
+ assert dsa_cdata != self._ffi.NULL
+ dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
+
+ dsa_cdata.p = self._int_to_bn(numbers.p)
+ dsa_cdata.q = self._int_to_bn(numbers.q)
+ dsa_cdata.g = self._int_to_bn(numbers.g)
+
+ return _DSAParameters(self, dsa_cdata)
+
+ def dsa_hash_supported(self, algorithm):
+ if self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f:
+ return isinstance(algorithm, hashes.SHA1)
+ else:
+ return self.hash_supported(algorithm)
+
+ def dsa_parameters_supported(self, p, q, g):
+ if self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f:
+ return (utils.bit_length(p) <= 1024 and utils.bit_length(q) <= 160)
+ else:
+ return True
+
+ def cmac_algorithm_supported(self, algorithm):
+ return (
+ self._lib.Cryptography_HAS_CMAC == 1
+ and self.cipher_supported(algorithm, CBC(
+ b"\x00" * algorithm.block_size))
+ )
+
+ def create_cmac_ctx(self, algorithm):
+ return _CMACContext(self, algorithm)
+
+ def load_pem_private_key(self, data, password):
+ return self._load_key(
+ self._lib.PEM_read_bio_PrivateKey,
+ self._evp_pkey_to_private_key,
+ data,
+ password,
+ )
+
+ def load_pem_public_key(self, data):
+ return self._load_key(
+ self._lib.PEM_read_bio_PUBKEY,
+ self._evp_pkey_to_public_key,
+ data,
+ None,
+ )
+
+ def load_traditional_openssl_pem_private_key(self, data, password):
+ warnings.warn(
+ "load_traditional_openssl_pem_private_key is deprecated and will "
+ "be removed in a future version, use load_pem_private_key "
+ "instead.",
+ utils.DeprecatedIn06,
+ stacklevel=2
+ )
+ return self.load_pem_private_key(data, password)
+
+ def load_pkcs8_pem_private_key(self, data, password):
+ warnings.warn(
+ "load_pkcs8_pem_private_key is deprecated and will be removed in a"
+ " future version, use load_pem_private_key instead.",
+ utils.DeprecatedIn06,
+ stacklevel=2
+ )
+ return self.load_pem_private_key(data, password)
+
+ def _load_key(self, openssl_read_func, convert_func, data, password):
+ mem_bio = self._bytes_to_bio(data)
+
+ password_callback, password_func = self._pem_password_cb(password)
+
+ evp_pkey = openssl_read_func(
+ mem_bio.bio,
+ self._ffi.NULL,
+ password_callback,
+ self._ffi.NULL
+ )
+
+ if evp_pkey == self._ffi.NULL:
+ if password_func.exception is not None:
+ errors = self._consume_errors()
+ assert errors
+ raise password_func.exception
+ else:
+ self._handle_key_loading_error()
+
+ evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
+
+ if password is not None and password_func.called == 0:
+ raise TypeError(
+ "Password was given but private key is not encrypted.")
+
+ assert (
+ (password is not None and password_func.called == 1) or
+ password is None
+ )
+
+ return convert_func(evp_pkey)
+
+ def _handle_key_loading_error(self):
+ errors = self._consume_errors()
+
+ if not errors:
+ raise ValueError("Could not unserialize key data.")
+
+ elif errors[0][1:] in (
+ (
+ self._lib.ERR_LIB_EVP,
+ self._lib.EVP_F_EVP_DECRYPTFINAL_EX,
+ self._lib.EVP_R_BAD_DECRYPT
+ ),
+ (
+ self._lib.ERR_LIB_PKCS12,
+ self._lib.PKCS12_F_PKCS12_PBE_CRYPT,
+ self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
+ )
+ ):
+ raise ValueError("Bad decrypt. Incorrect password?")
+
+ elif errors[0][1:] in (
+ (
+ self._lib.ERR_LIB_PEM,
+ self._lib.PEM_F_PEM_GET_EVP_CIPHER_INFO,
+ self._lib.PEM_R_UNSUPPORTED_ENCRYPTION
+ ),
+
+ (
+ self._lib.ERR_LIB_EVP,
+ self._lib.EVP_F_EVP_PBE_CIPHERINIT,
+ self._lib.EVP_R_UNKNOWN_PBE_ALGORITHM
+ )
+ ):
+ raise UnsupportedAlgorithm(
+ "PEM data is encrypted with an unsupported cipher",
+ _Reasons.UNSUPPORTED_CIPHER
+ )
+
+ elif any(
+ error[1:] == (
+ self._lib.ERR_LIB_EVP,
+ self._lib.EVP_F_EVP_PKCS82PKEY,
+ self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM
+ )
+ for error in errors
+ ):
+ raise UnsupportedAlgorithm(
+ "Unsupported public key algorithm.",
+ _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
+ )
+
+ else:
+ assert errors[0][1] in (
+ self._lib.ERR_LIB_EVP,
+ self._lib.ERR_LIB_PEM,
+ self._lib.ERR_LIB_ASN1,
+ )
+ raise ValueError("Could not unserialize key data.")
+
+ def elliptic_curve_supported(self, curve):
+ if self._lib.Cryptography_HAS_EC != 1:
+ return False
+
+ try:
+ curve_nid = self._elliptic_curve_to_nid(curve)
+ except UnsupportedAlgorithm:
+ curve_nid = self._lib.NID_undef
+
+ ctx = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
+
+ if ctx == self._ffi.NULL:
+ errors = self._consume_errors()
+ assert (
+ curve_nid == self._lib.NID_undef or
+ errors[0][1:] == (
+ self._lib.ERR_LIB_EC,
+ self._lib.EC_F_EC_GROUP_NEW_BY_CURVE_NAME,
+ self._lib.EC_R_UNKNOWN_GROUP
+ )
+ )
+ return False
+ else:
+ assert curve_nid != self._lib.NID_undef
+ self._lib.EC_GROUP_free(ctx)
+ return True
+
+ def elliptic_curve_signature_algorithm_supported(
+ self, signature_algorithm, curve
+ ):
+ if self._lib.Cryptography_HAS_EC != 1:
+ return False
+
+ # We only support ECDSA right now.
+ if not isinstance(signature_algorithm, ec.ECDSA):
+ return False
+
+ # Before 0.9.8m OpenSSL can't cope with digests longer than the curve.
+ if (
+ self._lib.OPENSSL_VERSION_NUMBER < 0x009080df and
+ curve.key_size < signature_algorithm.algorithm.digest_size * 8
+ ):
+ return False
+
+ return self.elliptic_curve_supported(curve)
+
+ def generate_elliptic_curve_private_key(self, curve):
+ """
+ Generate a new private key on the named curve.
+ """
+
+ if self.elliptic_curve_supported(curve):
+ curve_nid = self._elliptic_curve_to_nid(curve)
+
+ ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
+ assert ec_cdata != self._ffi.NULL
+ ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
+
+ res = self._lib.EC_KEY_generate_key(ec_cdata)
+ assert res == 1
+
+ res = self._lib.EC_KEY_check_key(ec_cdata)
+ assert res == 1
+
+ return _EllipticCurvePrivateKey(self, ec_cdata)
+ else:
+ raise UnsupportedAlgorithm(
+ "Backend object does not support {0}.".format(curve.name),
+ _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
+ )
+
+ def elliptic_curve_private_key_from_numbers(self, numbers):
+ warnings.warn(
+ "elliptic_curve_private_key_from_numbers is deprecated and will "
+ "be removed in a future version.",
+ utils.DeprecatedIn06,
+ stacklevel=2
+ )
+ return self.load_elliptic_curve_private_numbers(numbers)
+
+ def load_elliptic_curve_private_numbers(self, numbers):
+ public = numbers.public_numbers
+
+ curve_nid = self._elliptic_curve_to_nid(public.curve)
+
+ ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
+ assert ec_cdata != self._ffi.NULL
+ ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
+
+ ec_cdata = self._ec_key_set_public_key_affine_coordinates(
+ ec_cdata, public.x, public.y)
+
+ res = self._lib.EC_KEY_set_private_key(
+ ec_cdata, self._int_to_bn(numbers.private_value))
+ assert res == 1
+
+ return _EllipticCurvePrivateKey(self, ec_cdata)
+
+ def elliptic_curve_public_key_from_numbers(self, numbers):
+ warnings.warn(
+ "elliptic_curve_public_key_from_numbers is deprecated and will be "
+ "removed in a future version.",
+ utils.DeprecatedIn06,
+ stacklevel=2
+ )
+ return self.load_elliptic_curve_public_numbers(numbers)
+
+ def load_elliptic_curve_public_numbers(self, numbers):
+ curve_nid = self._elliptic_curve_to_nid(numbers.curve)
+
+ ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
+ assert ec_cdata != self._ffi.NULL
+ ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
+
+ ec_cdata = self._ec_key_set_public_key_affine_coordinates(
+ ec_cdata, numbers.x, numbers.y)
+
+ return _EllipticCurvePublicKey(self, ec_cdata)
+
+ def _elliptic_curve_to_nid(self, curve):
+ """
+ Get the NID for a curve name.
+ """
+
+ curve_aliases = {
+ "secp192r1": "prime192v1",
+ "secp256r1": "prime256v1"
+ }
+
+ curve_name = curve_aliases.get(curve.name, curve.name)
+
+ curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
+ if curve_nid == self._lib.NID_undef:
+ raise UnsupportedAlgorithm(
+ "{0} is not a supported elliptic curve".format(curve.name),
+ _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
+ )
+ return curve_nid
+
+ @contextmanager
+ def _tmp_bn_ctx(self):
+ bn_ctx = self._lib.BN_CTX_new()
+ assert bn_ctx != self._ffi.NULL
+ bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
+ self._lib.BN_CTX_start(bn_ctx)
+ try:
+ yield bn_ctx
+ finally:
+ self._lib.BN_CTX_end(bn_ctx)
+
+ def _ec_key_determine_group_get_set_funcs(self, ctx):
+ """
+ Given an EC_KEY determine the group and what methods are required to
+ get/set point coordinates.
+ """
+ assert ctx != self._ffi.NULL
+
+ nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
+ assert nid_two_field != self._lib.NID_undef
+
+ group = self._lib.EC_KEY_get0_group(ctx)
+ assert group != self._ffi.NULL
+
+ method = self._lib.EC_GROUP_method_of(group)
+ assert method != self._ffi.NULL
+
+ nid = self._lib.EC_METHOD_get_field_type(method)
+ assert nid != self._lib.NID_undef
+
+ if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
+ set_func = self._lib.EC_POINT_set_affine_coordinates_GF2m
+ get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
+ else:
+ set_func = self._lib.EC_POINT_set_affine_coordinates_GFp
+ get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
+
+ assert set_func and get_func
+
+ return set_func, get_func, group
+
+ def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y):
+ """
+ This is a port of EC_KEY_set_public_key_affine_coordinates that was
+ added in 1.0.1.
+
+ Sets the public key point in the EC_KEY context to the affine x and y
+ values.
+ """
+
+ if x < 0 or y < 0:
+ raise ValueError(
+ "Invalid EC key. Both x and y must be non-negative."
+ )
+
+ bn_x = self._int_to_bn(x)
+ bn_y = self._int_to_bn(y)
+
+ set_func, get_func, group = (
+ self._ec_key_determine_group_get_set_funcs(ctx)
+ )
+
+ point = self._lib.EC_POINT_new(group)
+ assert point != self._ffi.NULL
+ point = self._ffi.gc(point, self._lib.EC_POINT_free)
+
+ with self._tmp_bn_ctx() as bn_ctx:
+ check_x = self._lib.BN_CTX_get(bn_ctx)
+ check_y = self._lib.BN_CTX_get(bn_ctx)
+
+ res = set_func(group, point, bn_x, bn_y, bn_ctx)
+ assert res == 1
+
+ res = get_func(group, point, check_x, check_y, bn_ctx)
+ assert res == 1
+
+ assert self._lib.BN_cmp(bn_x, check_x) == 0
+ assert self._lib.BN_cmp(bn_y, check_y) == 0
+
+ res = self._lib.EC_KEY_set_public_key(ctx, point)
+ assert res == 1
+
+ res = self._lib.EC_KEY_check_key(ctx)
+ if res != 1:
+ self._consume_errors()
+ raise ValueError("Invalid EC key.")
+
+ return ctx
+
+
+class GetCipherByName(object):
+ def __init__(self, fmt):
+ self._fmt = fmt
+
+ def __call__(self, backend, cipher, mode):
+ cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
+ return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
+
+
+backend = Backend()