diff options
author | Paul Kehrer <paul.l.kehrer@gmail.com> | 2017-02-13 20:01:23 -0600 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2017-02-13 18:01:23 -0800 |
commit | 4a90c254278231d7defeac304a3cfd752e96e786 (patch) | |
tree | 7133b8188ad38f12b44c40064c6021566e070595 | |
parent | bd7cd2d43f75bd34830dfbeaf0ac4f8be2fce9a7 (diff) | |
download | cryptography-4a90c254278231d7defeac304a3cfd752e96e786.tar.gz cryptography-4a90c254278231d7defeac304a3cfd752e96e786.tar.bz2 cryptography-4a90c254278231d7defeac304a3cfd752e96e786.zip |
switch the PEM password callback to a C implementation (#3382)
* switch the PEM password callback to a C implementation
Calling from C to Python is fraught with edge cases, especially in
subinterpreter land. This commit moves the PEM password callback logic
into a small C function and then removes all the infrastructure for the
cffi callbacks (as we no longer have any)
* review feedback and update tests
* rename the struct
* aaand one more fix
-rw-r--r-- | src/_cffi_src/openssl/callbacks.py | 66 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 93 | ||||
-rw-r--r-- | src/cryptography/hazmat/bindings/openssl/binding.py | 19 | ||||
-rw-r--r-- | tests/hazmat/backends/test_openssl.py | 39 |
4 files changed, 81 insertions, 136 deletions
diff --git a/src/_cffi_src/openssl/callbacks.py b/src/_cffi_src/openssl/callbacks.py index 0194d2a5..a5f6f361 100644 --- a/src/_cffi_src/openssl/callbacks.py +++ b/src/_cffi_src/openssl/callbacks.py @@ -4,8 +4,6 @@ from __future__ import absolute_import, division, print_function -import cffi - INCLUDES = """ #include <openssl/ssl.h> #include <openssl/x509.h> @@ -16,45 +14,24 @@ INCLUDES = """ """ TYPES = """ -static const long Cryptography_STATIC_CALLBACKS; - -/* crypto.h - * CRYPTO_set_locking_callback - * void (*cb)(int mode, int type, const char *file, int line) - */ -extern "Python" void Cryptography_locking_cb(int, int, const char *, int); - -/* pem.h - * int pem_password_cb(char *buf, int size, int rwflag, void *userdata); - */ -extern "Python" int Cryptography_pem_password_cb(char *, int, int, void *); - -/* rand.h - * int (*bytes)(unsigned char *buf, int num); - * int (*status)(void); - */ -extern "Python" int Cryptography_rand_bytes(unsigned char *, int); -extern "Python" int Cryptography_rand_status(void); +typedef struct { + char *password; + int length; + int called; + int error; + int maxsize; +} CRYPTOGRAPHY_PASSWORD_DATA; """ FUNCTIONS = """ int _setup_ssl_threads(void); +int Cryptography_pem_password_cb(char *, int, int, void *); """ MACROS = """ """ CUSTOMIZATIONS = """ -static const long Cryptography_STATIC_CALLBACKS = 1; -""" - -if cffi.__version_info__ < (1, 4, 0): - # backwards compatibility for old cffi version on PyPy - TYPES = "static const long Cryptography_STATIC_CALLBACKS;" - CUSTOMIZATIONS = """static const long Cryptography_STATIC_CALLBACKS = 0; -""" - -CUSTOMIZATIONS += """ /* This code is derived from the locking code found in the Python _ssl module's locking callback for OpenSSL. @@ -118,4 +95,31 @@ int _setup_ssl_threads(void) { } return 1; } + +typedef struct { + char *password; + int length; + int called; + int error; + int maxsize; +} CRYPTOGRAPHY_PASSWORD_DATA; + +int Cryptography_pem_password_cb(char *buf, int size, + int rwflag, void *userdata) { + /* The password cb is only invoked if OpenSSL decides the private + key is encrypted. So this path only occurs if it needs a password */ + CRYPTOGRAPHY_PASSWORD_DATA *st = (CRYPTOGRAPHY_PASSWORD_DATA *)userdata; + st->called += 1; + st->maxsize = size; + if (st->length == 0) { + st->error = -1; + return 0; + } else if (st->length < size) { + memcpy(buf, st->password, st->length); + return st->length; + } else { + st->error = -2; + return 0; + } +} """ diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index e5144951..e16cb621 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -45,7 +45,7 @@ from cryptography.hazmat.backends.openssl.x509 import ( _Certificate, _CertificateRevocationList, _CertificateSigningRequest, _RevokedCertificate ) -from cryptography.hazmat.bindings._openssl import ffi as _ffi +from cryptography.hazmat.bindings._openssl import lib as _lib from cryptography.hazmat.bindings.openssl import binding from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa @@ -64,50 +64,6 @@ from cryptography.hazmat.primitives.kdf import scrypt _MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) -class _PasswordUserdata(object): - def __init__(self, password): - if password is not None and not isinstance(password, bytes): - raise TypeError("Password must be bytes") - self.password = password - self.called = 0 - self.exception = None - - -@binding.ffi_callback("int (char *, int, int, void *)", - name="Cryptography_pem_password_cb") -def _pem_password_cb(buf, size, writing, userdata_handle): - """ - A pem_password_cb function pointer that copied the password to - OpenSSL as required and returns the number of bytes copied. - - typedef int pem_password_cb(char *buf, int size, - int rwflag, void *userdata); - - Useful for decrypting PKCS8 files and so on. - - The userdata pointer must point to a cffi handle of a - _PasswordUserdata instance. - """ - ud = _ffi.from_handle(userdata_handle) - ud.called += 1 - - if not ud.password: - ud.exception = TypeError( - "Password was not given but private key is encrypted." - ) - return -1 - elif len(ud.password) < size: - pw_buf = _ffi.buffer(buf, size) - pw_buf[:len(ud.password)] = ud.password - return len(ud.password) - else: - ud.exception = ValueError( - "Passwords longer than {0} bytes are not supported " - "by this backend.".format(size - 1) - ) - return 0 - - @utils.register_interface(CipherBackend) @utils.register_interface(CMACBackend) @utils.register_interface(DERSerializationBackend) @@ -564,26 +520,6 @@ class Backend(object): else: raise UnsupportedAlgorithm("Unsupported key type.") - def _pem_password_cb(self, password): - """ - Generate a pem_password_cb function pointer that copied the password to - OpenSSL as required and returns the number of bytes copied. - - typedef int pem_password_cb(char *buf, int size, - int rwflag, void *userdata); - - Useful for decrypting PKCS8 files and so on. - - Returns a tuple of (cdata function pointer, userdata). - """ - # Forward compatibility for new static callbacks: - # _pem_password_cb is not a nested function because closures don't - # work well with static callbacks. Static callbacks are registered - # globally. The backend is passed in as userdata argument. - - userdata = _PasswordUserdata(password=password) - return _pem_password_cb, userdata - def _oaep_hash_supported(self, algorithm): if self._lib.Cryptography_HAS_RSA_OAEP_MD: return isinstance( @@ -1185,21 +1121,36 @@ class Backend(object): def _load_key(self, openssl_read_func, convert_func, data, password): mem_bio = self._bytes_to_bio(data) - password_cb, userdata = self._pem_password_cb(password) - userdata_handle = self._ffi.new_handle(userdata) + if password is not None and not isinstance(password, bytes): + raise TypeError("Password must be bytes") + + userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") + if password is not None: + password_buf = self._ffi.new("char []", password) + userdata.password = password_buf + userdata.length = len(password) evp_pkey = openssl_read_func( mem_bio.bio, self._ffi.NULL, - password_cb, - userdata_handle, + self._ffi.addressof(_lib, "Cryptography_pem_password_cb"), + userdata, ) if evp_pkey == self._ffi.NULL: - if userdata.exception is not None: + if userdata.error != 0: errors = self._consume_errors() self.openssl_assert(errors) - raise userdata.exception + if userdata.error == -1: + raise TypeError( + "Password was not given but private key is encrypted" + ) + else: + assert userdata.error == -2 + raise ValueError( + "Passwords longer than {0} bytes are not supported " + "by this backend.".format(userdata.maxsize - 1) + ) else: self._handle_key_loading_error() diff --git a/src/cryptography/hazmat/bindings/openssl/binding.py b/src/cryptography/hazmat/bindings/openssl/binding.py index b6617543..6f9359c7 100644 --- a/src/cryptography/hazmat/bindings/openssl/binding.py +++ b/src/cryptography/hazmat/bindings/openssl/binding.py @@ -61,25 +61,6 @@ def _openssl_assert(lib, ok): ) -def ffi_callback(signature, name, **kwargs): - """Callback dispatcher - - The ffi_callback() dispatcher keeps callbacks compatible between dynamic - and static callbacks. - """ - def wrapper(func): - if lib.Cryptography_STATIC_CALLBACKS: - # def_extern() returns a decorator that sets the internal - # function pointer and returns the original function unmodified. - ffi.def_extern(name=name, **kwargs)(func) - callback = getattr(lib, name) - else: - # callback() wraps the function in a cdata function. - callback = ffi.callback(signature, **kwargs)(func) - return callback - return wrapper - - def build_conditional_library(lib, conditional_names): conditional_lib = types.ModuleType("lib") excluded_names = set() diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index a8198317..ff8a42ef 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -497,23 +497,32 @@ class TestOpenSSLCreateRevokedCertificate(object): class TestOpenSSLSerializationWithOpenSSL(object): - def test_pem_password_cb_buffer_too_small(self): - ffi_cb, userdata = backend._pem_password_cb(b"aa") - handle = backend._ffi.new_handle(userdata) - buf = backend._ffi.new('char *') - assert ffi_cb(buf, 1, False, handle) == 0 - assert userdata.called == 1 - assert isinstance(userdata.exception, ValueError) - def test_pem_password_cb(self): - password = b'abcdefg' - buf_size = len(password) + 1 - ffi_cb, userdata = backend._pem_password_cb(password) - handle = backend._ffi.new_handle(userdata) - buf = backend._ffi.new('char[]', buf_size) - assert ffi_cb(buf, buf_size, False, handle) == len(password) + userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") + pw = b"abcdefg" + password = backend._ffi.new("char []", pw) + userdata.password = password + userdata.length = len(pw) + buflen = 10 + buf = backend._ffi.new("char []", buflen) + res = backend._lib.Cryptography_pem_password_cb( + buf, buflen, 0, userdata + ) + assert res == len(pw) assert userdata.called == 1 - assert backend._ffi.string(buf, len(password)) == password + assert backend._ffi.buffer(buf, len(pw))[:] == pw + assert userdata.maxsize == buflen + assert userdata.error == 0 + + def test_pem_password_cb_no_password(self): + userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") + buflen = 10 + buf = backend._ffi.new("char []", buflen) + res = backend._lib.Cryptography_pem_password_cb( + buf, buflen, 0, userdata + ) + assert res == 0 + assert userdata.error == -1 def test_unsupported_evp_pkey_type(self): key = backend._create_evp_pkey_gc() |