aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorErik Trauschke <erik.trauschke@gmail.com>2015-09-25 15:37:44 -0700
committerErik Trauschke <erik.trauschke@gmail.com>2015-09-25 15:37:44 -0700
commitcd33bcc4b11fef67806dbe387dc1246e9a211797 (patch)
treec0fadf54c101d4b2ca00dcd1c880c94779ef86e3 /src
parentea6983f58728d15e8ee075e643739e7e10798cae (diff)
parentd042614ab4aa6aea1e18b7001663f971a6fe6f8a (diff)
downloadcryptography-cd33bcc4b11fef67806dbe387dc1246e9a211797.tar.gz
cryptography-cd33bcc4b11fef67806dbe387dc1246e9a211797.tar.bz2
cryptography-cd33bcc4b11fef67806dbe387dc1246e9a211797.zip
Merge branch 'master' into crl_ossl_backend
Diffstat (limited to 'src')
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py325
1 files changed, 172 insertions, 153 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index 8503b9b9..18592893 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -62,6 +62,12 @@ _OpenSSLError = collections.namedtuple("_OpenSSLError",
["code", "lib", "func", "reason"])
+class UnhandledOpenSSLError(Exception):
+ def __init__(self, msg, errors):
+ super(UnhandledOpenSSLError, self).__init__(msg)
+ self.errors = errors
+
+
def _encode_asn1_int(backend, x):
"""
Converts a python integer to a ASN1_INTEGER. The returned ASN1_INTEGER will
@@ -78,7 +84,7 @@ def _encode_asn1_int(backend, x):
# Wrap in a ASN.1 integer. Don't GC -- as documented.
i = backend._lib.BN_to_ASN1_INTEGER(i, backend._ffi.NULL)
- assert i != backend._ffi.NULL
+ backend.openssl_assert(i != backend._ffi.NULL)
return i
@@ -94,7 +100,7 @@ def _encode_asn1_str(backend, data, length):
"""
s = backend._lib.ASN1_OCTET_STRING_new()
res = backend._lib.ASN1_OCTET_STRING_set(s, data, length)
- assert res == 1
+ backend.openssl_assert(res == 1)
return s
@@ -108,7 +114,7 @@ def _encode_inhibit_any_policy(backend, inhibit_any_policy):
asn1int = _encode_asn1_int_gc(backend, inhibit_any_policy.skip_certs)
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_ASN1_INTEGER(asn1int, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -130,7 +136,7 @@ def _encode_name(backend, attributes):
value,
-1, -1, 0,
)
- assert res == 1
+ backend.openssl_assert(res == 1)
return subject
@@ -147,7 +153,7 @@ def _txt2obj(backend, name):
"""
name = name.encode('ascii')
obj = backend._lib.OBJ_txt2obj(name, 1)
- assert obj != backend._ffi.NULL
+ backend.openssl_assert(obj != backend._ffi.NULL)
return obj
@@ -170,33 +176,33 @@ def _encode_key_usage(backend, key_usage):
ku = backend._lib.ASN1_BIT_STRING_new()
ku = backend._ffi.gc(ku, backend._lib.ASN1_BIT_STRING_free)
res = set_bit(ku, 0, key_usage.digital_signature)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 1, key_usage.content_commitment)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 2, key_usage.key_encipherment)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 3, key_usage.data_encipherment)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 4, key_usage.key_agreement)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 5, key_usage.key_cert_sign)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 6, key_usage.crl_sign)
- assert res == 1
+ backend.openssl_assert(res == 1)
if key_usage.key_agreement:
res = set_bit(ku, 7, key_usage.encipher_only)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 8, key_usage.decipher_only)
- assert res == 1
+ backend.openssl_assert(res == 1)
else:
res = set_bit(ku, 7, 0)
- assert res == 1
+ backend.openssl_assert(res == 1)
res = set_bit(ku, 8, 0)
- assert res == 1
+ backend.openssl_assert(res == 1)
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_ASN1_BIT_STRING(ku, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -205,7 +211,7 @@ def _encode_key_usage(backend, key_usage):
def _encode_authority_key_identifier(backend, authority_keyid):
akid = backend._lib.AUTHORITY_KEYID_new()
- assert akid != backend._ffi.NULL
+ backend.openssl_assert(akid != backend._ffi.NULL)
akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
if authority_keyid.key_identifier is not None:
akid.keyid = _encode_asn1_str(
@@ -226,7 +232,7 @@ def _encode_authority_key_identifier(backend, authority_keyid):
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_AUTHORITY_KEYID(akid, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -247,7 +253,7 @@ def _encode_basic_constraints(backend, basic_constraints):
# Fetch the encoded payload.
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_BASIC_CONSTRAINTS(constraints, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -256,7 +262,7 @@ def _encode_basic_constraints(backend, basic_constraints):
def _encode_authority_information_access(backend, authority_info_access):
aia = backend._lib.sk_ACCESS_DESCRIPTION_new_null()
- assert aia != backend._ffi.NULL
+ backend.openssl_assert(aia != backend._ffi.NULL)
aia = backend._ffi.gc(
aia, backend._lib.sk_ACCESS_DESCRIPTION_free
)
@@ -269,11 +275,11 @@ def _encode_authority_information_access(backend, authority_info_access):
ad.method = method
ad.location = gn
res = backend._lib.sk_ACCESS_DESCRIPTION_push(aia, ad)
- assert res >= 1
+ backend.openssl_assert(res >= 1)
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_AUTHORITY_INFO_ACCESS(aia, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -282,11 +288,11 @@ def _encode_authority_information_access(backend, authority_info_access):
def _encode_general_names(backend, names):
general_names = backend._lib.GENERAL_NAMES_new()
- assert general_names != backend._ffi.NULL
+ backend.openssl_assert(general_names != backend._ffi.NULL)
for name in names:
gn = _encode_general_name(backend, name)
res = backend._lib.sk_GENERAL_NAME_push(general_names, gn)
- assert res != 0
+ backend.openssl_assert(res != 0)
return general_names
@@ -298,7 +304,7 @@ def _encode_alt_name(backend, san):
)
pp = backend._ffi.new("unsigned char **")
r = backend._lib.i2d_GENERAL_NAMES(general_names, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -309,7 +315,7 @@ def _encode_subject_key_identifier(backend, ski):
asn1_str = _encode_asn1_str_gc(backend, ski.digest, len(ski.digest))
pp = backend._ffi.new("unsigned char **")
r = backend._lib.i2d_ASN1_OCTET_STRING(asn1_str, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -319,11 +325,11 @@ def _encode_subject_key_identifier(backend, ski):
def _encode_general_name(backend, name):
if isinstance(name, x509.DNSName):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
gn.type = backend._lib.GEN_DNS
ia5 = backend._lib.ASN1_IA5STRING_new()
- assert ia5 != backend._ffi.NULL
+ backend.openssl_assert(ia5 != backend._ffi.NULL)
if name.value.startswith(u"*."):
value = b"*." + idna.encode(name.value[2:])
@@ -331,26 +337,26 @@ def _encode_general_name(backend, name):
value = idna.encode(name.value)
res = backend._lib.ASN1_STRING_set(ia5, value, len(value))
- assert res == 1
+ backend.openssl_assert(res == 1)
gn.d.dNSName = ia5
elif isinstance(name, x509.RegisteredID):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
gn.type = backend._lib.GEN_RID
obj = backend._lib.OBJ_txt2obj(
name.value.dotted_string.encode('ascii'), 1
)
- assert obj != backend._ffi.NULL
+ backend.openssl_assert(obj != backend._ffi.NULL)
gn.d.registeredID = obj
elif isinstance(name, x509.DirectoryName):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
dir_name = _encode_name(backend, name.value)
gn.type = backend._lib.GEN_DIRNAME
gn.d.directoryName = dir_name
elif isinstance(name, x509.IPAddress):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
ipaddr = _encode_asn1_str(
backend, name.value.packed, len(name.value.packed)
)
@@ -358,14 +364,14 @@ def _encode_general_name(backend, name):
gn.d.iPAddress = ipaddr
elif isinstance(name, x509.OtherName):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
other_name = backend._lib.OTHERNAME_new()
- assert other_name != backend._ffi.NULL
+ backend.openssl_assert(other_name != backend._ffi.NULL)
type_id = backend._lib.OBJ_txt2obj(
name.type_id.dotted_string.encode('ascii'), 1
)
- assert type_id != backend._ffi.NULL
+ backend.openssl_assert(type_id != backend._ffi.NULL)
data = backend._ffi.new("unsigned char[]", name.value)
data_ptr_ptr = backend._ffi.new("unsigned char **")
data_ptr_ptr[0] = data
@@ -381,7 +387,7 @@ def _encode_general_name(backend, name):
gn.d.otherName = other_name
elif isinstance(name, x509.RFC822Name):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
asn1_str = _encode_asn1_str(
backend, name._encoded, len(name._encoded)
)
@@ -389,7 +395,7 @@ def _encode_general_name(backend, name):
gn.d.rfc822Name = asn1_str
elif isinstance(name, x509.UniformResourceIdentifier):
gn = backend._lib.GENERAL_NAME_new()
- assert gn != backend._ffi.NULL
+ backend.openssl_assert(gn != backend._ffi.NULL)
asn1_str = _encode_asn1_str(
backend, name._encoded, len(name._encoded)
)
@@ -409,13 +415,13 @@ def _encode_extended_key_usage(backend, extended_key_usage):
for oid in extended_key_usage:
obj = _txt2obj(backend, oid.dotted_string)
res = backend._lib.sk_ASN1_OBJECT_push(eku, obj)
- assert res >= 1
+ backend.openssl_assert(res >= 1)
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_EXTENDED_KEY_USAGE(
backend._ffi.cast("EXTENDED_KEY_USAGE *", eku), pp
)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -439,32 +445,32 @@ def _encode_crl_distribution_points(backend, crl_distribution_points):
cdp = backend._ffi.gc(cdp, backend._lib.sk_DIST_POINT_free)
for point in crl_distribution_points:
dp = backend._lib.DIST_POINT_new()
- assert dp != backend._ffi.NULL
+ backend.openssl_assert(dp != backend._ffi.NULL)
if point.reasons:
bitmask = backend._lib.ASN1_BIT_STRING_new()
- assert bitmask != backend._ffi.NULL
+ backend.openssl_assert(bitmask != backend._ffi.NULL)
dp.reasons = bitmask
for reason in point.reasons:
res = backend._lib.ASN1_BIT_STRING_set_bit(
bitmask, _CRLREASONFLAGS[reason], 1
)
- assert res == 1
+ backend.openssl_assert(res == 1)
if point.full_name:
dpn = backend._lib.DIST_POINT_NAME_new()
- assert dpn != backend._ffi.NULL
+ backend.openssl_assert(dpn != backend._ffi.NULL)
dpn.type = _DISTPOINT_TYPE_FULLNAME
dpn.name.fullname = _encode_general_names(backend, point.full_name)
dp.distpoint = dpn
if point.relative_name:
dpn = backend._lib.DIST_POINT_NAME_new()
- assert dpn != backend._ffi.NULL
+ backend.openssl_assert(dpn != backend._ffi.NULL)
dpn.type = _DISTPOINT_TYPE_RELATIVENAME
name = _encode_name_gc(backend, point.relative_name)
relativename = backend._lib.sk_X509_NAME_ENTRY_dup(name.entries)
- assert relativename != backend._ffi.NULL
+ backend.openssl_assert(relativename != backend._ffi.NULL)
dpn.name.relativename = relativename
dp.distpoint = dpn
@@ -472,11 +478,11 @@ def _encode_crl_distribution_points(backend, crl_distribution_points):
dp.CRLissuer = _encode_general_names(backend, point.crl_issuer)
res = backend._lib.sk_DIST_POINT_push(cdp, dp)
- assert res >= 1
+ backend.openssl_assert(res >= 1)
pp = backend._ffi.new('unsigned char **')
r = backend._lib.i2d_CRL_DIST_POINTS(cdp, pp)
- assert r > 0
+ backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
@@ -526,7 +532,7 @@ class Backend(object):
# is the default for newer OpenSSLs for several years and is
# recommended in RFC 2459.
res = self._lib.ASN1_STRING_set_default_mask_asc(b"utf8only")
- assert res == 1
+ self.openssl_assert(res == 1)
self._binding.init_static_locks()
@@ -534,6 +540,16 @@ class Backend(object):
self._register_default_ciphers()
self.activate_osrandom_engine()
+ def openssl_assert(self, ok):
+ if not ok:
+ errors = self._consume_errors()
+ raise UnhandledOpenSSLError(
+ "Unknown OpenSSL error. Please file an issue at https://github"
+ ".com/pyca/cryptography/issues with information on how to "
+ "reproduce this.",
+ errors
+ )
+
def activate_builtin_random(self):
# Obtain a new structural reference.
e = self._lib.ENGINE_get_default_RAND()
@@ -543,7 +559,7 @@ class Backend(object):
self._lib.RAND_cleanup()
# decrement the structural reference from get_default_RAND
res = self._lib.ENGINE_finish(e)
- assert res == 1
+ self.openssl_assert(res == 1)
def activate_osrandom_engine(self):
# Unregister and free the current engine.
@@ -551,19 +567,19 @@ class Backend(object):
# Fetches an engine by id and returns it. This creates a structural
# reference.
e = self._lib.ENGINE_by_id(self._binding._osrandom_engine_id)
- assert e != self._ffi.NULL
+ self.openssl_assert(e != self._ffi.NULL)
# Initialize the engine for use. This adds a functional reference.
res = self._lib.ENGINE_init(e)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the engine as the default RAND provider.
res = self._lib.ENGINE_set_default_RAND(e)
- assert res == 1
+ self.openssl_assert(res == 1)
# Decrement the structural ref incremented by ENGINE_by_id.
res = self._lib.ENGINE_free(e)
- assert res == 1
+ self.openssl_assert(res == 1)
# Decrement the functional ref incremented by ENGINE_init.
res = self._lib.ENGINE_finish(e)
- assert res == 1
+ self.openssl_assert(res == 1)
# Reset the RNG to use the new engine.
self._lib.RAND_cleanup()
@@ -705,7 +721,7 @@ class Backend(object):
if self._lib.Cryptography_HAS_PBKDF2_HMAC:
evp_md = self._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
- assert evp_md != self._ffi.NULL
+ self.openssl_assert(evp_md != self._ffi.NULL)
res = self._lib.PKCS5_PBKDF2_HMAC(
key_material,
len(key_material),
@@ -716,7 +732,7 @@ class Backend(object):
length,
buf
)
- assert res == 1
+ self.openssl_assert(res == 1)
else:
if not isinstance(algorithm, hashes.SHA1):
raise UnsupportedAlgorithm(
@@ -733,7 +749,7 @@ class Backend(object):
length,
buf
)
- assert res == 1
+ self.openssl_assert(res == 1)
return self._ffi.buffer(buf)[:]
@@ -773,15 +789,14 @@ class Backend(object):
bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
# A zero length means the BN has value 0
- assert bin_len >= 0
- assert bin_ptr != self._ffi.NULL
+ self.openssl_assert(bin_len >= 0)
return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
else:
# Under Python 2 the best we can do is hex()
hex_cdata = self._lib.BN_bn2hex(bn)
- assert hex_cdata != self._ffi.NULL
+ self.openssl_assert(hex_cdata != self._ffi.NULL)
hex_str = self._ffi.string(hex_cdata)
self._lib.OPENSSL_free(hex_cdata)
return int(hex_str, 16)
@@ -803,7 +818,7 @@ class Backend(object):
binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
- assert bn_ptr != self._ffi.NULL
+ self.openssl_assert(bn_ptr != self._ffi.NULL)
return bn_ptr
else:
@@ -813,15 +828,15 @@ class Backend(object):
bn_ptr = self._ffi.new("BIGNUM **")
bn_ptr[0] = bn
res = self._lib.BN_hex2bn(bn_ptr, hex_num)
- assert res != 0
- assert bn_ptr[0] != self._ffi.NULL
+ self.openssl_assert(res != 0)
+ self.openssl_assert(bn_ptr[0] != self._ffi.NULL)
return bn_ptr[0]
def generate_rsa_private_key(self, public_exponent, key_size):
rsa._verify_rsa_parameters(public_exponent, key_size)
rsa_cdata = self._lib.RSA_new()
- assert rsa_cdata != self._ffi.NULL
+ self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
bn = self._int_to_bn(public_exponent)
@@ -830,7 +845,7 @@ class Backend(object):
res = self._lib.RSA_generate_key_ex(
rsa_cdata, key_size, bn, self._ffi.NULL
)
- assert res == 1
+ self.openssl_assert(res == 1)
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
@@ -851,7 +866,7 @@ class Backend(object):
numbers.public_numbers.n
)
rsa_cdata = self._lib.RSA_new()
- assert rsa_cdata != self._ffi.NULL
+ self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
rsa_cdata.p = self._int_to_bn(numbers.p)
rsa_cdata.q = self._int_to_bn(numbers.q)
@@ -862,7 +877,7 @@ class Backend(object):
rsa_cdata.e = self._int_to_bn(numbers.public_numbers.e)
rsa_cdata.n = self._int_to_bn(numbers.public_numbers.n)
res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL)
- assert res == 1
+ self.openssl_assert(res == 1)
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
@@ -870,22 +885,22 @@ class Backend(object):
def load_rsa_public_numbers(self, numbers):
rsa._check_public_key_components(numbers.e, numbers.n)
rsa_cdata = self._lib.RSA_new()
- assert rsa_cdata != self._ffi.NULL
+ self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
rsa_cdata.e = self._int_to_bn(numbers.e)
rsa_cdata.n = self._int_to_bn(numbers.n)
res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL)
- assert res == 1
+ self.openssl_assert(res == 1)
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
evp_pkey = self._lib.EVP_PKEY_new()
- assert evp_pkey != self._ffi.NULL
+ self.openssl_assert(evp_pkey != self._ffi.NULL)
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
return evp_pkey
def _bytes_to_bio(self, data):
@@ -899,7 +914,7 @@ class Backend(object):
bio = self._lib.BIO_new_mem_buf(
data_char_p, len(data)
)
- assert bio != self._ffi.NULL
+ self.openssl_assert(bio != self._ffi.NULL)
return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p)
@@ -908,9 +923,9 @@ class Backend(object):
Creates an empty memory BIO.
"""
bio_method = self._lib.BIO_s_mem()
- assert bio_method != self._ffi.NULL
+ self.openssl_assert(bio_method != self._ffi.NULL)
bio = self._lib.BIO_new(bio_method)
- assert bio != self._ffi.NULL
+ self.openssl_assert(bio != self._ffi.NULL)
bio = self._ffi.gc(bio, self._lib.BIO_free)
return bio
@@ -920,8 +935,8 @@ class Backend(object):
"""
buf = self._ffi.new("char **")
buf_len = self._lib.BIO_get_mem_data(bio, buf)
- assert buf_len > 0
- assert buf[0] != self._ffi.NULL
+ self.openssl_assert(buf_len > 0)
+ self.openssl_assert(buf[0] != self._ffi.NULL)
bio_data = self._ffi.buffer(buf[0], buf_len)[:]
return bio_data
@@ -935,18 +950,18 @@ class Backend(object):
if key_type == self._lib.EVP_PKEY_RSA:
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
- assert rsa_cdata != self._ffi.NULL
+ self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
elif key_type == self._lib.EVP_PKEY_DSA:
dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
- assert dsa_cdata != self._ffi.NULL
+ self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
elif (self._lib.Cryptography_HAS_EC == 1 and
key_type == self._lib.EVP_PKEY_EC):
ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
- assert ec_cdata != self._ffi.NULL
+ self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
else:
@@ -962,18 +977,18 @@ class Backend(object):
if key_type == self._lib.EVP_PKEY_RSA:
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
- assert rsa_cdata != self._ffi.NULL
+ self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
elif key_type == self._lib.EVP_PKEY_DSA:
dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
- assert dsa_cdata != self._ffi.NULL
+ self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
return _DSAPublicKey(self, dsa_cdata, evp_pkey)
elif (self._lib.Cryptography_HAS_EC == 1 and
key_type == self._lib.EVP_PKEY_EC):
ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
- assert ec_cdata != self._ffi.NULL
+ self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
else:
@@ -1047,7 +1062,7 @@ class Backend(object):
"support larger key sizes.")
ctx = self._lib.DSA_new()
- assert ctx != self._ffi.NULL
+ self.openssl_assert(ctx != self._ffi.NULL)
ctx = self._ffi.gc(ctx, self._lib.DSA_free)
res = self._lib.DSA_generate_parameters_ex(
@@ -1055,13 +1070,13 @@ class Backend(object):
self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
)
- assert res == 1
+ self.openssl_assert(res == 1)
return _DSAParameters(self, ctx)
def generate_dsa_private_key(self, parameters):
ctx = self._lib.DSA_new()
- assert ctx != self._ffi.NULL
+ self.openssl_assert(ctx != self._ffi.NULL)
ctx = self._ffi.gc(ctx, self._lib.DSA_free)
ctx.p = self._lib.BN_dup(parameters._dsa_cdata.p)
ctx.q = self._lib.BN_dup(parameters._dsa_cdata.q)
@@ -1081,7 +1096,7 @@ class Backend(object):
parameter_numbers = numbers.public_numbers.parameter_numbers
dsa_cdata = self._lib.DSA_new()
- assert dsa_cdata != self._ffi.NULL
+ self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
dsa_cdata.p = self._int_to_bn(parameter_numbers.p)
@@ -1097,7 +1112,7 @@ class Backend(object):
def load_dsa_public_numbers(self, numbers):
dsa._check_dsa_parameters(numbers.parameter_numbers)
dsa_cdata = self._lib.DSA_new()
- assert dsa_cdata != self._ffi.NULL
+ self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
dsa_cdata.p = self._int_to_bn(numbers.parameter_numbers.p)
@@ -1112,7 +1127,7 @@ class Backend(object):
def load_dsa_parameter_numbers(self, numbers):
dsa._check_dsa_parameters(numbers)
dsa_cdata = self._lib.DSA_new()
- assert dsa_cdata != self._ffi.NULL
+ self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
dsa_cdata.p = self._int_to_bn(numbers.p)
@@ -1123,10 +1138,10 @@ class Backend(object):
def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
evp_pkey = self._lib.EVP_PKEY_new()
- assert evp_pkey != self._ffi.NULL
+ self.openssl_assert(evp_pkey != self._ffi.NULL)
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
return evp_pkey
def dsa_hash_supported(self, algorithm):
@@ -1172,33 +1187,33 @@ class Backend(object):
evp_md = self._lib.EVP_get_digestbyname(
algorithm.name.encode('ascii')
)
- assert evp_md != self._ffi.NULL
+ self.openssl_assert(evp_md != self._ffi.NULL)
# Create an empty request.
x509_req = self._lib.X509_REQ_new()
- assert x509_req != self._ffi.NULL
+ self.openssl_assert(x509_req != self._ffi.NULL)
x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
# Set x509 version.
res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set subject name.
res = self._lib.X509_REQ_set_subject_name(
x509_req, _encode_name_gc(self, builder._subject_name)
)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set subject public key.
public_key = private_key.public_key()
res = self._lib.X509_REQ_set_pubkey(
x509_req, public_key._evp_pkey
)
- assert res == 1
+ self.openssl_assert(res == 1)
# Add extensions.
extensions = self._lib.sk_X509_EXTENSION_new_null()
- assert extensions != self._ffi.NULL
+ self.openssl_assert(extensions != self._ffi.NULL)
extensions = self._ffi.gc(
extensions,
self._lib.sk_X509_EXTENSION_free,
@@ -1217,11 +1232,11 @@ class Backend(object):
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r),
)
- assert extension != self._ffi.NULL
+ self.openssl_assert(extension != self._ffi.NULL)
res = self._lib.sk_X509_EXTENSION_push(extensions, extension)
- assert res >= 1
+ self.openssl_assert(res >= 1)
res = self._lib.X509_REQ_add_extensions(x509_req, extensions)
- assert res == 1
+ self.openssl_assert(res == 1)
# Sign the request using the requester's private key.
res = self._lib.X509_REQ_sign(
@@ -1229,8 +1244,10 @@ class Backend(object):
)
if res == 0:
errors = self._consume_errors()
- assert errors[0][1] == self._lib.ERR_LIB_RSA
- assert errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
+ self.openssl_assert(errors[0][1] == self._lib.ERR_LIB_RSA)
+ self.openssl_assert(
+ errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
+ )
raise ValueError("Digest too big for RSA key")
return _CertificateSigningRequest(self, x509_req)
@@ -1257,7 +1274,7 @@ class Backend(object):
evp_md = self._lib.EVP_get_digestbyname(
algorithm.name.encode('ascii')
)
- assert evp_md != self._ffi.NULL
+ self.openssl_assert(evp_md != self._ffi.NULL)
# Create an empty certificate.
x509_cert = self._lib.X509_new()
@@ -1265,38 +1282,38 @@ class Backend(object):
# Set the x509 version.
res = self._lib.X509_set_version(x509_cert, builder._version.value)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the subject's name.
res = self._lib.X509_set_subject_name(
x509_cert, _encode_name(self, list(builder._subject_name))
)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the subject's public key.
res = self._lib.X509_set_pubkey(
x509_cert, builder._public_key._evp_pkey
)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the certificate serial number.
serial_number = _encode_asn1_int_gc(self, builder._serial_number)
res = self._lib.X509_set_serialNumber(x509_cert, serial_number)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the "not before" time.
res = self._lib.ASN1_TIME_set(
self._lib.X509_get_notBefore(x509_cert),
calendar.timegm(builder._not_valid_before.timetuple())
)
- assert res != self._ffi.NULL
+ self.openssl_assert(res != self._ffi.NULL)
# Set the "not after" time.
res = self._lib.ASN1_TIME_set(
self._lib.X509_get_notAfter(x509_cert),
calendar.timegm(builder._not_valid_after.timetuple())
)
- assert res != self._ffi.NULL
+ self.openssl_assert(res != self._ffi.NULL)
# Add extensions.
for i, extension in enumerate(builder._extensions):
@@ -1313,16 +1330,16 @@ class Backend(object):
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
- assert extension != self._ffi.NULL
+ self.openssl_assert(extension != self._ffi.NULL)
extension = self._ffi.gc(extension, self._lib.X509_EXTENSION_free)
res = self._lib.X509_add_ext(x509_cert, extension, i)
- assert res == 1
+ self.openssl_assert(res == 1)
# Set the issuer name.
res = self._lib.X509_set_issuer_name(
x509_cert, _encode_name(self, list(builder._issuer_name))
)
- assert res == 1
+ self.openssl_assert(res == 1)
# Sign the certificate with the issuer's private key.
res = self._lib.X509_sign(
@@ -1330,8 +1347,10 @@ class Backend(object):
)
if res == 0:
errors = self._consume_errors()
- assert errors[0][1] == self._lib.ERR_LIB_RSA
- assert errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
+ self.openssl_assert(errors[0][1] == self._lib.ERR_LIB_RSA)
+ self.openssl_assert(
+ errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
+ )
raise ValueError("Digest too big for RSA key")
return _Certificate(self, x509_cert)
@@ -1358,7 +1377,7 @@ class Backend(object):
# embedded in a subjectPublicKeyInfo)
self._consume_errors()
res = self._lib.BIO_reset(mem_bio.bio)
- assert res == 1
+ self.openssl_assert(res == 1)
rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
)
@@ -1382,7 +1401,7 @@ class Backend(object):
# PKCS8 keys using d2i_PKCS8PrivateKey_bio so we do this instead.
# Reset the memory BIO so we can read the data again.
res = self._lib.BIO_reset(bio_data.bio)
- assert res == 1
+ self.openssl_assert(res == 1)
key = self._evp_pkey_from_der_unencrypted_pkcs8(bio_data, password)
if key:
@@ -1418,7 +1437,7 @@ class Backend(object):
info = self._ffi.gc(info, self._lib.PKCS8_PRIV_KEY_INFO_free)
if info != self._ffi.NULL:
key = self._lib.EVP_PKCS82PKEY(info)
- assert key != self._ffi.NULL
+ self.openssl_assert(key != self._ffi.NULL)
key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
if password is not None:
raise TypeError(
@@ -1441,7 +1460,7 @@ class Backend(object):
# embedded in a subjectPublicKeyInfo)
self._consume_errors()
res = self._lib.BIO_reset(mem_bio.bio)
- assert res == 1
+ self.openssl_assert(res == 1)
rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
mem_bio.bio, self._ffi.NULL
)
@@ -1533,7 +1552,7 @@ class Backend(object):
if evp_pkey == self._ffi.NULL:
if password_func.exception is not None:
errors = self._consume_errors()
- assert errors
+ self.openssl_assert(errors)
raise password_func.exception
else:
self._handle_key_loading_error()
@@ -1623,7 +1642,7 @@ class Backend(object):
if ctx == self._ffi.NULL:
errors = self._consume_errors()
- assert (
+ self.openssl_assert(
curve_nid == self._lib.NID_undef or
errors[0][1:] == (
self._lib.ERR_LIB_EC,
@@ -1633,7 +1652,7 @@ class Backend(object):
)
return False
else:
- assert curve_nid != self._lib.NID_undef
+ self.openssl_assert(curve_nid != self._lib.NID_undef)
self._lib.EC_GROUP_free(ctx)
return True
@@ -1665,14 +1684,14 @@ class Backend(object):
curve_nid = self._elliptic_curve_to_nid(curve)
ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
- assert ec_cdata != self._ffi.NULL
+ self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
res = self._lib.EC_KEY_generate_key(ec_cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
res = self._lib.EC_KEY_check_key(ec_cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
@@ -1689,7 +1708,7 @@ class Backend(object):
curve_nid = self._elliptic_curve_to_nid(public.curve)
ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
- assert ec_cdata != self._ffi.NULL
+ self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
ec_cdata = self._ec_key_set_public_key_affine_coordinates(
@@ -1697,7 +1716,7 @@ class Backend(object):
res = self._lib.EC_KEY_set_private_key(
ec_cdata, self._int_to_bn(numbers.private_value))
- assert res == 1
+ self.openssl_assert(res == 1)
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
@@ -1706,7 +1725,7 @@ class Backend(object):
curve_nid = self._elliptic_curve_to_nid(numbers.curve)
ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
- assert ec_cdata != self._ffi.NULL
+ self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
ec_cdata = self._ec_key_set_public_key_affine_coordinates(
@@ -1717,10 +1736,10 @@ class Backend(object):
def _ec_cdata_to_evp_pkey(self, ec_cdata):
evp_pkey = self._lib.EVP_PKEY_new()
- assert evp_pkey != self._ffi.NULL
+ self.openssl_assert(evp_pkey != self._ffi.NULL)
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
return evp_pkey
def _elliptic_curve_to_nid(self, curve):
@@ -1746,7 +1765,7 @@ class Backend(object):
@contextmanager
def _tmp_bn_ctx(self):
bn_ctx = self._lib.BN_CTX_new()
- assert bn_ctx != self._ffi.NULL
+ self.openssl_assert(bn_ctx != self._ffi.NULL)
bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
self._lib.BN_CTX_start(bn_ctx)
try:
@@ -1759,19 +1778,19 @@ class Backend(object):
Given an EC_KEY determine the group and what methods are required to
get/set point coordinates.
"""
- assert ctx != self._ffi.NULL
+ self.openssl_assert(ctx != self._ffi.NULL)
nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
- assert nid_two_field != self._lib.NID_undef
+ self.openssl_assert(nid_two_field != self._lib.NID_undef)
group = self._lib.EC_KEY_get0_group(ctx)
- assert group != self._ffi.NULL
+ self.openssl_assert(group != self._ffi.NULL)
method = self._lib.EC_GROUP_method_of(group)
- assert method != self._ffi.NULL
+ self.openssl_assert(method != self._ffi.NULL)
nid = self._lib.EC_METHOD_get_field_type(method)
- assert nid != self._lib.NID_undef
+ self.openssl_assert(nid != self._lib.NID_undef)
if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
set_func = self._lib.EC_POINT_set_affine_coordinates_GF2m
@@ -1803,7 +1822,7 @@ class Backend(object):
)
point = self._lib.EC_POINT_new(group)
- assert point != self._ffi.NULL
+ self.openssl_assert(point != self._ffi.NULL)
point = self._ffi.gc(point, self._lib.EC_POINT_free)
bn_x = self._int_to_bn(x)
@@ -1814,18 +1833,18 @@ class Backend(object):
check_y = self._lib.BN_CTX_get(bn_ctx)
res = set_func(group, point, bn_x, bn_y, bn_ctx)
- assert res == 1
+ self.openssl_assert(res == 1)
res = get_func(group, point, check_x, check_y, bn_ctx)
- assert res == 1
+ self.openssl_assert(res == 1)
res = self._lib.BN_cmp(bn_x, check_x)
- assert res == 0
+ self.openssl_assert(res == 0)
res = self._lib.BN_cmp(bn_y, check_y)
- assert res == 0
+ self.openssl_assert(res == 0)
res = self._lib.EC_KEY_set_public_key(ctx, point)
- assert res == 1
+ self.openssl_assert(res == 1)
res = self._lib.EC_KEY_check_key(ctx)
if res != 1:
@@ -1914,7 +1933,7 @@ class Backend(object):
self._ffi.NULL,
self._ffi.NULL
)
- assert res == 1
+ self.openssl_assert(res == 1)
return self._read_mem_bio(bio)
def _private_key_bytes_traditional_der(self, key_type, cdata):
@@ -1924,12 +1943,12 @@ class Backend(object):
key_type == self._lib.EVP_PKEY_EC):
write_bio = self._lib.i2d_ECPrivateKey_bio
else:
- assert key_type == self._lib.EVP_PKEY_DSA
+ self.openssl_assert(key_type == self._lib.EVP_PKEY_DSA)
write_bio = self._lib.i2d_DSAPrivateKey_bio
bio = self._create_mem_bio()
res = write_bio(bio, cdata)
- assert res == 1
+ self.openssl_assert(res == 1)
return self._read_mem_bio(bio)
def _public_key_bytes(self, encoding, format, evp_pkey, cdata):
@@ -1961,12 +1980,12 @@ class Backend(object):
bio = self._create_mem_bio()
res = write_bio(bio, key)
- assert res == 1
+ self.openssl_assert(res == 1)
return self._read_mem_bio(bio)
def _asn1_integer_to_int(self, asn1_int):
bn = self._lib.ASN1_INTEGER_to_BN(asn1_int, self._ffi.NULL)
- assert bn != self._ffi.NULL
+ self.openssl_assert(bn != self._ffi.NULL)
bn = self._ffi.gc(bn, self._lib.BN_free)
return self._bn_to_int(bn)
@@ -1979,8 +1998,8 @@ class Backend(object):
def _asn1_string_to_utf8(self, asn1_string):
buf = self._ffi.new("unsigned char **")
res = self._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
- assert res >= 0
- assert buf[0] != self._ffi.NULL
+ self.openssl_assert(res >= 0)
+ self.openssl_assert(buf[0] != self._ffi.NULL)
buf = self._ffi.gc(
buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
)
@@ -1989,19 +2008,19 @@ class Backend(object):
def _asn1_to_der(self, asn1_type):
buf = self._ffi.new("unsigned char **")
res = self._lib.i2d_ASN1_TYPE(asn1_type, buf)
- assert res >= 0
- assert buf[0] != self._ffi.NULL
+ self.openssl_assert(res >= 0)
+ self.openssl_assert(buf[0] != self._ffi.NULL)
buf = self._ffi.gc(
buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
)
return self._ffi.buffer(buf[0], res)[:]
def _parse_asn1_time(self, asn1_time):
- assert asn1_time != self._ffi.NULL
+ self.openssl_assert(asn1_time != self._ffi.NULL)
generalized_time = self._lib.ASN1_TIME_to_generalizedtime(
asn1_time, self._ffi.NULL
)
- assert generalized_time != self._ffi.NULL
+ self.openssl_assert(generalized_time != self._ffi.NULL)
generalized_time = self._ffi.gc(
generalized_time, self._lib.ASN1_GENERALIZEDTIME_free
)