diff options
-rw-r--r-- | cryptography/hazmat/backends/commoncrypto/backend.py | 173 | ||||
-rw-r--r-- | cryptography/hazmat/backends/commoncrypto/ciphers.py | 191 |
2 files changed, 194 insertions, 170 deletions
diff --git a/cryptography/hazmat/backends/commoncrypto/backend.py b/cryptography/hazmat/backends/commoncrypto/backend.py index 6147a996..f78370c4 100644 --- a/cryptography/hazmat/backends/commoncrypto/backend.py +++ b/cryptography/hazmat/backends/commoncrypto/backend.py @@ -16,8 +16,9 @@ from __future__ import absolute_import, division, print_function from collections import namedtuple from cryptography import utils -from cryptography.exceptions import ( - InternalError, InvalidTag, UnsupportedAlgorithm, _Reasons +from cryptography.exceptions import InternalError +from cryptography.hazmat.backends.commoncrypto.ciphers import ( + _CipherContext, _GCMCipherContext ) from cryptography.hazmat.backends.commoncrypto.hashes import _HashContext from cryptography.hazmat.backends.commoncrypto.hmac import _HMACContext @@ -25,7 +26,6 @@ from cryptography.hazmat.backends.interfaces import ( CipherBackend, HMACBackend, HashBackend, PBKDF2HMACBackend ) from cryptography.hazmat.bindings.commoncrypto.binding import Binding -from cryptography.hazmat.primitives import constant_time, interfaces from cryptography.hazmat.primitives.ciphers.algorithms import ( AES, ARC4, Blowfish, CAST5, TripleDES ) @@ -250,171 +250,4 @@ class Backend(object): ctx[0] = self._ffi.NULL -@utils.register_interface(interfaces.CipherContext) -class _CipherContext(object): - def __init__(self, backend, cipher, mode, operation): - self._backend = backend - self._cipher = cipher - self._mode = mode - self._operation = operation - # There is a bug in CommonCrypto where block ciphers do not raise - # kCCAlignmentError when finalizing if you supply non-block aligned - # data. To work around this we need to keep track of the block - # alignment ourselves, but only for alg+mode combos that require - # block alignment. OFB, CFB, and CTR make a block cipher algorithm - # into a stream cipher so we don't need to track them (and thus their - # block size is effectively 1 byte just like OpenSSL/CommonCrypto - # treat RC4 and other stream cipher block sizes). - # This bug has been filed as rdar://15589470 - self._bytes_processed = 0 - if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not - isinstance(mode, (OFB, CFB, CFB8, CTR))): - self._byte_block_size = cipher.block_size // 8 - else: - self._byte_block_size = 1 - - registry = self._backend._cipher_registry - try: - cipher_enum, mode_enum = registry[type(cipher), type(mode)] - except KeyError: - raise UnsupportedAlgorithm( - "cipher {0} in {1} mode is not supported " - "by this backend.".format( - cipher.name, mode.name if mode else mode), - _Reasons.UNSUPPORTED_CIPHER - ) - - ctx = self._backend._ffi.new("CCCryptorRef *") - ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) - - if isinstance(mode, interfaces.ModeWithInitializationVector): - iv_nonce = mode.initialization_vector - elif isinstance(mode, interfaces.ModeWithNonce): - iv_nonce = mode.nonce - else: - iv_nonce = self._backend._ffi.NULL - - if isinstance(mode, CTR): - mode_option = self._backend._lib.kCCModeOptionCTR_BE - else: - mode_option = 0 - - res = self._backend._lib.CCCryptorCreateWithMode( - operation, - mode_enum, cipher_enum, - self._backend._lib.ccNoPadding, iv_nonce, - cipher.key, len(cipher.key), - self._backend._ffi.NULL, 0, 0, mode_option, ctx) - self._backend._check_response(res) - - self._ctx = ctx - - def update(self, data): - # Count bytes processed to handle block alignment. - self._bytes_processed += len(data) - buf = self._backend._ffi.new( - "unsigned char[]", len(data) + self._byte_block_size - 1) - outlen = self._backend._ffi.new("size_t *") - res = self._backend._lib.CCCryptorUpdate( - self._ctx[0], data, len(data), buf, - len(data) + self._byte_block_size - 1, outlen) - self._backend._check_response(res) - return self._backend._ffi.buffer(buf)[:outlen[0]] - - def finalize(self): - # Raise error if block alignment is wrong. - if self._bytes_processed % self._byte_block_size: - raise ValueError( - "The length of the provided data is not a multiple of " - "the block length." - ) - buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size) - outlen = self._backend._ffi.new("size_t *") - res = self._backend._lib.CCCryptorFinal( - self._ctx[0], buf, len(buf), outlen) - self._backend._check_response(res) - self._backend._release_cipher_ctx(self._ctx) - return self._backend._ffi.buffer(buf)[:outlen[0]] - - -@utils.register_interface(interfaces.AEADCipherContext) -@utils.register_interface(interfaces.AEADEncryptionContext) -class _GCMCipherContext(object): - def __init__(self, backend, cipher, mode, operation): - self._backend = backend - self._cipher = cipher - self._mode = mode - self._operation = operation - self._tag = None - - registry = self._backend._cipher_registry - try: - cipher_enum, mode_enum = registry[type(cipher), type(mode)] - except KeyError: - raise UnsupportedAlgorithm( - "cipher {0} in {1} mode is not supported " - "by this backend.".format( - cipher.name, mode.name if mode else mode), - _Reasons.UNSUPPORTED_CIPHER - ) - - ctx = self._backend._ffi.new("CCCryptorRef *") - ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) - - self._ctx = ctx - - res = self._backend._lib.CCCryptorCreateWithMode( - operation, - mode_enum, cipher_enum, - self._backend._lib.ccNoPadding, - self._backend._ffi.NULL, - cipher.key, len(cipher.key), - self._backend._ffi.NULL, 0, 0, 0, self._ctx) - self._backend._check_response(res) - - res = self._backend._lib.CCCryptorGCMAddIV( - self._ctx[0], - mode.initialization_vector, - len(mode.initialization_vector) - ) - self._backend._check_response(res) - - def update(self, data): - buf = self._backend._ffi.new("unsigned char[]", len(data)) - args = (self._ctx[0], data, len(data), buf) - if self._operation == self._backend._lib.kCCEncrypt: - res = self._backend._lib.CCCryptorGCMEncrypt(*args) - else: - res = self._backend._lib.CCCryptorGCMDecrypt(*args) - - self._backend._check_response(res) - return self._backend._ffi.buffer(buf)[:] - - def finalize(self): - tag_size = self._cipher.block_size // 8 - tag_buf = self._backend._ffi.new("unsigned char[]", tag_size) - tag_len = self._backend._ffi.new("size_t *", tag_size) - res = self._backend._lib.CCCryptorGCMFinal( - self._ctx[0], tag_buf, tag_len) - self._backend._check_response(res) - self._backend._release_cipher_ctx(self._ctx) - self._tag = self._backend._ffi.buffer(tag_buf)[:] - if (self._operation == self._backend._lib.kCCDecrypt and - not constant_time.bytes_eq( - self._tag[:len(self._mode.tag)], self._mode.tag - )): - raise InvalidTag - return b"" - - def authenticate_additional_data(self, data): - res = self._backend._lib.CCCryptorGCMAddAAD( - self._ctx[0], data, len(data) - ) - self._backend._check_response(res) - - @property - def tag(self): - return self._tag - - backend = Backend() diff --git a/cryptography/hazmat/backends/commoncrypto/ciphers.py b/cryptography/hazmat/backends/commoncrypto/ciphers.py new file mode 100644 index 00000000..2820fd0e --- /dev/null +++ b/cryptography/hazmat/backends/commoncrypto/ciphers.py @@ -0,0 +1,191 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +from cryptography import utils +from cryptography.exceptions import ( + InvalidTag, UnsupportedAlgorithm, _Reasons +) +from cryptography.hazmat.primitives import constant_time, interfaces +from cryptography.hazmat.primitives.ciphers.modes import ( + CFB, CFB8, CTR, OFB +) + + +@utils.register_interface(interfaces.CipherContext) +class _CipherContext(object): + def __init__(self, backend, cipher, mode, operation): + self._backend = backend + self._cipher = cipher + self._mode = mode + self._operation = operation + # There is a bug in CommonCrypto where block ciphers do not raise + # kCCAlignmentError when finalizing if you supply non-block aligned + # data. To work around this we need to keep track of the block + # alignment ourselves, but only for alg+mode combos that require + # block alignment. OFB, CFB, and CTR make a block cipher algorithm + # into a stream cipher so we don't need to track them (and thus their + # block size is effectively 1 byte just like OpenSSL/CommonCrypto + # treat RC4 and other stream cipher block sizes). + # This bug has been filed as rdar://15589470 + self._bytes_processed = 0 + if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not + isinstance(mode, (OFB, CFB, CFB8, CTR))): + self._byte_block_size = cipher.block_size // 8 + else: + self._byte_block_size = 1 + + registry = self._backend._cipher_registry + try: + cipher_enum, mode_enum = registry[type(cipher), type(mode)] + except KeyError: + raise UnsupportedAlgorithm( + "cipher {0} in {1} mode is not supported " + "by this backend.".format( + cipher.name, mode.name if mode else mode), + _Reasons.UNSUPPORTED_CIPHER + ) + + ctx = self._backend._ffi.new("CCCryptorRef *") + ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) + + if isinstance(mode, interfaces.ModeWithInitializationVector): + iv_nonce = mode.initialization_vector + elif isinstance(mode, interfaces.ModeWithNonce): + iv_nonce = mode.nonce + else: + iv_nonce = self._backend._ffi.NULL + + if isinstance(mode, CTR): + mode_option = self._backend._lib.kCCModeOptionCTR_BE + else: + mode_option = 0 + + res = self._backend._lib.CCCryptorCreateWithMode( + operation, + mode_enum, cipher_enum, + self._backend._lib.ccNoPadding, iv_nonce, + cipher.key, len(cipher.key), + self._backend._ffi.NULL, 0, 0, mode_option, ctx) + self._backend._check_response(res) + + self._ctx = ctx + + def update(self, data): + # Count bytes processed to handle block alignment. + self._bytes_processed += len(data) + buf = self._backend._ffi.new( + "unsigned char[]", len(data) + self._byte_block_size - 1) + outlen = self._backend._ffi.new("size_t *") + res = self._backend._lib.CCCryptorUpdate( + self._ctx[0], data, len(data), buf, + len(data) + self._byte_block_size - 1, outlen) + self._backend._check_response(res) + return self._backend._ffi.buffer(buf)[:outlen[0]] + + def finalize(self): + # Raise error if block alignment is wrong. + if self._bytes_processed % self._byte_block_size: + raise ValueError( + "The length of the provided data is not a multiple of " + "the block length." + ) + buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size) + outlen = self._backend._ffi.new("size_t *") + res = self._backend._lib.CCCryptorFinal( + self._ctx[0], buf, len(buf), outlen) + self._backend._check_response(res) + self._backend._release_cipher_ctx(self._ctx) + return self._backend._ffi.buffer(buf)[:outlen[0]] + + +@utils.register_interface(interfaces.AEADCipherContext) +@utils.register_interface(interfaces.AEADEncryptionContext) +class _GCMCipherContext(object): + def __init__(self, backend, cipher, mode, operation): + self._backend = backend + self._cipher = cipher + self._mode = mode + self._operation = operation + self._tag = None + + registry = self._backend._cipher_registry + try: + cipher_enum, mode_enum = registry[type(cipher), type(mode)] + except KeyError: + raise UnsupportedAlgorithm( + "cipher {0} in {1} mode is not supported " + "by this backend.".format( + cipher.name, mode.name if mode else mode), + _Reasons.UNSUPPORTED_CIPHER + ) + + ctx = self._backend._ffi.new("CCCryptorRef *") + ctx = self._backend._ffi.gc(ctx, self._backend._release_cipher_ctx) + + self._ctx = ctx + + res = self._backend._lib.CCCryptorCreateWithMode( + operation, + mode_enum, cipher_enum, + self._backend._lib.ccNoPadding, + self._backend._ffi.NULL, + cipher.key, len(cipher.key), + self._backend._ffi.NULL, 0, 0, 0, self._ctx) + self._backend._check_response(res) + + res = self._backend._lib.CCCryptorGCMAddIV( + self._ctx[0], + mode.initialization_vector, + len(mode.initialization_vector) + ) + self._backend._check_response(res) + + def update(self, data): + buf = self._backend._ffi.new("unsigned char[]", len(data)) + args = (self._ctx[0], data, len(data), buf) + if self._operation == self._backend._lib.kCCEncrypt: + res = self._backend._lib.CCCryptorGCMEncrypt(*args) + else: + res = self._backend._lib.CCCryptorGCMDecrypt(*args) + + self._backend._check_response(res) + return self._backend._ffi.buffer(buf)[:] + + def finalize(self): + tag_size = self._cipher.block_size // 8 + tag_buf = self._backend._ffi.new("unsigned char[]", tag_size) + tag_len = self._backend._ffi.new("size_t *", tag_size) + res = self._backend._lib.CCCryptorGCMFinal( + self._ctx[0], tag_buf, tag_len + ) + self._backend._check_response(res) + self._backend._release_cipher_ctx(self._ctx) + self._tag = self._backend._ffi.buffer(tag_buf)[:] + if (self._operation == self._backend._lib.kCCDecrypt and + not constant_time.bytes_eq( + self._tag[:len(self._mode.tag)], self._mode.tag + )): + raise InvalidTag + return b"" + + def authenticate_additional_data(self, data): + res = self._backend._lib.CCCryptorGCMAddAAD( + self._ctx[0], data, len(data) + ) + self._backend._check_response(res) + + @property + def tag(self): + return self._tag |