From 0d0d70bd78f432397b91eee4d9743000686037a6 Mon Sep 17 00:00:00 2001 From: Jakub Stasiak Date: Sun, 14 Jun 2020 20:30:18 +0200 Subject: Add a way to pass current time to Fernet (#5256) * Add a way to pass current time to Fernet The motivation behind this is to be able to unit test code using Fernet easily without having to monkey patch global state. * Reformat to satisfy flake8 * Trigger a Fernet.encrypt() branch missing from coverage * Revert specifying explicit current time in MultiFernet.rotate() Message's timestamp is not verified anyway since ttl is None. * Change the Fernet's explicit current time API slightly This's been suggested in code review. * Fix a typo * Fix a typo * Restore full MultiFernet test coverage and fix a typo * Restore more coverage time.time() is not called by MultiFernet.rotate() anymore so the monkey patching and lambda need to go, because the patched function is not used and coverage calculation will rightfully notice it. * Remove an unused import * Document when the *_at_time Fernet methods were added --- docs/fernet.rst | 39 +++++++++++++++++++++++++++++++++++++++ src/cryptography/fernet.py | 24 +++++++++++++++++------- tests/test_fernet.py | 24 ++++++++++++++---------- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/docs/fernet.rst b/docs/fernet.rst index c01d18ca..dd9d75bd 100644 --- a/docs/fernet.rst +++ b/docs/fernet.rst @@ -54,6 +54,28 @@ has support for implementing key rotation via :class:`MultiFernet`. generated in *plaintext*, the time a message was created will therefore be visible to a possible attacker. + .. method:: encrypt_at_time(data, current_time) + + .. versionadded:: 3.0 + + Encrypts data passed using explicitly passed current time. See + :meth:`encrypt` for the documentation of the ``data`` parameter, the + return type and the exceptions raised. + + The motivation behind this method is for the client code to be able to + test token expiration. Since this method can be used in an insecure + manner one should make sure the correct time (``int(time.time())``) + is passed as ``current_time`` outside testing. + + :param int current_time: The current time. + + .. note:: + + Similarly to :meth:`encrypt` the encrypted message contains the + timestamp in *plaintext*, in this case the timestamp is the value + of the ``current_time`` parameter. + + .. method:: decrypt(token, ttl=None) Decrypts a Fernet token. If successfully decrypted you will receive the @@ -81,6 +103,23 @@ has support for implementing key rotation via :class:`MultiFernet`. :raises TypeError: This exception is raised if ``token`` is not ``bytes``. + .. method:: decrypt_at_time(token, ttl, current_time) + + .. versionadded:: 3.0 + + Decrypts a token using explicitly passed current time. See + :meth:`decrypt` for the documentation of the ``token`` and ``ttl`` + parameters (``ttl`` is required here), the return type and the exceptions + raised. + + The motivation behind this method is for the client code to be able to + test token expiration. Since this method can be used in an insecure + manner one should make sure the correct time (``int(time.time())``) + is passed as ``current_time`` outside testing. + + :param int current_time: The current time. + + .. method:: extract_timestamp(token) .. versionadded:: 2.3 diff --git a/src/cryptography/fernet.py b/src/cryptography/fernet.py index b990defa..862d9466 100644 --- a/src/cryptography/fernet.py +++ b/src/cryptography/fernet.py @@ -47,7 +47,9 @@ class Fernet(object): return base64.urlsafe_b64encode(os.urandom(32)) def encrypt(self, data): - current_time = int(time.time()) + return self.encrypt_at_time(data, int(time.time())) + + def encrypt_at_time(self, data, current_time): iv = os.urandom(16) return self._encrypt_from_parts(data, current_time, iv) @@ -71,8 +73,11 @@ class Fernet(object): return base64.urlsafe_b64encode(basic_parts + hmac) def decrypt(self, token, ttl=None): + return self.decrypt_at_time(token, ttl, int(time.time())) + + def decrypt_at_time(self, token, ttl, current_time): timestamp, data = Fernet._get_unverified_token_data(token) - return self._decrypt_data(data, timestamp, ttl) + return self._decrypt_data(data, timestamp, ttl, current_time) def extract_timestamp(self, token): timestamp, data = Fernet._get_unverified_token_data(token) @@ -105,8 +110,7 @@ class Fernet(object): except InvalidSignature: raise InvalidToken - def _decrypt_data(self, data, timestamp, ttl): - current_time = int(time.time()) + def _decrypt_data(self, data, timestamp, ttl, current_time): if ttl is not None: if timestamp + ttl < current_time: raise InvalidToken @@ -146,13 +150,16 @@ class MultiFernet(object): self._fernets = fernets def encrypt(self, msg): - return self._fernets[0].encrypt(msg) + return self.encrypt_at_time(msg, int(time.time())) + + def encrypt_at_time(self, msg, current_time): + return self._fernets[0].encrypt_at_time(msg, current_time) def rotate(self, msg): timestamp, data = Fernet._get_unverified_token_data(msg) for f in self._fernets: try: - p = f._decrypt_data(data, timestamp, None) + p = f._decrypt_data(data, timestamp, None, None) break except InvalidToken: pass @@ -163,9 +170,12 @@ class MultiFernet(object): return self._fernets[0]._encrypt_from_parts(p, timestamp, iv) def decrypt(self, msg, ttl=None): + return self.decrypt_at_time(msg, ttl, int(time.time())) + + def decrypt_at_time(self, msg, ttl, current_time): for f in self._fernets: try: - return f.decrypt(msg, ttl) + return f.decrypt_at_time(msg, ttl, current_time) except InvalidToken: pass raise InvalidToken diff --git a/tests/test_fernet.py b/tests/test_fernet.py index 75ecc356..da2096fb 100644 --- a/tests/test_fernet.py +++ b/tests/test_fernet.py @@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function import base64 import calendar -import datetime import json import os import time @@ -70,6 +69,10 @@ class TestFernet(object): monkeypatch): f = Fernet(secret.encode("ascii"), backend=backend) current_time = calendar.timegm(iso8601.parse_date(now).utctimetuple()) + payload = f.decrypt_at_time( + token.encode("ascii"), ttl=ttl_sec, current_time=current_time, + ) + assert payload == src.encode("ascii") monkeypatch.setattr(time, "time", lambda: current_time) payload = f.decrypt(token.encode("ascii"), ttl=ttl_sec) assert payload == src.encode("ascii") @@ -78,6 +81,10 @@ class TestFernet(object): def test_invalid(self, secret, token, now, ttl_sec, backend, monkeypatch): f = Fernet(secret.encode("ascii"), backend=backend) current_time = calendar.timegm(iso8601.parse_date(now).utctimetuple()) + with pytest.raises(InvalidToken): + f.decrypt_at_time( + token.encode("ascii"), ttl=ttl_sec, current_time=current_time, + ) monkeypatch.setattr(time, "time", lambda: current_time) with pytest.raises(InvalidToken): f.decrypt(token.encode("ascii"), ttl=ttl_sec) @@ -110,6 +117,8 @@ class TestFernet(object): token = f.encrypt(pt) ts = "1985-10-26T01:20:01-07:00" current_time = calendar.timegm(iso8601.parse_date(ts).utctimetuple()) + assert f.decrypt_at_time( + token, ttl=None, current_time=current_time) == pt monkeypatch.setattr(time, "time", lambda: current_time) assert f.decrypt(token, ttl=None) == pt @@ -125,8 +134,7 @@ class TestFernet(object): def test_extract_timestamp(self, monkeypatch, backend): f = Fernet(base64.urlsafe_b64encode(b"\x00" * 32), backend=backend) current_time = 1526138327 - monkeypatch.setattr(time, "time", lambda: current_time) - token = f.encrypt(b'encrypt me') + token = f.encrypt_at_time(b'encrypt me', current_time) assert f.extract_timestamp(token) == current_time with pytest.raises(InvalidToken): f.extract_timestamp(b"nonsensetoken") @@ -195,18 +203,14 @@ class TestMultiFernet(object): mf2 = MultiFernet([f2, f1]) plaintext = b"abc" - mf1_ciphertext = mf1.encrypt(plaintext) - - later = datetime.datetime.now() + datetime.timedelta(minutes=5) - later_time = time.mktime(later.timetuple()) - monkeypatch.setattr(time, "time", lambda: later_time) + original_time = int(time.time()) - 5 * 60 + mf1_ciphertext = mf1.encrypt_at_time(plaintext, original_time) - original_time, _ = Fernet._get_unverified_token_data(mf1_ciphertext) rotated_time, _ = Fernet._get_unverified_token_data( mf2.rotate(mf1_ciphertext) ) - assert later_time != rotated_time + assert int(time.time()) != rotated_time assert original_time == rotated_time def test_rotate_decrypt_no_shared_keys(self, backend): -- cgit v1.2.3