diff options
Diffstat (limited to 'src/cryptography/hazmat/primitives/padding.py')
-rw-r--r-- | src/cryptography/hazmat/primitives/padding.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/src/cryptography/hazmat/primitives/padding.py b/src/cryptography/hazmat/primitives/padding.py new file mode 100644 index 00000000..7aeeff7c --- /dev/null +++ b/src/cryptography/hazmat/primitives/padding.py @@ -0,0 +1,164 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function + +import six + +from cryptography import utils +from cryptography.exceptions import AlreadyFinalized +from cryptography.hazmat.bindings.utils import LazyLibrary, build_ffi +from cryptography.hazmat.primitives import interfaces + + +TYPES = """ +uint8_t Cryptography_check_pkcs7_padding(const uint8_t *, uint8_t); +""" + +FUNCTIONS = """ +/* Returns the value of the input with the most-significant-bit copied to all + of the bits. */ +static uint8_t Cryptography_DUPLICATE_MSB_TO_ALL(uint8_t a) { + return (1 - (a >> (sizeof(uint8_t) * 8 - 1))) - 1; +} + +/* This returns 0xFF if a < b else 0x00, but does so in a constant time + fashion */ +static uint8_t Cryptography_constant_time_lt(uint8_t a, uint8_t b) { + a -= b; + return Cryptography_DUPLICATE_MSB_TO_ALL(a); +} + +uint8_t Cryptography_check_pkcs7_padding(const uint8_t *data, + uint8_t block_len) { + uint8_t i; + uint8_t pad_size = data[block_len - 1]; + uint8_t mismatch = 0; + for (i = 0; i < block_len; i++) { + unsigned int mask = Cryptography_constant_time_lt(i, pad_size); + uint8_t b = data[block_len - 1 - i]; + mismatch |= (mask & (pad_size ^ b)); + } + + /* Check to make sure the pad_size was within the valid range. */ + mismatch |= ~Cryptography_constant_time_lt(0, pad_size); + mismatch |= Cryptography_constant_time_lt(block_len, pad_size); + + /* Make sure any bits set are copied to the lowest bit */ + mismatch |= mismatch >> 4; + mismatch |= mismatch >> 2; + mismatch |= mismatch >> 1; + /* Now check the low bit to see if it's set */ + return (mismatch & 1) == 0; +} +""" + + +_ffi = build_ffi(cdef_source=TYPES, verify_source=FUNCTIONS) +_lib = LazyLibrary(_ffi) + + +class PKCS7(object): + def __init__(self, block_size): + if not (0 <= block_size < 256): + raise ValueError("block_size must be in range(0, 256).") + + if block_size % 8 != 0: + raise ValueError("block_size must be a multiple of 8.") + + self.block_size = block_size + + def padder(self): + return _PKCS7PaddingContext(self.block_size) + + def unpadder(self): + return _PKCS7UnpaddingContext(self.block_size) + + +@utils.register_interface(interfaces.PaddingContext) +class _PKCS7PaddingContext(object): + def __init__(self, block_size): + self.block_size = block_size + # TODO: more copies than necessary, we should use zero-buffer (#193) + self._buffer = b"" + + def update(self, data): + if self._buffer is None: + raise AlreadyFinalized("Context was already finalized.") + + if not isinstance(data, bytes): + raise TypeError("data must be bytes.") + + self._buffer += data + + finished_blocks = len(self._buffer) // (self.block_size // 8) + + result = self._buffer[:finished_blocks * (self.block_size // 8)] + self._buffer = self._buffer[finished_blocks * (self.block_size // 8):] + + return result + + def finalize(self): + if self._buffer is None: + raise AlreadyFinalized("Context was already finalized.") + + pad_size = self.block_size // 8 - len(self._buffer) + result = self._buffer + six.int2byte(pad_size) * pad_size + self._buffer = None + return result + + +@utils.register_interface(interfaces.PaddingContext) +class _PKCS7UnpaddingContext(object): + def __init__(self, block_size): + self.block_size = block_size + # TODO: more copies than necessary, we should use zero-buffer (#193) + self._buffer = b"" + + def update(self, data): + if self._buffer is None: + raise AlreadyFinalized("Context was already finalized.") + + if not isinstance(data, bytes): + raise TypeError("data must be bytes.") + + self._buffer += data + + finished_blocks = max( + len(self._buffer) // (self.block_size // 8) - 1, + 0 + ) + + result = self._buffer[:finished_blocks * (self.block_size // 8)] + self._buffer = self._buffer[finished_blocks * (self.block_size // 8):] + + return result + + def finalize(self): + if self._buffer is None: + raise AlreadyFinalized("Context was already finalized.") + + if len(self._buffer) != self.block_size // 8: + raise ValueError("Invalid padding bytes.") + + valid = _lib.Cryptography_check_pkcs7_padding( + self._buffer, self.block_size // 8 + ) + + if not valid: + raise ValueError("Invalid padding bytes.") + + pad_size = six.indexbytes(self._buffer, -1) + res = self._buffer[:-pad_size] + self._buffer = None + return res |