diff options
-rw-r--r-- | cryptography/bindings/openssl/api.py | 96 | ||||
-rw-r--r-- | cryptography/primitives/__init__.py | 0 | ||||
-rw-r--r-- | cryptography/primitives/block/__init__.py | 21 | ||||
-rw-r--r-- | cryptography/primitives/block/base.py | 64 | ||||
-rw-r--r-- | cryptography/primitives/block/ciphers.py | 34 | ||||
-rw-r--r-- | cryptography/primitives/block/modes.py | 22 | ||||
-rw-r--r-- | setup.py | 9 | ||||
-rw-r--r-- | tests/primitives/__init__.py | 0 | ||||
-rw-r--r-- | tests/primitives/test_block.py | 63 | ||||
-rw-r--r-- | tests/primitives/test_ciphers.py | 35 | ||||
-rw-r--r-- | tests/primitives/test_nist.py | 199 | ||||
-rw-r--r-- | tox.ini | 4 |
12 files changed, 546 insertions, 1 deletions
diff --git a/cryptography/bindings/openssl/api.py b/cryptography/bindings/openssl/api.py index 3cc6a0e9..2da156cd 100644 --- a/cryptography/bindings/openssl/api.py +++ b/cryptography/bindings/openssl/api.py @@ -13,11 +13,107 @@ from __future__ import absolute_import, division, print_function +import cffi + + +class OpenSSLError(Exception): + def __init__(self, api): + e = api._lib.ERR_get_error() + if e == 0: + raise SystemError( + "Tried to create an OpenSSLError when there was None" + ) + msg = api._ffi.new("char[]", 120) + api._lib.ERR_error_string(e, msg) + super(OpenSSLError, self).__init__(api._ffi.string(msg)) + class API(object): """ OpenSSL API wrapper. """ + def __init__(self): + ffi = cffi.FFI() + self._populate_ffi(ffi) + self._ffi = ffi + self._lib = ffi.verify(""" + #include <openssl/evp.h> + """) + self._lib.OpenSSL_add_all_algorithms() + self._lib.ERR_load_crypto_strings() + + def _populate_ffi(self, ffi): + ffi.cdef(""" + typedef struct { + ...; + } EVP_CIPHER_CTX; + typedef ... EVP_CIPHER; + typedef ... ENGINE; + + void OpenSSL_add_all_algorithms(); + void ERR_load_crypto_strings(); + + const EVP_CIPHER *EVP_get_cipherbyname(const char *); + int EVP_EncryptInit_ex(EVP_CIPHER_CTX *, const EVP_CIPHER *, + ENGINE *, unsigned char *, unsigned char *); + int EVP_CIPHER_CTX_set_padding(EVP_CIPHER_CTX *, int); + int EVP_EncryptUpdate(EVP_CIPHER_CTX *, unsigned char *, int *, + unsigned char *, int); + int EVP_EncryptFinal_ex(EVP_CIPHER_CTX *, unsigned char *, int *); + int EVP_CIPHER_CTX_cleanup(EVP_CIPHER_CTX *); + EVP_CIPHER *EVP_CIPHER_CTX_cipher(const EVP_CIPHER_CTX *); + int EVP_CIPHER_block_size(const EVP_CIPHER *); + + unsigned long ERR_get_error(); + """) + + def create_block_cipher_context(self, cipher, mode): + ctx = self._ffi.new("EVP_CIPHER_CTX *") + ctx = self._ffi.gc(ctx, self._lib.EVP_CIPHER_CTX_cleanup) + # TODO: compute name using a better algorithm + ciphername = "{0}-{1}-{2}".format( + cipher.name, cipher.key_size, mode.name + ) + evp_cipher = self._lib.EVP_get_cipherbyname(ciphername.encode("ascii")) + if evp_cipher == self._ffi.NULL: + raise OpenSSLError(self) + # TODO: only use the key and initialization_vector as needed. Sometimes + # this needs to be a DecryptInit, when? + res = self._lib.EVP_EncryptInit_ex( + ctx, evp_cipher, self._ffi.NULL, cipher.key, + mode.initialization_vector + ) + if res == 0: + raise OpenSSLError(self) + + # We purposely disable padding here as it's handled higher up in the + # API. + self._lib.EVP_CIPHER_CTX_set_padding(ctx, 0) + return ctx + + def update_encrypt_context(self, ctx, plaintext): + buf = self._ffi.new("unsigned char[]", len(plaintext)) + outlen = self._ffi.new("int *") + res = self._lib.EVP_EncryptUpdate( + ctx, buf, outlen, plaintext, len(plaintext) + ) + if res == 0: + raise OpenSSLError(self) + return self._ffi.buffer(buf)[:outlen[0]] + + def finalize_encrypt_context(self, ctx): + cipher = self._lib.EVP_CIPHER_CTX_cipher(ctx) + block_size = self._lib.EVP_CIPHER_block_size(cipher) + buf = self._ffi.new("unsigned char[]", block_size) + outlen = self._ffi.new("int *") + res = self._lib.EVP_EncryptFinal_ex(ctx, buf, outlen) + if res == 0: + raise OpenSSLError(self) + res = self._lib.EVP_CIPHER_CTX_cleanup(ctx) + if res == 0: + raise OpenSSLError(self) + return self._ffi.buffer(buf)[:outlen[0]] + api = API() diff --git a/cryptography/primitives/__init__.py b/cryptography/primitives/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/cryptography/primitives/__init__.py diff --git a/cryptography/primitives/block/__init__.py b/cryptography/primitives/block/__init__.py new file mode 100644 index 00000000..a0c6d03f --- /dev/null +++ b/cryptography/primitives/block/__init__.py @@ -0,0 +1,21 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +from cryptography.primitives.block.base import BlockCipher + + +__all__ = [ + "BlockCipher", +] diff --git a/cryptography/primitives/block/base.py b/cryptography/primitives/block/base.py new file mode 100644 index 00000000..f4a04b35 --- /dev/null +++ b/cryptography/primitives/block/base.py @@ -0,0 +1,64 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +# TODO: which binding is used should be an option somewhere +from enum import Enum + +from cryptography.bindings.openssl import api + + +class _Operation(Enum): + encrypt = "encrypt" + decrypt = "decrypt" + + +class BlockCipher(object): + def __init__(self, cipher, mode): + super(BlockCipher, self).__init__() + self.cipher = cipher + self.mode = mode + self._ctx = api.create_block_cipher_context(cipher, mode) + self._operation = None + + @property + def name(self): + return "{0}-{1}-{2}".format( + self.cipher.name, self.cipher.key_size, self.mode.name, + ) + + def encrypt(self, plaintext): + if self._ctx is None: + raise ValueError("BlockCipher was already finalized") + + if self._operation is None: + self._operation = _Operation.encrypt + elif self._operation is not _Operation.encrypt: + raise ValueError("BlockCipher cannot encrypt when the operation is" + " set to %s" % self._operation.name) + + return api.update_encrypt_context(self._ctx, plaintext) + + def finalize(self): + if self._ctx is None: + raise ValueError("BlockCipher was already finalized") + + if self._operation is _Operation.encrypt: + result = api.finalize_encrypt_context(self._ctx) + else: + raise ValueError("BlockCipher cannot finalize the unknown " + "operation %s" % self._operation.name) + + self._ctx = None + return result diff --git a/cryptography/primitives/block/ciphers.py b/cryptography/primitives/block/ciphers.py new file mode 100644 index 00000000..2e478705 --- /dev/null +++ b/cryptography/primitives/block/ciphers.py @@ -0,0 +1,34 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + + +class AES(object): + name = "AES" + block_size = 128 + key_sizes = set([128, 192, 256]) + + def __init__(self, key): + super(AES, self).__init__() + self.key = key + + # Verify that the key size matches the expected key size + if self.key_size not in self.key_sizes: + raise ValueError("Invalid key size (%s) for %s".format( + self.key_size, self.name + )) + + @property + def key_size(self): + return len(self.key) * 8 diff --git a/cryptography/primitives/block/modes.py b/cryptography/primitives/block/modes.py new file mode 100644 index 00000000..de31f086 --- /dev/null +++ b/cryptography/primitives/block/modes.py @@ -0,0 +1,22 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + + +class CBC(object): + name = "CBC" + + def __init__(self, initialization_vector): + super(CBC, self).__init__() + self.initialization_vector = initialization_vector @@ -10,8 +10,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import sys + from setuptools import setup, find_packages + about = {} with open("cryptography/__about__.py") as fp: exec(fp.read(), about) @@ -27,6 +30,12 @@ setup_requires = [ CFFI_DEPENDENCY, ] +install_requires = [ + "cffi>=0.6", +] + +if sys.version_info[:2] < (3, 4): + install_requires += ["enum34"] setup( name=about["__title__"], diff --git a/tests/primitives/__init__.py b/tests/primitives/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/primitives/__init__.py diff --git a/tests/primitives/test_block.py b/tests/primitives/test_block.py new file mode 100644 index 00000000..774409ca --- /dev/null +++ b/tests/primitives/test_block.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import binascii + +import pretend +import pytest + +from cryptography.primitives.block import BlockCipher, ciphers, modes +from cryptography.primitives.block.base import _Operation + + +class TestBlockCipher(object): + def test_cipher_name(self): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(b"0" * 32)), + modes.CBC(binascii.unhexlify(b"0" * 32)) + ) + assert cipher.name == "AES-128-CBC" + + def test_use_after_finalize(self): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(b"0" * 32)), + modes.CBC(binascii.unhexlify(b"0" * 32)) + ) + cipher.encrypt(b"a" * 16) + cipher.finalize() + with pytest.raises(ValueError): + cipher.encrypt(b"b" * 16) + with pytest.raises(ValueError): + cipher.finalize() + + def test_encrypt_with_invalid_operation(self): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(b"0" * 32)), + modes.CBC(binascii.unhexlify(b"0" * 32)) + ) + cipher._operation = _Operation.decrypt + + with pytest.raises(ValueError): + cipher.encrypt(b"b" * 16) + + def test_finalize_with_invalid_operation(self): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(b"0" * 32)), + modes.CBC(binascii.unhexlify(b"0" * 32)) + ) + cipher._operation = pretend.stub(name="wat") + + with pytest.raises(ValueError): + cipher.finalize() diff --git a/tests/primitives/test_ciphers.py b/tests/primitives/test_ciphers.py new file mode 100644 index 00000000..5ee9f223 --- /dev/null +++ b/tests/primitives/test_ciphers.py @@ -0,0 +1,35 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import binascii + +import pytest + +from cryptography.primitives.block.ciphers import AES + + +class TestAES(object): + @pytest.mark.parametrize(("key", "keysize"), [ + (b"0" * 32, 128), + (b"0" * 48, 192), + (b"0" * 64, 256), + ]) + def test_key_size(self, key, keysize): + cipher = AES(binascii.unhexlify(key)) + assert cipher.key_size == keysize + + def test_invalid_key_size(self): + with pytest.raises(ValueError): + AES(binascii.unhexlify(b"0" * 12)) diff --git a/tests/primitives/test_nist.py b/tests/primitives/test_nist.py new file mode 100644 index 00000000..db5e0344 --- /dev/null +++ b/tests/primitives/test_nist.py @@ -0,0 +1,199 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test using the NIST Test Vectors +""" + +from __future__ import absolute_import, division, print_function + +import binascii +import os + +import pytest + +from cryptography.primitives.block import BlockCipher, ciphers, modes + +from ..utils import load_nist_vectors_from_file + + +def parameterize_kat_encrypt(fname): + return pytest.mark.parametrize(("key", "iv", "plaintext", "ciphertext"), + load_nist_vectors_from_file( + os.path.join("AES/KAT/", fname), + "ENCRYPT", + ["key", "iv", "plaintext", "ciphertext"], + ), + ) + + +def paramterize_mmt_encrypt(fname): + return pytest.mark.parametrize(("key", "iv", "plaintext", "ciphertext"), + load_nist_vectors_from_file( + os.path.join("AES/MMT/", fname), + "ENCRYPT", + ["key", "iv", "plaintext", "ciphertext"], + ) + ) + + +class TestAES_CBC(object): + @parameterize_kat_encrypt("CBCGFSbox128.rsp") + def test_KAT_GFSbox_128_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCGFSbox192.rsp") + def test_KAT_GFSbox_192_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCGFSbox256.rsp") + def test_KAT_GFSbox_256_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCKeySbox128.rsp") + def test_KAT_KeySbox_128_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCKeySbox192.rsp") + def test_KAT_KeySbox_192_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCKeySbox256.rsp") + def test_KAT_KeySbox_256_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarKey128.rsp") + def test_KAT_VarKey_128_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarKey192.rsp") + def test_KAT_VarKey_192_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarKey256.rsp") + def test_KAT_VarKey_256_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarTxt128.rsp") + def test_KAT_VarTxt_128_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarTxt192.rsp") + def test_KAT_VarTxt_192_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @parameterize_kat_encrypt("CBCVarTxt256.rsp") + def test_KAT_VarTxt_256_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @paramterize_mmt_encrypt("CBCMMT128.rsp") + def test_MMT_128_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @paramterize_mmt_encrypt("CBCMMT192.rsp") + def test_MMT_192_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext + + @paramterize_mmt_encrypt("CBCMMT256.rsp") + def test_MMT_256_encrypt(self, key, iv, plaintext, ciphertext): + cipher = BlockCipher( + ciphers.AES(binascii.unhexlify(key)), + modes.CBC(binascii.unhexlify(iv)), + ) + actual_ciphertext = cipher.encrypt(binascii.unhexlify(plaintext)) + actual_ciphertext += cipher.finalize() + assert binascii.hexlify(actual_ciphertext) == ciphertext @@ -2,7 +2,9 @@ envlist = py26,py27,pypy,py32,py33,docs,pep8 [testenv] -deps = pytest-cov +deps = + pytest-cov + pretend commands = py.test --cov=cryptography/ --cov=tests/ [testenv:docs] |