From 1ab9afd0f081a384843aad7602a26bb2d22a3a9e Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Sat, 8 Jul 2017 08:07:28 -0500 Subject: make the aead work a bit more generic (#3761) --- src/cryptography/hazmat/backends/openssl/aead.py | 111 +++++++++++++++++++++ .../hazmat/backends/openssl/backend.py | 10 +- .../hazmat/backends/openssl/chacha20poly1305.py | 101 ------------------- 3 files changed, 116 insertions(+), 106 deletions(-) create mode 100644 src/cryptography/hazmat/backends/openssl/aead.py delete mode 100644 src/cryptography/hazmat/backends/openssl/chacha20poly1305.py diff --git a/src/cryptography/hazmat/backends/openssl/aead.py b/src/cryptography/hazmat/backends/openssl/aead.py new file mode 100644 index 00000000..5eb2e997 --- /dev/null +++ b/src/cryptography/hazmat/backends/openssl/aead.py @@ -0,0 +1,111 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +from cryptography.exceptions import InvalidTag + + +_ENCRYPT = 1 +_DECRYPT = 0 + + +def _aead_setup(backend, cipher_name, key, nonce, tag, tag_len, operation): + evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name) + backend.openssl_assert(evp_cipher != backend._ffi.NULL) + ctx = backend._lib.EVP_CIPHER_CTX_new() + ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) + res = backend._lib.EVP_CipherInit_ex( + ctx, evp_cipher, + backend._ffi.NULL, + backend._ffi.NULL, + backend._ffi.NULL, + int(operation == _ENCRYPT) + ) + backend.openssl_assert(res != 0) + res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key)) + backend.openssl_assert(res != 0) + res = backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, backend._lib.EVP_CTRL_AEAD_SET_IVLEN, len(nonce), + backend._ffi.NULL + ) + backend.openssl_assert(res != 0) + if operation == _DECRYPT: + res = backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag + ) + backend.openssl_assert(res != 0) + else: + res = backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL + ) + + res = backend._lib.EVP_CipherInit_ex( + ctx, + backend._ffi.NULL, + backend._ffi.NULL, + key, + nonce, + int(operation == _ENCRYPT) + ) + backend.openssl_assert(res != 0) + return ctx + + +def _process_aad(backend, ctx, associated_data): + outlen = backend._ffi.new("int *") + res = backend._lib.EVP_CipherUpdate( + ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data) + ) + backend.openssl_assert(res != 0) + + +def _process_data(backend, ctx, data): + outlen = backend._ffi.new("int *") + buf = backend._ffi.new("unsigned char[]", len(data)) + res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data)) + backend.openssl_assert(res != 0) + return backend._ffi.buffer(buf, outlen[0])[:] + + +def _encrypt(backend, cipher_name, key, nonce, data, associated_data, + tag_length): + ctx = _aead_setup( + backend, cipher_name, key, nonce, None, tag_length, _ENCRYPT + ) + + _process_aad(backend, ctx, associated_data) + processed_data = _process_data(backend, ctx, data) + outlen = backend._ffi.new("int *") + res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen) + backend.openssl_assert(res != 0) + backend.openssl_assert(outlen[0] == 0) + tag_buf = backend._ffi.new("unsigned char[]", tag_length) + res = backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf + ) + backend.openssl_assert(res != 0) + tag = backend._ffi.buffer(tag_buf)[:] + + return processed_data + tag + + +def _decrypt(backend, cipher_name, key, nonce, data, associated_data, + tag_length): + if len(data) < tag_length: + raise InvalidTag + tag = data[-tag_length:] + data = data[:-tag_length] + ctx = _aead_setup( + backend, cipher_name, key, nonce, tag, tag_length, _DECRYPT + ) + _process_aad(backend, ctx, associated_data) + processed_data = _process_data(backend, ctx, data) + outlen = backend._ffi.new("int *") + res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen) + if res == 0: + backend._consume_errors() + raise InvalidTag + + return processed_data diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index cf0300e0..c63ea32d 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -20,7 +20,7 @@ from cryptography.hazmat.backends.interfaces import ( EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend, PEMSerializationBackend, RSABackend, ScryptBackend, X509Backend ) -from cryptography.hazmat.backends.openssl import chacha20poly1305 +from cryptography.hazmat.backends.openssl import aead from cryptography.hazmat.backends.openssl.ciphers import _CipherContext from cryptography.hazmat.backends.openssl.cmac import _CMACContext from cryptography.hazmat.backends.openssl.dh import ( @@ -1925,13 +1925,13 @@ class Backend(object): return self._ffi.buffer(buf)[:] def chacha20poly1305_encrypt(self, key, nonce, data, associated_data): - return chacha20poly1305.encrypt( - self, key, nonce, data, associated_data + return aead._encrypt( + self, b"chacha20-poly1305", key, nonce, data, associated_data, 16 ) def chacha20poly1305_decrypt(self, key, nonce, data, associated_data): - return chacha20poly1305.decrypt( - self, key, nonce, data, associated_data + return aead._decrypt( + self, b"chacha20-poly1305", key, nonce, data, associated_data, 16 ) def chacha20poly1305_supported(self): diff --git a/src/cryptography/hazmat/backends/openssl/chacha20poly1305.py b/src/cryptography/hazmat/backends/openssl/chacha20poly1305.py deleted file mode 100644 index 0834f19c..00000000 --- a/src/cryptography/hazmat/backends/openssl/chacha20poly1305.py +++ /dev/null @@ -1,101 +0,0 @@ -# This file is dual licensed under the terms of the Apache License, Version -# 2.0, and the BSD License. See the LICENSE file in the root of this repository -# for complete details. - -from __future__ import absolute_import, division, print_function - -from cryptography.exceptions import InvalidTag - - -_ENCRYPT = 1 -_DECRYPT = 0 - - -def _chacha20poly1305_setup(backend, key, nonce, tag, operation): - evp_cipher = backend._lib.EVP_get_cipherbyname(b"chacha20-poly1305") - ctx = backend._lib.EVP_CIPHER_CTX_new() - ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) - res = backend._lib.EVP_CipherInit_ex( - ctx, evp_cipher, - backend._ffi.NULL, - backend._ffi.NULL, - backend._ffi.NULL, - int(operation == _ENCRYPT) - ) - backend.openssl_assert(res != 0) - res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key)) - backend.openssl_assert(res != 0) - res = backend._lib.EVP_CIPHER_CTX_ctrl( - ctx, backend._lib.EVP_CTRL_AEAD_SET_IVLEN, len(nonce), - backend._ffi.NULL - ) - backend.openssl_assert(res != 0) - if operation == _DECRYPT: - res = backend._lib.EVP_CIPHER_CTX_ctrl( - ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag - ) - backend.openssl_assert(res != 0) - - res = backend._lib.EVP_CipherInit_ex( - ctx, - backend._ffi.NULL, - backend._ffi.NULL, - key, - nonce, - int(operation == _ENCRYPT) - ) - backend.openssl_assert(res != 0) - return ctx - - -def _process_aad(backend, ctx, associated_data): - outlen = backend._ffi.new("int *") - res = backend._lib.EVP_CipherUpdate( - ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data) - ) - backend.openssl_assert(res != 0) - - -def _process_data(backend, ctx, data): - outlen = backend._ffi.new("int *") - buf = backend._ffi.new("unsigned char[]", len(data)) - res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data)) - backend.openssl_assert(res != 0) - return backend._ffi.buffer(buf, outlen[0])[:] - - -def encrypt(backend, key, nonce, data, associated_data): - ctx = _chacha20poly1305_setup(backend, key, nonce, None, _ENCRYPT) - - _process_aad(backend, ctx, associated_data) - processed_data = _process_data(backend, ctx, data) - outlen = backend._ffi.new("int *") - res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen) - backend.openssl_assert(res != 0) - backend.openssl_assert(outlen[0] == 0) - # get the tag - tag_buf = backend._ffi.new("unsigned char[]", 16) - res = backend._lib.EVP_CIPHER_CTX_ctrl( - ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, 16, tag_buf - ) - backend.openssl_assert(res != 0) - tag = backend._ffi.buffer(tag_buf)[:] - - return processed_data + tag - - -def decrypt(backend, key, nonce, data, associated_data): - if len(data) < 16: - raise InvalidTag - tag = data[-16:] - data = data[:-16] - ctx = _chacha20poly1305_setup(backend, key, nonce, tag, _DECRYPT) - _process_aad(backend, ctx, associated_data) - processed_data = _process_data(backend, ctx, data) - outlen = backend._ffi.new("int *") - res = backend._lib.EVP_CipherFinal_ex(ctx, backend._ffi.NULL, outlen) - if res == 0: - backend._consume_errors() - raise InvalidTag - - return processed_data -- cgit v1.2.3