diff options
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl/backend.py')
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 1036 |
1 files changed, 1036 insertions, 0 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py new file mode 100644 index 00000000..bb1a3f3d --- /dev/null +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -0,0 +1,1036 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import collections +import itertools +import warnings +from contextlib import contextmanager + +import six + +from cryptography import utils +from cryptography.exceptions import ( + InternalError, UnsupportedAlgorithm, _Reasons +) +from cryptography.hazmat.backends.interfaces import ( + CMACBackend, CipherBackend, DSABackend, EllipticCurveBackend, HMACBackend, + HashBackend, PBKDF2HMACBackend, PEMSerializationBackend, + PKCS8SerializationBackend, RSABackend, + TraditionalOpenSSLSerializationBackend +) +from cryptography.hazmat.backends.openssl.ciphers import ( + _AESCTRCipherContext, _CipherContext +) +from cryptography.hazmat.backends.openssl.cmac import _CMACContext +from cryptography.hazmat.backends.openssl.dsa import ( + _DSAParameters, _DSAPrivateKey, _DSAPublicKey +) +from cryptography.hazmat.backends.openssl.ec import ( + _EllipticCurvePrivateKey, _EllipticCurvePublicKey +) +from cryptography.hazmat.backends.openssl.hashes import _HashContext +from cryptography.hazmat.backends.openssl.hmac import _HMACContext +from cryptography.hazmat.backends.openssl.rsa import ( + _RSAPrivateKey, _RSAPublicKey +) +from cryptography.hazmat.bindings.openssl.binding import Binding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa +from cryptography.hazmat.primitives.asymmetric.padding import ( + MGF1, OAEP, PKCS1v15, PSS +) +from cryptography.hazmat.primitives.ciphers.algorithms import ( + AES, ARC4, Blowfish, CAST5, Camellia, IDEA, SEED, TripleDES +) +from cryptography.hazmat.primitives.ciphers.modes import ( + CBC, CFB, CFB8, CTR, ECB, GCM, OFB +) + + +_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) +_OpenSSLError = collections.namedtuple("_OpenSSLError", + ["code", "lib", "func", "reason"]) + + +@utils.register_interface(CipherBackend) +@utils.register_interface(CMACBackend) +@utils.register_interface(DSABackend) +@utils.register_interface(EllipticCurveBackend) +@utils.register_interface(HashBackend) +@utils.register_interface(HMACBackend) +@utils.register_interface(PBKDF2HMACBackend) +@utils.register_interface(PKCS8SerializationBackend) +@utils.register_interface(RSABackend) +@utils.register_interface(TraditionalOpenSSLSerializationBackend) +@utils.register_interface(PEMSerializationBackend) +class Backend(object): + """ + OpenSSL API binding interfaces. + """ + name = "openssl" + + def __init__(self): + self._binding = Binding() + self._ffi = self._binding.ffi + self._lib = self._binding.lib + + self._binding.init_static_locks() + + # adds all ciphers/digests for EVP + self._lib.OpenSSL_add_all_algorithms() + # registers available SSL/TLS ciphers and digests + self._lib.SSL_library_init() + # loads error strings for libcrypto and libssl functions + self._lib.SSL_load_error_strings() + + self._cipher_registry = {} + self._register_default_ciphers() + self.activate_osrandom_engine() + + def activate_builtin_random(self): + # Obtain a new structural reference. + e = self._lib.ENGINE_get_default_RAND() + if e != self._ffi.NULL: + self._lib.ENGINE_unregister_RAND(e) + # Reset the RNG to use the new engine. + self._lib.RAND_cleanup() + # decrement the structural reference from get_default_RAND + res = self._lib.ENGINE_finish(e) + assert res == 1 + + def activate_osrandom_engine(self): + # Unregister and free the current engine. + self.activate_builtin_random() + # Fetches an engine by id and returns it. This creates a structural + # reference. + e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id) + assert e != self._ffi.NULL + # Initialize the engine for use. This adds a functional reference. + res = self._lib.ENGINE_init(e) + assert res == 1 + # Set the engine as the default RAND provider. + res = self._lib.ENGINE_set_default_RAND(e) + assert res == 1 + # Decrement the structural ref incremented by ENGINE_by_id. + res = self._lib.ENGINE_free(e) + assert res == 1 + # Decrement the functional ref incremented by ENGINE_init. + res = self._lib.ENGINE_finish(e) + assert res == 1 + # Reset the RNG to use the new engine. + self._lib.RAND_cleanup() + + def openssl_version_text(self): + """ + Friendly string name of the loaded OpenSSL library. This is not + necessarily the same version as it was compiled against. + + Example: OpenSSL 1.0.1e 11 Feb 2013 + """ + return self._ffi.string( + self._lib.SSLeay_version(self._lib.SSLEAY_VERSION) + ).decode("ascii") + + def create_hmac_ctx(self, key, algorithm): + return _HMACContext(self, key, algorithm) + + def hash_supported(self, algorithm): + digest = self._lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) + return digest != self._ffi.NULL + + def hmac_supported(self, algorithm): + return self.hash_supported(algorithm) + + def create_hash_ctx(self, algorithm): + return _HashContext(self, algorithm) + + def cipher_supported(self, cipher, mode): + if self._evp_cipher_supported(cipher, mode): + return True + elif isinstance(mode, CTR) and isinstance(cipher, AES): + return True + else: + return False + + def _evp_cipher_supported(self, cipher, mode): + try: + adapter = self._cipher_registry[type(cipher), type(mode)] + except KeyError: + return False + evp_cipher = adapter(self, cipher, mode) + return self._ffi.NULL != evp_cipher + + def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): + if (cipher_cls, mode_cls) in self._cipher_registry: + raise ValueError("Duplicate registration for: {0} {1}.".format( + cipher_cls, mode_cls) + ) + self._cipher_registry[cipher_cls, mode_cls] = adapter + + def _register_default_ciphers(self): + for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8]: + self.register_cipher_adapter( + AES, + mode_cls, + GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") + ) + for mode_cls in [CBC, CTR, ECB, OFB, CFB]: + self.register_cipher_adapter( + Camellia, + mode_cls, + GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") + ) + for mode_cls in [CBC, CFB, CFB8, OFB]: + self.register_cipher_adapter( + TripleDES, + mode_cls, + GetCipherByName("des-ede3-{mode.name}") + ) + self.register_cipher_adapter( + TripleDES, + ECB, + GetCipherByName("des-ede3") + ) + for mode_cls in [CBC, CFB, OFB, ECB]: + self.register_cipher_adapter( + Blowfish, + mode_cls, + GetCipherByName("bf-{mode.name}") + ) + for mode_cls in [CBC, CFB, OFB, ECB]: + self.register_cipher_adapter( + SEED, + mode_cls, + GetCipherByName("seed-{mode.name}") + ) + for cipher_cls, mode_cls in itertools.product( + [CAST5, IDEA], + [CBC, OFB, CFB, ECB], + ): + self.register_cipher_adapter( + cipher_cls, + mode_cls, + GetCipherByName("{cipher.name}-{mode.name}") + ) + self.register_cipher_adapter( + ARC4, + type(None), + GetCipherByName("rc4") + ) + self.register_cipher_adapter( + AES, + GCM, + GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") + ) + + def create_symmetric_encryption_ctx(self, cipher, mode): + if (isinstance(mode, CTR) and isinstance(cipher, AES) + and not self._evp_cipher_supported(cipher, mode)): + # This is needed to provide support for AES CTR mode in OpenSSL + # 0.9.8. It can be removed when we drop 0.9.8 support (RHEL 5 + # extended life ends 2020). + return _AESCTRCipherContext(self, cipher, mode) + else: + return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) + + def create_symmetric_decryption_ctx(self, cipher, mode): + if (isinstance(mode, CTR) and isinstance(cipher, AES) + and not self._evp_cipher_supported(cipher, mode)): + # This is needed to provide support for AES CTR mode in OpenSSL + # 0.9.8. It can be removed when we drop 0.9.8 support (RHEL 5 + # extended life ends 2020). + return _AESCTRCipherContext(self, cipher, mode) + else: + return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) + + def pbkdf2_hmac_supported(self, algorithm): + if self._lib.Cryptography_HAS_PBKDF2_HMAC: + return self.hmac_supported(algorithm) + else: + # OpenSSL < 1.0.0 has an explicit PBKDF2-HMAC-SHA1 function, + # so if the PBKDF2_HMAC function is missing we only support + # SHA1 via PBKDF2_HMAC_SHA1. + return isinstance(algorithm, hashes.SHA1) + + def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations, + key_material): + buf = self._ffi.new("char[]", length) + if self._lib.Cryptography_HAS_PBKDF2_HMAC: + evp_md = self._lib.EVP_get_digestbyname( + algorithm.name.encode("ascii")) + assert evp_md != self._ffi.NULL + res = self._lib.PKCS5_PBKDF2_HMAC( + key_material, + len(key_material), + salt, + len(salt), + iterations, + evp_md, + length, + buf + ) + assert res == 1 + else: + if not isinstance(algorithm, hashes.SHA1): + raise UnsupportedAlgorithm( + "This version of OpenSSL only supports PBKDF2HMAC with " + "SHA1.", + _Reasons.UNSUPPORTED_HASH + ) + res = self._lib.PKCS5_PBKDF2_HMAC_SHA1( + key_material, + len(key_material), + salt, + len(salt), + iterations, + length, + buf + ) + assert res == 1 + + return self._ffi.buffer(buf)[:] + + def _err_string(self, code): + err_buf = self._ffi.new("char[]", 256) + self._lib.ERR_error_string_n(code, err_buf, 256) + return self._ffi.string(err_buf, 256)[:] + + def _consume_errors(self): + errors = [] + while True: + code = self._lib.ERR_get_error() + if code == 0: + break + + lib = self._lib.ERR_GET_LIB(code) + func = self._lib.ERR_GET_FUNC(code) + reason = self._lib.ERR_GET_REASON(code) + + errors.append(_OpenSSLError(code, lib, func, reason)) + return errors + + def _unknown_error(self, error): + return InternalError( + "Unknown error code {0} from OpenSSL, " + "you should probably file a bug. {1}.".format( + error.code, self._err_string(error.code) + ) + ) + + def _bn_to_int(self, bn): + if six.PY3: + # Python 3 has constant time from_bytes, so use that. + + bn_num_bytes = (self._lib.BN_num_bits(bn) + 7) // 8 + bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) + bin_len = self._lib.BN_bn2bin(bn, bin_ptr) + assert bin_len > 0 + assert bin_ptr != self._ffi.NULL + return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") + + else: + # Under Python 2 the best we can do is hex() + + hex_cdata = self._lib.BN_bn2hex(bn) + assert hex_cdata != self._ffi.NULL + hex_str = self._ffi.string(hex_cdata) + self._lib.OPENSSL_free(hex_cdata) + return int(hex_str, 16) + + def _int_to_bn(self, num, bn=None): + """ + Converts a python integer to a BIGNUM. The returned BIGNUM will not + be garbage collected (to support adding them to structs that take + ownership of the object). Be sure to register it for GC if it will + be discarded after use. + """ + + if bn is None: + bn = self._ffi.NULL + + if six.PY3: + # Python 3 has constant time to_bytes, so use that. + + binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") + bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) + assert bn_ptr != self._ffi.NULL + return bn_ptr + + else: + # Under Python 2 the best we can do is hex() + + hex_num = hex(num).rstrip("L").lstrip("0x").encode("ascii") or b"0" + bn_ptr = self._ffi.new("BIGNUM **") + bn_ptr[0] = bn + res = self._lib.BN_hex2bn(bn_ptr, hex_num) + assert res != 0 + assert bn_ptr[0] != self._ffi.NULL + return bn_ptr[0] + + def generate_rsa_private_key(self, public_exponent, key_size): + rsa._verify_rsa_parameters(public_exponent, key_size) + + rsa_cdata = self._lib.RSA_new() + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + + bn = self._int_to_bn(public_exponent) + bn = self._ffi.gc(bn, self._lib.BN_free) + + res = self._lib.RSA_generate_key_ex( + rsa_cdata, key_size, bn, self._ffi.NULL + ) + assert res == 1 + + return _RSAPrivateKey(self, rsa_cdata) + + def generate_rsa_parameters_supported(self, public_exponent, key_size): + return (public_exponent >= 3 and public_exponent & 1 != 0 and + key_size >= 512) + + def load_rsa_private_numbers(self, numbers): + rsa._check_private_key_components( + numbers.p, + numbers.q, + numbers.d, + numbers.dmp1, + numbers.dmq1, + numbers.iqmp, + numbers.public_numbers.e, + numbers.public_numbers.n + ) + rsa_cdata = self._lib.RSA_new() + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + rsa_cdata.p = self._int_to_bn(numbers.p) + rsa_cdata.q = self._int_to_bn(numbers.q) + rsa_cdata.d = self._int_to_bn(numbers.d) + rsa_cdata.dmp1 = self._int_to_bn(numbers.dmp1) + rsa_cdata.dmq1 = self._int_to_bn(numbers.dmq1) + rsa_cdata.iqmp = self._int_to_bn(numbers.iqmp) + rsa_cdata.e = self._int_to_bn(numbers.public_numbers.e) + rsa_cdata.n = self._int_to_bn(numbers.public_numbers.n) + res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL) + assert res == 1 + + return _RSAPrivateKey(self, rsa_cdata) + + def load_rsa_public_numbers(self, numbers): + rsa._check_public_key_components(numbers.e, numbers.n) + rsa_cdata = self._lib.RSA_new() + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + rsa_cdata.e = self._int_to_bn(numbers.e) + rsa_cdata.n = self._int_to_bn(numbers.n) + res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL) + assert res == 1 + + return _RSAPublicKey(self, rsa_cdata) + + def _bytes_to_bio(self, data): + """ + Return a _MemoryBIO namedtuple of (BIO, char*). + + The char* is the storage for the BIO and it must stay alive until the + BIO is finished with. + """ + data_char_p = self._ffi.new("char[]", data) + bio = self._lib.BIO_new_mem_buf( + data_char_p, len(data) + ) + assert bio != self._ffi.NULL + + return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p) + + def _evp_pkey_to_private_key(self, evp_pkey): + """ + Return the appropriate type of PrivateKey given an evp_pkey cdata + pointer. + """ + + type = evp_pkey.type + + if type == self._lib.EVP_PKEY_RSA: + rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + return _RSAPrivateKey(self, rsa_cdata) + elif type == self._lib.EVP_PKEY_DSA: + dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) + assert dsa_cdata != self._ffi.NULL + dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) + return _DSAPrivateKey(self, dsa_cdata) + elif (self._lib.Cryptography_HAS_EC == 1 and + type == self._lib.EVP_PKEY_EC): + ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) + assert ec_cdata != self._ffi.NULL + ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) + return _EllipticCurvePrivateKey(self, ec_cdata) + else: + raise UnsupportedAlgorithm("Unsupported key type.") + + def _evp_pkey_to_public_key(self, evp_pkey): + """ + Return the appropriate type of PublicKey given an evp_pkey cdata + pointer. + """ + + type = evp_pkey.type + + if type == self._lib.EVP_PKEY_RSA: + rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) + assert rsa_cdata != self._ffi.NULL + rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) + return _RSAPublicKey(self, rsa_cdata) + elif type == self._lib.EVP_PKEY_DSA: + dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) + assert dsa_cdata != self._ffi.NULL + dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) + return _DSAPublicKey(self, dsa_cdata) + elif (self._lib.Cryptography_HAS_EC == 1 and + type == self._lib.EVP_PKEY_EC): + ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) + assert ec_cdata != self._ffi.NULL + ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) + return _EllipticCurvePublicKey(self, ec_cdata) + else: + raise UnsupportedAlgorithm("Unsupported key type.") + + def _pem_password_cb(self, password): + """ + Generate a pem_password_cb function pointer that copied the password to + OpenSSL as required and returns the number of bytes copied. + + typedef int pem_password_cb(char *buf, int size, + int rwflag, void *userdata); + + Useful for decrypting PKCS8 files and so on. + + Returns a tuple of (cdata function pointer, callback function). + """ + + def pem_password_cb(buf, size, writing, userdata): + pem_password_cb.called += 1 + + if not password: + pem_password_cb.exception = TypeError( + "Password was not given but private key is encrypted." + ) + return 0 + elif len(password) < size: + pw_buf = self._ffi.buffer(buf, size) + pw_buf[:len(password)] = password + return len(password) + else: + pem_password_cb.exception = ValueError( + "Passwords longer than {0} bytes are not supported " + "by this backend.".format(size - 1) + ) + return 0 + + pem_password_cb.called = 0 + pem_password_cb.exception = None + + return ( + self._ffi.callback("int (char *, int, int, void *)", + pem_password_cb), + pem_password_cb + ) + + def _mgf1_hash_supported(self, algorithm): + if self._lib.Cryptography_HAS_MGF1_MD: + return self.hash_supported(algorithm) + else: + return isinstance(algorithm, hashes.SHA1) + + def rsa_padding_supported(self, padding): + if isinstance(padding, PKCS1v15): + return True + elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1): + return self._mgf1_hash_supported(padding._mgf._algorithm) + elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1): + return isinstance(padding._mgf._algorithm, hashes.SHA1) + else: + return False + + def generate_dsa_parameters(self, key_size): + if key_size not in (1024, 2048, 3072): + raise ValueError( + "Key size must be 1024 or 2048 or 3072 bits.") + + if (self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f and + key_size > 1024): + raise ValueError( + "Key size must be 1024 because OpenSSL < 1.0.0 doesn't " + "support larger key sizes.") + + ctx = self._lib.DSA_new() + assert ctx != self._ffi.NULL + ctx = self._ffi.gc(ctx, self._lib.DSA_free) + + res = self._lib.DSA_generate_parameters_ex( + ctx, key_size, self._ffi.NULL, 0, + self._ffi.NULL, self._ffi.NULL, self._ffi.NULL + ) + + assert res == 1 + + return _DSAParameters(self, ctx) + + def generate_dsa_private_key(self, parameters): + ctx = self._lib.DSA_new() + assert ctx != self._ffi.NULL + ctx = self._ffi.gc(ctx, self._lib.DSA_free) + ctx.p = self._lib.BN_dup(parameters._dsa_cdata.p) + ctx.q = self._lib.BN_dup(parameters._dsa_cdata.q) + ctx.g = self._lib.BN_dup(parameters._dsa_cdata.g) + + self._lib.DSA_generate_key(ctx) + + return _DSAPrivateKey(self, ctx) + + def generate_dsa_private_key_and_parameters(self, key_size): + parameters = self.generate_dsa_parameters(key_size) + return self.generate_dsa_private_key(parameters) + + def load_dsa_private_numbers(self, numbers): + dsa._check_dsa_private_numbers(numbers) + parameter_numbers = numbers.public_numbers.parameter_numbers + + dsa_cdata = self._lib.DSA_new() + assert dsa_cdata != self._ffi.NULL + dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) + + dsa_cdata.p = self._int_to_bn(parameter_numbers.p) + dsa_cdata.q = self._int_to_bn(parameter_numbers.q) + dsa_cdata.g = self._int_to_bn(parameter_numbers.g) + dsa_cdata.pub_key = self._int_to_bn(numbers.public_numbers.y) + dsa_cdata.priv_key = self._int_to_bn(numbers.x) + + return _DSAPrivateKey(self, dsa_cdata) + + def load_dsa_public_numbers(self, numbers): + dsa._check_dsa_parameters(numbers.parameter_numbers) + dsa_cdata = self._lib.DSA_new() + assert dsa_cdata != self._ffi.NULL + dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) + + dsa_cdata.p = self._int_to_bn(numbers.parameter_numbers.p) + dsa_cdata.q = self._int_to_bn(numbers.parameter_numbers.q) + dsa_cdata.g = self._int_to_bn(numbers.parameter_numbers.g) + dsa_cdata.pub_key = self._int_to_bn(numbers.y) + + return _DSAPublicKey(self, dsa_cdata) + + def load_dsa_parameter_numbers(self, numbers): + dsa._check_dsa_parameters(numbers) + dsa_cdata = self._lib.DSA_new() + assert dsa_cdata != self._ffi.NULL + dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) + + dsa_cdata.p = self._int_to_bn(numbers.p) + dsa_cdata.q = self._int_to_bn(numbers.q) + dsa_cdata.g = self._int_to_bn(numbers.g) + + return _DSAParameters(self, dsa_cdata) + + def dsa_hash_supported(self, algorithm): + if self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f: + return isinstance(algorithm, hashes.SHA1) + else: + return self.hash_supported(algorithm) + + def dsa_parameters_supported(self, p, q, g): + if self._lib.OPENSSL_VERSION_NUMBER < 0x1000000f: + return (utils.bit_length(p) <= 1024 and utils.bit_length(q) <= 160) + else: + return True + + def cmac_algorithm_supported(self, algorithm): + return ( + self._lib.Cryptography_HAS_CMAC == 1 + and self.cipher_supported(algorithm, CBC( + b"\x00" * algorithm.block_size)) + ) + + def create_cmac_ctx(self, algorithm): + return _CMACContext(self, algorithm) + + def load_pem_private_key(self, data, password): + return self._load_key( + self._lib.PEM_read_bio_PrivateKey, + self._evp_pkey_to_private_key, + data, + password, + ) + + def load_pem_public_key(self, data): + return self._load_key( + self._lib.PEM_read_bio_PUBKEY, + self._evp_pkey_to_public_key, + data, + None, + ) + + def load_traditional_openssl_pem_private_key(self, data, password): + warnings.warn( + "load_traditional_openssl_pem_private_key is deprecated and will " + "be removed in a future version, use load_pem_private_key " + "instead.", + utils.DeprecatedIn06, + stacklevel=2 + ) + return self.load_pem_private_key(data, password) + + def load_pkcs8_pem_private_key(self, data, password): + warnings.warn( + "load_pkcs8_pem_private_key is deprecated and will be removed in a" + " future version, use load_pem_private_key instead.", + utils.DeprecatedIn06, + stacklevel=2 + ) + return self.load_pem_private_key(data, password) + + def _load_key(self, openssl_read_func, convert_func, data, password): + mem_bio = self._bytes_to_bio(data) + + password_callback, password_func = self._pem_password_cb(password) + + evp_pkey = openssl_read_func( + mem_bio.bio, + self._ffi.NULL, + password_callback, + self._ffi.NULL + ) + + if evp_pkey == self._ffi.NULL: + if password_func.exception is not None: + errors = self._consume_errors() + assert errors + raise password_func.exception + else: + self._handle_key_loading_error() + + evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) + + if password is not None and password_func.called == 0: + raise TypeError( + "Password was given but private key is not encrypted.") + + assert ( + (password is not None and password_func.called == 1) or + password is None + ) + + return convert_func(evp_pkey) + + def _handle_key_loading_error(self): + errors = self._consume_errors() + + if not errors: + raise ValueError("Could not unserialize key data.") + + elif errors[0][1:] in ( + ( + self._lib.ERR_LIB_EVP, + self._lib.EVP_F_EVP_DECRYPTFINAL_EX, + self._lib.EVP_R_BAD_DECRYPT + ), + ( + self._lib.ERR_LIB_PKCS12, + self._lib.PKCS12_F_PKCS12_PBE_CRYPT, + self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, + ) + ): + raise ValueError("Bad decrypt. Incorrect password?") + + elif errors[0][1:] in ( + ( + self._lib.ERR_LIB_PEM, + self._lib.PEM_F_PEM_GET_EVP_CIPHER_INFO, + self._lib.PEM_R_UNSUPPORTED_ENCRYPTION + ), + + ( + self._lib.ERR_LIB_EVP, + self._lib.EVP_F_EVP_PBE_CIPHERINIT, + self._lib.EVP_R_UNKNOWN_PBE_ALGORITHM + ) + ): + raise UnsupportedAlgorithm( + "PEM data is encrypted with an unsupported cipher", + _Reasons.UNSUPPORTED_CIPHER + ) + + elif any( + error[1:] == ( + self._lib.ERR_LIB_EVP, + self._lib.EVP_F_EVP_PKCS82PKEY, + self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM + ) + for error in errors + ): + raise UnsupportedAlgorithm( + "Unsupported public key algorithm.", + _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM + ) + + else: + assert errors[0][1] in ( + self._lib.ERR_LIB_EVP, + self._lib.ERR_LIB_PEM, + self._lib.ERR_LIB_ASN1, + ) + raise ValueError("Could not unserialize key data.") + + def elliptic_curve_supported(self, curve): + if self._lib.Cryptography_HAS_EC != 1: + return False + + try: + curve_nid = self._elliptic_curve_to_nid(curve) + except UnsupportedAlgorithm: + curve_nid = self._lib.NID_undef + + ctx = self._lib.EC_GROUP_new_by_curve_name(curve_nid) + + if ctx == self._ffi.NULL: + errors = self._consume_errors() + assert ( + curve_nid == self._lib.NID_undef or + errors[0][1:] == ( + self._lib.ERR_LIB_EC, + self._lib.EC_F_EC_GROUP_NEW_BY_CURVE_NAME, + self._lib.EC_R_UNKNOWN_GROUP + ) + ) + return False + else: + assert curve_nid != self._lib.NID_undef + self._lib.EC_GROUP_free(ctx) + return True + + def elliptic_curve_signature_algorithm_supported( + self, signature_algorithm, curve + ): + if self._lib.Cryptography_HAS_EC != 1: + return False + + # We only support ECDSA right now. + if not isinstance(signature_algorithm, ec.ECDSA): + return False + + # Before 0.9.8m OpenSSL can't cope with digests longer than the curve. + if ( + self._lib.OPENSSL_VERSION_NUMBER < 0x009080df and + curve.key_size < signature_algorithm.algorithm.digest_size * 8 + ): + return False + + return self.elliptic_curve_supported(curve) + + def generate_elliptic_curve_private_key(self, curve): + """ + Generate a new private key on the named curve. + """ + + if self.elliptic_curve_supported(curve): + curve_nid = self._elliptic_curve_to_nid(curve) + + ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) + assert ec_cdata != self._ffi.NULL + ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) + + res = self._lib.EC_KEY_generate_key(ec_cdata) + assert res == 1 + + res = self._lib.EC_KEY_check_key(ec_cdata) + assert res == 1 + + return _EllipticCurvePrivateKey(self, ec_cdata) + else: + raise UnsupportedAlgorithm( + "Backend object does not support {0}.".format(curve.name), + _Reasons.UNSUPPORTED_ELLIPTIC_CURVE + ) + + def elliptic_curve_private_key_from_numbers(self, numbers): + warnings.warn( + "elliptic_curve_private_key_from_numbers is deprecated and will " + "be removed in a future version.", + utils.DeprecatedIn06, + stacklevel=2 + ) + return self.load_elliptic_curve_private_numbers(numbers) + + def load_elliptic_curve_private_numbers(self, numbers): + public = numbers.public_numbers + + curve_nid = self._elliptic_curve_to_nid(public.curve) + + ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) + assert ec_cdata != self._ffi.NULL + ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) + + ec_cdata = self._ec_key_set_public_key_affine_coordinates( + ec_cdata, public.x, public.y) + + res = self._lib.EC_KEY_set_private_key( + ec_cdata, self._int_to_bn(numbers.private_value)) + assert res == 1 + + return _EllipticCurvePrivateKey(self, ec_cdata) + + def elliptic_curve_public_key_from_numbers(self, numbers): + warnings.warn( + "elliptic_curve_public_key_from_numbers is deprecated and will be " + "removed in a future version.", + utils.DeprecatedIn06, + stacklevel=2 + ) + return self.load_elliptic_curve_public_numbers(numbers) + + def load_elliptic_curve_public_numbers(self, numbers): + curve_nid = self._elliptic_curve_to_nid(numbers.curve) + + ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) + assert ec_cdata != self._ffi.NULL + ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) + + ec_cdata = self._ec_key_set_public_key_affine_coordinates( + ec_cdata, numbers.x, numbers.y) + + return _EllipticCurvePublicKey(self, ec_cdata) + + def _elliptic_curve_to_nid(self, curve): + """ + Get the NID for a curve name. + """ + + curve_aliases = { + "secp192r1": "prime192v1", + "secp256r1": "prime256v1" + } + + curve_name = curve_aliases.get(curve.name, curve.name) + + curve_nid = self._lib.OBJ_sn2nid(curve_name.encode()) + if curve_nid == self._lib.NID_undef: + raise UnsupportedAlgorithm( + "{0} is not a supported elliptic curve".format(curve.name), + _Reasons.UNSUPPORTED_ELLIPTIC_CURVE + ) + return curve_nid + + @contextmanager + def _tmp_bn_ctx(self): + bn_ctx = self._lib.BN_CTX_new() + assert bn_ctx != self._ffi.NULL + bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) + self._lib.BN_CTX_start(bn_ctx) + try: + yield bn_ctx + finally: + self._lib.BN_CTX_end(bn_ctx) + + def _ec_key_determine_group_get_set_funcs(self, ctx): + """ + Given an EC_KEY determine the group and what methods are required to + get/set point coordinates. + """ + assert ctx != self._ffi.NULL + + nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field") + assert nid_two_field != self._lib.NID_undef + + group = self._lib.EC_KEY_get0_group(ctx) + assert group != self._ffi.NULL + + method = self._lib.EC_GROUP_method_of(group) + assert method != self._ffi.NULL + + nid = self._lib.EC_METHOD_get_field_type(method) + assert nid != self._lib.NID_undef + + if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M: + set_func = self._lib.EC_POINT_set_affine_coordinates_GF2m + get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m + else: + set_func = self._lib.EC_POINT_set_affine_coordinates_GFp + get_func = self._lib.EC_POINT_get_affine_coordinates_GFp + + assert set_func and get_func + + return set_func, get_func, group + + def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y): + """ + This is a port of EC_KEY_set_public_key_affine_coordinates that was + added in 1.0.1. + + Sets the public key point in the EC_KEY context to the affine x and y + values. + """ + + if x < 0 or y < 0: + raise ValueError( + "Invalid EC key. Both x and y must be non-negative." + ) + + bn_x = self._int_to_bn(x) + bn_y = self._int_to_bn(y) + + set_func, get_func, group = ( + self._ec_key_determine_group_get_set_funcs(ctx) + ) + + point = self._lib.EC_POINT_new(group) + assert point != self._ffi.NULL + point = self._ffi.gc(point, self._lib.EC_POINT_free) + + with self._tmp_bn_ctx() as bn_ctx: + check_x = self._lib.BN_CTX_get(bn_ctx) + check_y = self._lib.BN_CTX_get(bn_ctx) + + res = set_func(group, point, bn_x, bn_y, bn_ctx) + assert res == 1 + + res = get_func(group, point, check_x, check_y, bn_ctx) + assert res == 1 + + assert self._lib.BN_cmp(bn_x, check_x) == 0 + assert self._lib.BN_cmp(bn_y, check_y) == 0 + + res = self._lib.EC_KEY_set_public_key(ctx, point) + assert res == 1 + + res = self._lib.EC_KEY_check_key(ctx) + if res != 1: + self._consume_errors() + raise ValueError("Invalid EC key.") + + return ctx + + +class GetCipherByName(object): + def __init__(self, fmt): + self._fmt = fmt + + def __call__(self, backend, cipher, mode): + cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() + return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) + + +backend = Backend() |