# Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import, division, print_function import binascii import os import itertools import pytest from cryptography.hazmat.primitives import hashes, hmac from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.exceptions import ( AlreadyFinalized, NotYetFinalized, AlreadyUpdated, InvalidTag, ) from ...utils import load_vectors_from_file def _load_all_params(path, file_names, param_loader): all_params = [] for file_name in file_names: all_params.extend( load_vectors_from_file(os.path.join(path, file_name), param_loader) ) return all_params def generate_encrypt_test(param_loader, path, file_names, cipher_factory, mode_factory): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_encryption(self, backend, params): encrypt_test(backend, cipher_factory, mode_factory, params) return test_encryption def encrypt_test(backend, cipher_factory, mode_factory, params): plaintext = params["plaintext"] ciphertext = params["ciphertext"] cipher = Cipher( cipher_factory(**params), mode_factory(**params), backend=backend ) encryptor = cipher.encryptor() actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext)) actual_ciphertext += encryptor.finalize() assert actual_ciphertext == binascii.unhexlify(ciphertext) decryptor = cipher.decryptor() actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext)) actual_plaintext += decryptor.finalize() assert actual_plaintext == binascii.unhexlify(plaintext) def generate_aead_test(param_loader, path, file_names, cipher_factory, mode_factory): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_aead(self, backend, params): aead_test(backend, cipher_factory, mode_factory, params) return test_aead def aead_test(backend, cipher_factory, mode_factory, params): if params.get("pt") is not None: plaintext = params["pt"] ciphertext = params["ct"] aad = params["aad"] if params.get("fail") is True: cipher = Cipher( cipher_factory(binascii.unhexlify(params["key"])), mode_factory(binascii.unhexlify(params["iv"]), binascii.unhexlify(params["tag"])), backend ) decryptor = cipher.decryptor() decryptor.authenticate_additional_data(binascii.unhexlify(aad)) actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext)) with pytest.raises(InvalidTag): decryptor.finalize() else: cipher = Cipher( cipher_factory(binascii.unhexlify(params["key"])), mode_factory(binascii.unhexlify(params["iv"]), None), backend ) encryptor = cipher.encryptor() encryptor.authenticate_additional_data(binascii.unhexlify(aad)) actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext)) actual_ciphertext += encryptor.finalize() tag_len = len(params["tag"]) assert binascii.hexlify(encryptor.tag)[:tag_len] == params["tag"] cipher = Cipher( cipher_factory(binascii.unhexlify(params["key"])), mode_factory(binascii.unhexlify(params["iv"]), binascii.unhexlify(params["tag"])), backend ) decryptor = cipher.decryptor() decryptor.authenticate_additional_data(binascii.unhexlify(aad)) actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext)) actual_plaintext += decryptor.finalize() assert actual_plaintext == binascii.unhexlify(plaintext) def generate_stream_encryption_test(param_loader, path, file_names, cipher_factory): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_stream_encryption(self, backend, params): stream_encryption_test(backend, cipher_factory, params) return test_stream_encryption def stream_encryption_test(backend, cipher_factory, params): plaintext = params["plaintext"] ciphertext = params["ciphertext"] offset = params["offset"] cipher = Cipher(cipher_factory(**params), None, backend=backend) encryptor = cipher.encryptor() # throw away offset bytes encryptor.update(b"\x00" * int(offset)) actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext)) actual_ciphertext += encryptor.finalize() assert actual_ciphertext == binascii.unhexlify(ciphertext) decryptor = cipher.decryptor() decryptor.update(b"\x00" * int(offset)) actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext)) actual_plaintext += decryptor.finalize() assert actual_plaintext == binascii.unhexlify(plaintext) def generate_hash_test(param_loader, path, file_names, hash_cls): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_hash(self, backend, params): hash_test(backend, hash_cls, params) return test_hash def hash_test(backend, algorithm, params): msg, md = params m = hashes.Hash(algorithm, backend=backend) m.update(binascii.unhexlify(msg)) expected_md = md.replace(" ", "").lower().encode("ascii") assert m.finalize() == binascii.unhexlify(expected_md) def generate_base_hash_test(algorithm, digest_size, block_size): def test_base_hash(self, backend): base_hash_test(backend, algorithm, digest_size, block_size) return test_base_hash def base_hash_test(backend, algorithm, digest_size, block_size): m = hashes.Hash(algorithm, backend=backend) assert m.algorithm.digest_size == digest_size assert m.algorithm.block_size == block_size m_copy = m.copy() assert m != m_copy assert m._ctx != m_copy._ctx m.update(b"abc") copy = m.copy() copy.update(b"123") m.update(b"123") assert copy.finalize() == m.finalize() def generate_long_string_hash_test(hash_factory, md): def test_long_string_hash(self, backend): long_string_hash_test(backend, hash_factory, md) return test_long_string_hash def long_string_hash_test(backend, algorithm, md): m = hashes.Hash(algorithm, backend=backend) m.update(b"a" * 1000000) assert m.finalize() == binascii.unhexlify(md.lower().encode("ascii")) def generate_base_hmac_test(hash_cls): def test_base_hmac(self, backend): base_hmac_test(backend, hash_cls) return test_base_hmac def base_hmac_test(backend, algorithm): key = b"ab" h = hmac.HMAC(binascii.unhexlify(key), algorithm, backend=backend) h_copy = h.copy() assert h != h_copy assert h._ctx != h_copy._ctx def generate_hmac_test(param_loader, path, file_names, algorithm): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_hmac(self, backend, params): hmac_test(backend, algorithm, params) return test_hmac def hmac_test(backend, algorithm, params): msg, md, key = params h = hmac.HMAC(binascii.unhexlify(key), algorithm, backend=backend) h.update(binascii.unhexlify(msg)) assert h.finalize() == binascii.unhexlify(md.encode("ascii")) def generate_pbkdf2_test(param_loader, path, file_names, algorithm): all_params = _load_all_params(path, file_names, param_loader) @pytest.mark.parametrize("params", all_params) def test_pbkdf2(self, backend, params): pbkdf2_test(backend, algorithm, params) return test_pbkdf2 def pbkdf2_test(backend, algorithm, params): # Password and salt can contain \0, which should be loaded as a null char. # The NIST loader loads them as literal strings so we replace with the # proper value. kdf = PBKDF2HMAC( algorithm, int(params["length"]), params["salt"], int(params["iterations"]), backend ) derived_key = kdf.derive(params["password"]) assert binascii.hexlify(derived_key) == params["derived_key"] def generate_aead_exception_test(cipher_factory, mode_factory): def test_aead_exception(self, backend): aead_exception_test(backend, cipher_factory, mode_factory) return test_aead_exception def aead_exception_test(backend, cipher_factory, mode_factory): cipher = Cipher( cipher_factory(binascii.unhexlify(b"0" * 32)), mode_factory(binascii.unhexlify(b"0" * 24)), backend ) encryptor = cipher.encryptor() encryptor.update(b"a" * 16) with pytest.raises(NotYetFinalized): encryptor.tag with pytest.raises(AlreadyUpdated): encryptor.authenticate_additional_data(b"b" * 16) encryptor.finalize() with pytest.raises(AlreadyFinalized): encryptor.authenticate_additional_data(b"b" * 16) with pytest.raises(AlreadyFinalized): encryptor.update(b"b" * 16) with pytest.raises(AlreadyFinalized): encryptor.finalize() cipher = Cipher( cipher_factory(binascii.unhexlify(b"0" * 32)), mode_factory(binascii.unhexlify(b"0" * 24), b"0" * 16), backend ) decryptor = cipher.decryptor() decryptor.update(b"a" * 16) with pytest.raises(AttributeError): decryptor.tag def generate_aead_tag_exception_test(cipher_factory, mode_factory): def test_aead_tag_exception(self, backend): aead_tag_exception_test(backend, cipher_factory, mode_factory) return test_aead_tag_exception def aead_tag_exception_test(backend, cipher_factory, mode_factory): cipher = Cipher( cipher_factory(binascii.unhexlify(b"0" * 32)), mode_factory(binascii.unhexlify(b"0" * 24)), backend ) with pytest.raises(ValueError): cipher.decryptor() with pytest.raises(ValueError): mode_factory(binascii.unhexlify(b"0" * 24), b"000") cipher = Cipher( cipher_factory(binascii.unhexlify(b"0" * 32)), mode_factory(binascii.unhexlify(b"0" * 24), b"0" * 16), backend ) with pytest.raises(ValueError): cipher.encryptor() def hkdf_derive_test(backend, algorithm, params): hkdf = HKDF( algorithm, int(params["l"]), salt=binascii.unhexlify(params["salt"]) or None, info=binascii.unhexlify(params["info"]) or None, backend=backend ) okm = hkdf.derive(binascii.unhexlify(params["ikm"])) assert okm == binascii.unhexlify(params["okm"]) def hkdf_extract_test(backend, algorithm, params): hkdf = HKDF( algorithm, int(params["l"]), salt=binascii.unhexlify(params["salt"]) or None, info=binascii.unhexlify(params["info"]) or None, backend=backend ) prk = hkdf._extract(binascii.unhexlify(params["ikm"])) assert prk == binascii.unhexlify(params["prk"]) def hkdf_expand_test(backend, algorithm, params): hkdf = HKDF( algorithm, int(params["l"]), salt=binascii.unhexlify(params["salt"]) or None, info=binascii.unhexlify(params["info"]) or None, backend=backend ) okm = hkdf._expand(binascii.unhexlify(params["prk"])) assert okm == binascii.unhexlify(params["okm"]) def generate_hkdf_test(param_loader, path, file_names, algorithm): all_params = _load_all_params(path, file_names, param_loader) all_tests = [hkdf_extract_test, hkdf_expand_test, hkdf_derive_test] @pytest.mark.parametrize( ("params", "hkdf_test"), itertools.product(all_params, all_tests) ) def test_hkdf(self, backend, params, hkdf_test): hkdf_test(backend, algorithm, params) return test_hkdf def generate_rsa_pss_test(param_loader, path, file_names, hash_name): all_params = _load_all_params(path, file_names, param_loader) all_params = [i for i in all_params if i["algorithm"] == hash_name] @pytest.mark.parametrize("params", all_params) def test_rsa_pss(self, backend, params): rsa_pss_test(backend, params) return test_rsa_pss def rsa_pss_test(backend, params): public_key = rsa.RSAPublicKey( public_exponent=params["public_exponent"], modulus=params["modulus"] ) hash_cls = getattr(hashes, params["algorithm"].decode("utf8")) verifier = public_key.verifier( binascii.unhexlify(params["s"]), padding.PSS( mgf=padding.MGF1( algorithm=hash_cls(), salt_length=params["salt_length"] ) ), hash_cls(), backend ) verifier.update(binascii.unhexlify(params["msg"])) verifier.verify()