aboutsummaryrefslogtreecommitdiffstats
path: root/cryptography/hazmat/backends/commoncrypto/backend.py
diff options
context:
space:
mode:
authorPaul Kehrer <paul.l.kehrer@gmail.com>2014-01-19 21:22:08 -0600
committerPaul Kehrer <paul.l.kehrer@gmail.com>2014-01-19 22:01:35 -0600
commit966821aa1ab232e52c71d1ad7a7745bc9c6a08fb (patch)
tree6425507db73134e65be07b6f216cd5773578f5c9 /cryptography/hazmat/backends/commoncrypto/backend.py
parent5f60acba1ed893cc746aab3b8a653abfcef92b41 (diff)
downloadcryptography-966821aa1ab232e52c71d1ad7a7745bc9c6a08fb.tar.gz
cryptography-966821aa1ab232e52c71d1ad7a7745bc9c6a08fb.tar.bz2
cryptography-966821aa1ab232e52c71d1ad7a7745bc9c6a08fb.zip
add cipher support to the commoncrypto backend
Diffstat (limited to 'cryptography/hazmat/backends/commoncrypto/backend.py')
-rw-r--r--cryptography/hazmat/backends/commoncrypto/backend.py164
1 files changed, 163 insertions, 1 deletions
diff --git a/cryptography/hazmat/backends/commoncrypto/backend.py b/cryptography/hazmat/backends/commoncrypto/backend.py
index 58e57efb..bdaff301 100644
--- a/cryptography/hazmat/backends/commoncrypto/backend.py
+++ b/cryptography/hazmat/backends/commoncrypto/backend.py
@@ -18,10 +18,16 @@ from collections import namedtuple
from cryptography import utils
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.interfaces import (
- HashBackend, HMACBackend,
+ HashBackend, HMACBackend, CipherBackend
)
from cryptography.hazmat.bindings.commoncrypto.binding import Binding
from cryptography.hazmat.primitives import interfaces
+from cryptography.hazmat.primitives.ciphers.algorithms import (
+ AES, Blowfish, TripleDES, ARC4
+)
+from cryptography.hazmat.primitives.ciphers.modes import (
+ CBC, CTR, ECB, OFB, CFB
+)
HashMethods = namedtuple(
@@ -29,6 +35,7 @@ HashMethods = namedtuple(
)
+@utils.register_interface(CipherBackend)
@utils.register_interface(HashBackend)
@utils.register_interface(HMACBackend)
class Backend(object):
@@ -42,6 +49,8 @@ class Backend(object):
self._ffi = self._binding.ffi
self._lib = self._binding.lib
+ self._cipher_registry = {}
+ self._register_default_ciphers()
self._hash_mapping = {
"md5": HashMethods(
"CC_MD5_CTX *", self._lib.CC_MD5_Init,
@@ -98,6 +107,159 @@ class Backend(object):
def create_hmac_ctx(self, key, algorithm):
return _HMACContext(self, key, algorithm)
+ def cipher_supported(self, cipher, mode):
+ try:
+ self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ return True
+
+ def create_symmetric_encryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
+
+ def create_symmetric_decryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
+
+ def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = adapter
+
+ def _register_default_ciphers(self):
+ for mode_cls in [CBC, ECB, CFB, OFB, CTR]:
+ self.register_cipher_adapter(
+ AES,
+ mode_cls,
+ _GetCipherModeEnum()
+ )
+ for mode_cls in [CBC, CFB, OFB]:
+ self.register_cipher_adapter(
+ TripleDES,
+ mode_cls,
+ _GetCipherModeEnum()
+ )
+ for mode_cls in [CBC, CFB, OFB, ECB]:
+ self.register_cipher_adapter(
+ Blowfish,
+ mode_cls,
+ _GetCipherModeEnum()
+ )
+ self.register_cipher_adapter(
+ ARC4,
+ type(None),
+ _GetCipherModeEnum()
+ )
+
+
+class _GetCipherModeEnum(object):
+ def __call__(self, backend, cipher, mode):
+ cipher_enum = {
+ AES: backend._lib.kCCAlgorithmAES128,
+ TripleDES: backend._lib.kCCAlgorithm3DES,
+ Blowfish: backend._lib.kCCAlgorithmBlowfish,
+ ARC4: backend._lib.kCCAlgorithmRC4,
+ }[type(cipher)]
+
+ mode_enum = {
+ ECB: backend._lib.kCCModeECB,
+ CBC: backend._lib.kCCModeCBC,
+ CTR: backend._lib.kCCModeCTR,
+ CFB: backend._lib.kCCModeCFB,
+ OFB: backend._lib.kCCModeOFB,
+ type(None): backend._lib.kCCModeRC4,
+ }[type(mode)]
+
+ return (cipher_enum, mode_enum)
+
+
+@utils.register_interface(interfaces.CipherContext)
+class _CipherContext(object):
+ _ENCRYPT = 0 # kCCEncrypt
+ _DECRYPT = 1 # kCCDecrypt
+
+ def __init__(self, backend, cipher, mode, operation):
+ self._backend = backend
+ self._cipher = cipher
+ self._mode = mode
+ self._operation = operation
+ # bytes_processed is needed to work around rdar://15589470, a bug where
+ # kCCAlignmentError fails to raise when supplying non-block-aligned
+ # data.
+ self._bytes_processed = 0
+ if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not
+ isinstance(mode, (OFB, CFB, CTR))):
+ self._byte_block_size = cipher.block_size // 8
+ else:
+ self._byte_block_size = 1
+
+ registry = self._backend._cipher_registry
+ try:
+ adapter = registry[type(cipher), type(mode)]
+ except KeyError:
+ raise UnsupportedAlgorithm(
+ "cipher {0} in {1} mode is not supported "
+ "by this backend".format(
+ cipher.name, mode.name if mode else mode)
+ )
+
+ cipher_enum, mode_enum = adapter(self._backend, cipher, mode)
+ ctx = self._backend._ffi.new("CCCryptorRef *")
+ ctx[0] = self._backend._ffi.gc(
+ ctx[0], self._backend._lib.CCCryptorRelease
+ )
+
+ if isinstance(mode, interfaces.ModeWithInitializationVector):
+ iv_nonce = mode.initialization_vector
+ elif isinstance(mode, interfaces.ModeWithNonce):
+ iv_nonce = mode.nonce
+ else:
+ iv_nonce = self._backend._ffi.NULL
+
+ if isinstance(mode, CTR):
+ mode_option = self._backend._lib.kCCModeOptionCTR_BE
+ else:
+ mode_option = 0
+
+ res = self._backend._lib.CCCryptorCreateWithMode(
+ operation,
+ mode_enum, cipher_enum,
+ self._backend._lib.ccNoPadding, iv_nonce,
+ cipher.key, len(cipher.key),
+ self._backend._ffi.NULL, 0, 0, mode_option, ctx)
+ assert res == self._backend._lib.kCCSuccess
+
+ self._ctx = ctx
+
+ def update(self, data):
+ # Count bytes processed to handle block alignment.
+ self._bytes_processed += len(data)
+ buf = self._backend._ffi.new(
+ "unsigned char[]", len(data) + self._byte_block_size - 1)
+ outlen = self._backend._ffi.new("size_t *")
+ res = self._backend._lib.CCCryptorUpdate(
+ self._ctx[0], data, len(data), buf,
+ len(data) + self._byte_block_size - 1, outlen)
+ assert res == self._backend._lib.kCCSuccess
+ return self._backend._ffi.buffer(buf)[:outlen[0]]
+
+ def finalize(self):
+ # Raise error if block alignment is wrong.
+ if self._bytes_processed % self._byte_block_size:
+ raise ValueError(
+ "The length of the provided data is not a multiple of "
+ "the block length"
+ )
+ buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size)
+ outlen = self._backend._ffi.new("size_t *")
+ res = self._backend._lib.CCCryptorFinal(
+ self._ctx[0], buf, len(buf), outlen)
+ assert res == self._backend._lib.kCCSuccess
+ res = self._backend._lib.CCCryptorRelease(self._ctx[0])
+ assert res == self._backend._lib.kCCSuccess
+ return self._backend._ffi.buffer(buf)[:outlen[0]]
+
@utils.register_interface(interfaces.HashContext)
class _HashContext(object):