From 653463f0e133def71425a26fdd80bfe7c8ad5961 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Mon, 21 Oct 2013 17:55:01 -0500 Subject: address review comments * inline some methods * refactor enc/dec classes * modify docs --- cryptography/bindings/openssl/api.py | 56 +++++++++++++++----------------- cryptography/primitives/block/base.py | 55 +++++++++++++++---------------- docs/primitives/symmetric-encryption.rst | 9 +++-- tests/primitives/test_block.py | 37 +++++++++++---------- 4 files changed, 79 insertions(+), 78 deletions(-) diff --git a/cryptography/bindings/openssl/api.py b/cryptography/bindings/openssl/api.py index fc61c4b8..a3198c1a 100644 --- a/cryptography/bindings/openssl/api.py +++ b/cryptography/bindings/openssl/api.py @@ -99,17 +99,23 @@ class API(object): self.lib.EVP_get_cipherbyname(ciphername.encode("ascii"))) def create_block_cipher_encrypt_context(self, cipher, mode): - ctx, args = self._create_block_cipher_context(cipher, mode) - res = self.lib.EVP_EncryptInit_ex(*args) + ctx, evp, iv_nonce = self._create_block_cipher_context(cipher, mode) + res = self.lib.EVP_EncryptInit_ex(ctx, evp, api.ffi.NULL, cipher.key, + iv_nonce) assert res != 0 - self._disable_padding(ctx) + # We purposely disable padding here as it's handled higher up in the + # API. + self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0) return ctx def create_block_cipher_decrypt_context(self, cipher, mode): - ctx, args = self._create_block_cipher_context(cipher, mode) - res = self.lib.EVP_DecryptInit_ex(*args) + ctx, evp, iv_nonce = self._create_block_cipher_context(cipher, mode) + res = self.lib.EVP_DecryptInit_ex(ctx, evp, api.ffi.NULL, cipher.key, + iv_nonce) assert res != 0 - self._disable_padding(ctx) + # We purposely disable padding here as it's handled higher up in the + # API. + self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0) return ctx def _create_block_cipher_context(self, cipher, mode): @@ -130,51 +136,43 @@ class API(object): else: iv_nonce = self.ffi.NULL - return (ctx, (ctx, evp_cipher, self.ffi.NULL, cipher.key, iv_nonce)) - - def _disable_padding(self, ctx): - # We purposely disable padding here as it's handled higher up in the - # API. - self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0) + return (ctx, evp_cipher, iv_nonce) def update_encrypt_context(self, ctx, data): - buf, outlen = self._create_buf_out(ctx, len(data)) + block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx) + buf = self.ffi.new("unsigned char[]", len(data) + block_size - 1) + outlen = self.ffi.new("int *") res = self.lib.EVP_EncryptUpdate(ctx, buf, outlen, data, len(data)) assert res != 0 return self.ffi.buffer(buf)[:outlen[0]] def update_decrypt_context(self, ctx, data): - buf, outlen = self._create_buf_out(ctx, len(data)) + block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx) + buf = self.ffi.new("unsigned char[]", len(data) + block_size - 1) + outlen = self.ffi.new("int *") res = self.lib.EVP_DecryptUpdate(ctx, buf, outlen, data, len(data)) assert res != 0 return self.ffi.buffer(buf)[:outlen[0]] - def _create_buf_out(self, ctx, data_len): + def finalize_encrypt_context(self, ctx): block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx) - buf = self.ffi.new("unsigned char[]", data_len + block_size - 1) + buf = self.ffi.new("unsigned char[]", block_size) outlen = self.ffi.new("int *") - return (buf, outlen) - - def finalize_encrypt_context(self, ctx): - buf, outlen = self._create_final_buf_out(ctx) res = self.lib.EVP_EncryptFinal_ex(ctx, buf, outlen) assert res != 0 - self._cleanup_block_cipher(ctx) + res = self.lib.EVP_CIPHER_CTX_cleanup(ctx) + assert res == 1 return self.ffi.buffer(buf)[:outlen[0]] def finalize_decrypt_context(self, ctx): - buf, outlen = self._create_final_buf_out(ctx) + block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx) + buf = self.ffi.new("unsigned char[]", block_size) + outlen = self.ffi.new("int *") res = self.lib.EVP_DecryptFinal_ex(ctx, buf, outlen) assert res != 0 - self._cleanup_block_cipher(ctx) - return self.ffi.buffer(buf)[:outlen[0]] - - def _create_final_buf_out(self, ctx): - return self._create_buf_out(ctx, 1) - - def _cleanup_block_cipher(self, ctx): res = self.lib.EVP_CIPHER_CTX_cleanup(ctx) assert res == 1 + return self.ffi.buffer(buf)[:outlen[0]] def supports_hash(self, hash_cls): return (self.ffi.NULL != diff --git a/cryptography/primitives/block/base.py b/cryptography/primitives/block/base.py index 650e39c1..14704ffe 100644 --- a/cryptography/primitives/block/base.py +++ b/cryptography/primitives/block/base.py @@ -13,16 +13,9 @@ from __future__ import absolute_import, division, print_function -from enum import Enum - from cryptography.bindings import _default_api -class _Operation(Enum): - encrypt = 0 - decrypt = 1 - - class BlockCipher(object): def __init__(self, cipher, mode, api=None): super(BlockCipher, self).__init__() @@ -35,39 +28,45 @@ class BlockCipher(object): self._api = api def encryptor(self): - return _BlockCipherContext(self.cipher, self.mode, self._api, - _Operation.encrypt) + return _BlockCipherEncryptionContext(self.cipher, self.mode, self._api) def decryptor(self): - return _BlockCipherContext(self.cipher, self.mode, self._api, - _Operation.decrypt) + return _BlockCipherDecryptionContext(self.cipher, self.mode, self._api) + + +class _BlockCipherEncryptionContext(object): + def __init__(self, cipher, mode, api): + super(_BlockCipherEncryptionContext, self).__init__() + self._api = api + self._ctx = self._api.create_block_cipher_encrypt_context(cipher, mode) + + def update(self, data): + if self._ctx is None: + raise ValueError("Context was already finalized") + return self._api.update_encrypt_context(self._ctx, data) + + def finalize(self): + if self._ctx is None: + raise ValueError("Context was already finalized") + data = self._api.finalize_encrypt_context(self._ctx) + self._ctx = None + return data -class _BlockCipherContext(object): - def __init__(self, cipher, mode, api, operation): - super(_BlockCipherContext, self).__init__() +class _BlockCipherDecryptionContext(object): + def __init__(self, cipher, mode, api): + super(_BlockCipherDecryptionContext, self).__init__() self._api = api - self._operation = operation - args = (cipher, mode) - if self._operation == _Operation.encrypt: - self._ctx = self._api.create_block_cipher_encrypt_context(*args) - else: - self._ctx = self._api.create_block_cipher_decrypt_context(*args) + self._ctx = self._api.create_block_cipher_decrypt_context(cipher, mode) def update(self, data): if self._ctx is None: raise ValueError("Context was already finalized") - if self._operation == _Operation.encrypt: - return self._api.update_encrypt_context(self._ctx, data) - else: - return self._api.update_decrypt_context(self._ctx, data) + return self._api.update_decrypt_context(self._ctx, data) def finalize(self): if self._ctx is None: raise ValueError("Context was already finalized") - if self._operation == _Operation.encrypt: - data = self._api.finalize_encrypt_context(self._ctx) - else: - data = self._api.finalize_decrypt_context(self._ctx) + data = self._api.finalize_decrypt_context(self._ctx) self._ctx = None return data diff --git a/docs/primitives/symmetric-encryption.rst b/docs/primitives/symmetric-encryption.rst index 4f404789..a8d9485d 100644 --- a/docs/primitives/symmetric-encryption.rst +++ b/docs/primitives/symmetric-encryption.rst @@ -15,14 +15,17 @@ where the encrypter and decrypter both use the same key. Block ciphers work by encrypting content in chunks, often 64- or 128-bits. They combine an underlying algorithm (such as AES), with a mode (such as - CBC, CTR, or GCM). A simple example of encrypting content with AES is: + CBC, CTR, or GCM). A simple example of encrypting (and then decrypting) + content with AES is: .. doctest:: >>> from cryptography.primitives.block import BlockCipher, ciphers, modes >>> cipher = BlockCipher(ciphers.AES(key), modes.CBC(iv)) - >>> context = cipher.encryptor() - >>> context.update(b"a secret message") + context.finalize() + >>> encrypt = cipher.encryptor() + >>> ct = encrypt.update(b"a secret message") + encrypt.finalize() + >>> decrypt = cipher.decryptor() + >>> decrypt.update(ct) + decrypt.finalize() '...' :param cipher: One of the ciphers described below. diff --git a/tests/primitives/test_block.py b/tests/primitives/test_block.py index 4a67002f..8e429085 100644 --- a/tests/primitives/test_block.py +++ b/tests/primitives/test_block.py @@ -49,20 +49,20 @@ class TestBlockCipherContext(object): modes.CBC(binascii.unhexlify(b"0" * 32)), api ) - context = cipher.encryptor() - context.update(b"a" * 16) - context.finalize() + encryptor = cipher.encryptor() + encryptor.update(b"a" * 16) + encryptor.finalize() with pytest.raises(ValueError): - context.update(b"b" * 16) + encryptor.update(b"b" * 16) with pytest.raises(ValueError): - context.finalize() - context = cipher.decryptor() - context.update(b"a" * 16) - context.finalize() + encryptor.finalize() + decryptor = cipher.decryptor() + decryptor.update(b"a" * 16) + decryptor.finalize() with pytest.raises(ValueError): - context.update(b"b" * 16) + decryptor.update(b"b" * 16) with pytest.raises(ValueError): - context.finalize() + decryptor.finalize() def test_unaligned_block_encryption(self, api): cipher = BlockCipher( @@ -70,15 +70,16 @@ class TestBlockCipherContext(object): modes.ECB(), api ) - context = cipher.encryptor() - ct = context.update(b"a" * 15) + encryptor = cipher.encryptor() + ct = encryptor.update(b"a" * 15) assert ct == b"" - ct += context.update(b"a" * 65) + ct += encryptor.update(b"a" * 65) assert len(ct) == 80 - ct += context.finalize() - context = cipher.decryptor() - pt = context.update(ct[:3]) + ct += encryptor.finalize() + decryptor = cipher.decryptor() + pt = decryptor.update(ct[:3]) assert pt == b"" - pt += context.update(ct[3:]) + pt += decryptor.update(ct[3:]) assert len(pt) == 80 - context.finalize() + assert pt == b"a" * 80 + decryptor.finalize() -- cgit v1.2.3