aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPaul Kehrer <paul.l.kehrer@gmail.com>2013-10-19 14:12:04 -0500
committerPaul Kehrer <paul.l.kehrer@gmail.com>2013-10-21 08:50:05 -0500
commit620c2aec10423c11e49cbffc71efe19a190f9187 (patch)
treed8b3abaf1a64b720ab5410087f82309c1bb43c22
parente58820924094b8846bf739c2eb66f049a9a75939 (diff)
downloadcryptography-620c2aec10423c11e49cbffc71efe19a190f9187.tar.gz
cryptography-620c2aec10423c11e49cbffc71efe19a190f9187.tar.bz2
cryptography-620c2aec10423c11e49cbffc71efe19a190f9187.zip
block cipher decryption support
This is a squash of previous commits plus new ones. Ran into a pile of conflicts during the rebase and decided this was an easier way to retain a sane commit history
-rw-r--r--cryptography/bindings/openssl/api.py70
-rw-r--r--cryptography/bindings/openssl/evp.py5
-rw-r--r--cryptography/primitives/block/base.py60
-rw-r--r--tests/primitives/test_block.py54
-rw-r--r--tests/primitives/utils.py9
5 files changed, 132 insertions, 66 deletions
diff --git a/cryptography/bindings/openssl/api.py b/cryptography/bindings/openssl/api.py
index f5e042e7..fdac4e91 100644
--- a/cryptography/bindings/openssl/api.py
+++ b/cryptography/bindings/openssl/api.py
@@ -98,9 +98,25 @@ class API(object):
return (self.ffi.NULL !=
self.lib.EVP_get_cipherbyname(ciphername.encode("ascii")))
- def create_block_cipher_context(self, cipher, mode):
- ctx = self.lib.EVP_CIPHER_CTX_new()
- ctx = self.ffi.gc(ctx, self.lib.EVP_CIPHER_CTX_free)
+ def create_block_cipher_encrypt_context(self, cipher, mode):
+ ctx, args = self._create_block_cipher_context(cipher, mode)
+ res = self.lib.EVP_EncryptInit_ex(*args)
+ assert res != 0
+ self._disable_padding(ctx)
+ return ctx
+
+ def create_block_cipher_decrypt_context(self, cipher, mode):
+ ctx, args = self._create_block_cipher_context(cipher, mode)
+ res = self.lib.EVP_DecryptInit_ex(*args)
+ assert res != 0
+ self._disable_padding(ctx)
+ return ctx
+
+ def _create_block_cipher_context(self, cipher, mode):
+ ctx = self.ffi.new("EVP_CIPHER_CTX *")
+ res = self.lib.EVP_CIPHER_CTX_init(ctx)
+ assert res != 0
+ ctx = self.ffi.gc(ctx, self.lib.EVP_CIPHER_CTX_cleanup)
# TODO: compute name using a better algorithm
ciphername = "{0}-{1}-{2}".format(
cipher.name, cipher.key_size, mode.name
@@ -114,36 +130,51 @@ class API(object):
else:
iv_nonce = self.ffi.NULL
- # TODO: Sometimes this needs to be a DecryptInit, when?
- res = self.lib.EVP_EncryptInit_ex(
- ctx, evp_cipher, self.ffi.NULL, cipher.key, iv_nonce
- )
- assert res != 0
+ return (ctx, (ctx, evp_cipher, self.ffi.NULL, cipher.key, iv_nonce))
+ def _disable_padding(self, ctx):
# We purposely disable padding here as it's handled higher up in the
# API.
self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
- return ctx
- def update_encrypt_context(self, ctx, plaintext):
- block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx)
- buf = self.ffi.new("unsigned char[]", len(plaintext) + block_size - 1)
- outlen = self.ffi.new("int *")
- res = self.lib.EVP_EncryptUpdate(
- ctx, buf, outlen, plaintext, len(plaintext)
- )
+ def update_encrypt_context(self, ctx, data):
+ buf, outlen = self._create_buf_out(ctx, len(data))
+ res = self.lib.EVP_EncryptUpdate(ctx, buf, outlen, data, len(data))
assert res != 0
return self.ffi.buffer(buf)[:outlen[0]]
- def finalize_encrypt_context(self, ctx):
+ def update_decrypt_context(self, ctx, data):
+ buf, outlen = self._create_buf_out(ctx, len(data))
+ res = self.lib.EVP_DecryptUpdate(ctx, buf, outlen, data, len(data))
+ assert res != 0
+ return self.ffi.buffer(buf)[:outlen[0]]
+
+ def _create_buf_out(self, ctx, data_len):
block_size = self.lib.EVP_CIPHER_CTX_block_size(ctx)
- buf = self.ffi.new("unsigned char[]", block_size)
+ buf = self.ffi.new("unsigned char[]", data_len + block_size - 1)
outlen = self.ffi.new("int *")
+ return (buf, outlen)
+
+ def finalize_encrypt_context(self, ctx):
+ buf, outlen = self._create_final_buf_out(ctx)
res = self.lib.EVP_EncryptFinal_ex(ctx, buf, outlen)
assert res != 0
+ self._cleanup_block_cipher(ctx)
+ return self.ffi.buffer(buf)[:outlen[0]]
+
+ def finalize_decrypt_context(self, ctx):
+ buf, outlen = self._create_final_buf_out(ctx)
+ res = self.lib.EVP_DecryptFinal_ex(ctx, buf, outlen)
+ assert res != 0
+ self._cleanup_block_cipher(ctx)
+ return self.ffi.buffer(buf)[:outlen[0]]
+
+ def _create_final_buf_out(self, ctx):
+ return self._create_buf_out(ctx, 1)
+
+ def _cleanup_block_cipher(self, ctx):
res = self.lib.EVP_CIPHER_CTX_cleanup(ctx)
assert res == 1
- return self.ffi.buffer(buf)[:outlen[0]]
def supports_hash(self, hash_cls):
return (self.ffi.NULL !=
@@ -177,5 +208,4 @@ class API(object):
assert res != 0
return copied_ctx
-
api = API()
diff --git a/cryptography/bindings/openssl/evp.py b/cryptography/bindings/openssl/evp.py
index 2bb5b0f7..41df1056 100644
--- a/cryptography/bindings/openssl/evp.py
+++ b/cryptography/bindings/openssl/evp.py
@@ -41,6 +41,11 @@ int EVP_CIPHER_CTX_set_padding(EVP_CIPHER_CTX *, int);
int EVP_EncryptUpdate(EVP_CIPHER_CTX *, unsigned char *, int *,
const unsigned char *, int);
int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *, unsigned char *, int *);
+int EVP_DecryptInit_ex(EVP_CIPHER_CTX *, const EVP_CIPHER *, ENGINE *,
+ const unsigned char *, const unsigned char *);
+int EVP_DecryptUpdate(EVP_CIPHER_CTX *, unsigned char *, int *,
+ const unsigned char *, int);
+int EVP_DecryptFinal_ex(EVP_CIPHER_CTX *, unsigned char *, int *);
int EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *);
const EVP_CIPHER *EVP_CIPHER_CTX_cipher(const EVP_CIPHER_CTX *);
int EVP_CIPHER_block_size(const EVP_CIPHER *);
diff --git a/cryptography/primitives/block/base.py b/cryptography/primitives/block/base.py
index 50e9e9e5..b6f45778 100644
--- a/cryptography/primitives/block/base.py
+++ b/cryptography/primitives/block/base.py
@@ -13,14 +13,11 @@
from __future__ import absolute_import, division, print_function
-from enum import Enum
-
-from cryptography.bindings import _default_api
+import abc
+import six
-class _Operation(Enum):
- encrypt = 0
- decrypt = 1
+from cryptography.bindings import _default_api
class BlockCipher(object):
@@ -33,30 +30,51 @@ class BlockCipher(object):
self.cipher = cipher
self.mode = mode
self._api = api
- self._ctx = api.create_block_cipher_context(cipher, mode)
- self._operation = None
- def encrypt(self, plaintext):
- if self._ctx is None:
- raise ValueError("BlockCipher was already finalized")
+ def encryptor(self):
+ return _BlockCipherEncryptionContext(self.cipher, self.mode, self._api)
+
+ def decryptor(self):
+ return _BlockCipherDecryptionContext(self.cipher, self.mode, self._api)
- if self._operation is None:
- self._operation = _Operation.encrypt
- elif self._operation is not _Operation.encrypt:
- raise ValueError("BlockCipher cannot encrypt when the operation is"
- " set to %s" % self._operation.name)
- return self._api.update_encrypt_context(self._ctx, plaintext)
+class _BlockCipherContext(six.with_metaclass(abc.ABCMeta)):
+ def __init__(self, cipher, mode, api):
+ super(_BlockCipherContext, self).__init__()
+ self.cipher = cipher
+ self.mode = mode
+ self._api = api
+ if isinstance(self, _BlockCipherEncryptionContext):
+ ctx_method = self._api.create_block_cipher_encrypt_context
+ else:
+ ctx_method = self._api.create_block_cipher_decrypt_context
+ self._ctx = ctx_method(self.cipher, self.mode)
def finalize(self):
if self._ctx is None:
- raise ValueError("BlockCipher was already finalized")
+ raise ValueError("Context was already finalized")
- if self._operation is _Operation.encrypt:
+ if isinstance(self, _BlockCipherEncryptionContext):
result = self._api.finalize_encrypt_context(self._ctx)
else:
- raise ValueError("BlockCipher cannot finalize the unknown "
- "operation %s" % self._operation.name)
+ result = self._api.finalize_decrypt_context(self._ctx)
self._ctx = None
return result
+
+ def update(self, data):
+ if self._ctx is None:
+ raise ValueError("Context was already finalized")
+
+ if isinstance(self, _BlockCipherEncryptionContext):
+ return self._api.update_encrypt_context(self._ctx, data)
+ else:
+ return self._api.update_decrypt_context(self._ctx, data)
+
+
+class _BlockCipherEncryptionContext(_BlockCipherContext):
+ pass
+
+
+class _BlockCipherDecryptionContext(_BlockCipherContext):
+ pass
diff --git a/tests/primitives/test_block.py b/tests/primitives/test_block.py
index 9f5905bf..4a67002f 100644
--- a/tests/primitives/test_block.py
+++ b/tests/primitives/test_block.py
@@ -15,11 +15,9 @@ from __future__ import absolute_import, division, print_function
import binascii
-import pretend
import pytest
from cryptography.primitives.block import BlockCipher, ciphers, modes
-from cryptography.primitives.block.base import _Operation
class TestBlockCipher(object):
@@ -29,40 +27,42 @@ class TestBlockCipher(object):
modes.CBC(binascii.unhexlify(b"0" * 32))
)
- def test_use_after_finalize(self, api):
+ def test_creates_encryptor(self):
cipher = BlockCipher(
ciphers.AES(binascii.unhexlify(b"0" * 32)),
- modes.CBC(binascii.unhexlify(b"0" * 32)),
- api
+ modes.CBC(binascii.unhexlify(b"0" * 32))
)
- cipher.encrypt(b"a" * 16)
- cipher.finalize()
- with pytest.raises(ValueError):
- cipher.encrypt(b"b" * 16)
- with pytest.raises(ValueError):
- cipher.finalize()
+ assert cipher.encryptor() is not None
- def test_encrypt_with_invalid_operation(self, api):
+ def test_creates_decryptor(self):
cipher = BlockCipher(
ciphers.AES(binascii.unhexlify(b"0" * 32)),
- modes.CBC(binascii.unhexlify(b"0" * 32)),
- api
+ modes.CBC(binascii.unhexlify(b"0" * 32))
)
- cipher._operation = _Operation.decrypt
+ assert cipher.decryptor() is not None
- with pytest.raises(ValueError):
- cipher.encrypt(b"b" * 16)
- def test_finalize_with_invalid_operation(self, api):
+class TestBlockCipherContext(object):
+ def test_use_after_finalize(self, api):
cipher = BlockCipher(
ciphers.AES(binascii.unhexlify(b"0" * 32)),
modes.CBC(binascii.unhexlify(b"0" * 32)),
api
)
- cipher._operation = pretend.stub(name="wat")
-
+ context = cipher.encryptor()
+ context.update(b"a" * 16)
+ context.finalize()
+ with pytest.raises(ValueError):
+ context.update(b"b" * 16)
+ with pytest.raises(ValueError):
+ context.finalize()
+ context = cipher.decryptor()
+ context.update(b"a" * 16)
+ context.finalize()
+ with pytest.raises(ValueError):
+ context.update(b"b" * 16)
with pytest.raises(ValueError):
- cipher.finalize()
+ context.finalize()
def test_unaligned_block_encryption(self, api):
cipher = BlockCipher(
@@ -70,7 +70,15 @@ class TestBlockCipher(object):
modes.ECB(),
api
)
- ct = cipher.encrypt(b"a" * 15)
+ context = cipher.encryptor()
+ ct = context.update(b"a" * 15)
assert ct == b""
- ct += cipher.encrypt(b"a" * 65)
+ ct += context.update(b"a" * 65)
assert len(ct) == 80
+ ct += context.finalize()
+ context = cipher.decryptor()
+ pt = context.update(ct[:3])
+ assert pt == b""
+ pt += context.update(ct[3:])
+ assert len(pt) == 80
+ context.finalize()
diff --git a/tests/primitives/utils.py b/tests/primitives/utils.py
index a3759b03..70ece52a 100644
--- a/tests/primitives/utils.py
+++ b/tests/primitives/utils.py
@@ -37,9 +37,14 @@ def encrypt_test(api, cipher_factory, mode_factory, params, only_if,
mode_factory(**params),
api
)
- actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext))
- actual_ciphertext += cipher.finalize()
+ encryptor = cipher.encryptor()
+ actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext))
+ actual_ciphertext += encryptor.finalize()
assert actual_ciphertext == binascii.unhexlify(ciphertext)
+ decryptor = cipher.decryptor()
+ actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext))
+ actual_plaintext += decryptor.finalize()
+ assert actual_plaintext == binascii.unhexlify(plaintext)
def generate_hash_test(param_loader, path, file_names, hash_cls,