1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import base64
import binascii
import os
import struct
import time
import cffi
import six
from cryptography.hazmat.primitives import padding, hashes
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.block import BlockCipher, ciphers, modes
class InvalidToken(Exception):
pass
ffi = cffi.FFI()
ffi.cdef("""
bool constant_time_compare(uint8_t *, size_t, uint8_t *, size_t);
""")
lib = ffi.verify("""
#include <stdbool.h>
bool constant_time_compare(uint8_t *a, size_t len_a, uint8_t *b, size_t len_b) {
if (len_a != len_b) {
return false;
}
int result = 0;
for (size_t i = 0; i < len_a; i++) {
result |= a[i] ^ b[i];
}
return result == 0;
}
""")
class Fernet(object):
def __init__(self, key):
super(Fernet, self).__init__()
assert len(key) == 32
self.signing_key = key[:16]
self.encryption_key = key[16:]
def encrypt(self, data):
current_time = int(time.time())
iv = os.urandom(16)
return self._encrypt_from_parts(data, current_time, iv)
def _encrypt_from_parts(self, data, current_time, iv):
if isinstance(data, six.text_type):
raise TypeError(
"Unicode-objects must be encoded before encryption"
)
padder = padding.PKCS7(ciphers.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
encryptor = BlockCipher(
ciphers.AES(self.encryption_key), modes.CBC(iv)
).encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
h = HMAC(self.signing_key, digestmod=hashes.SHA256)
h.update(b"\x80")
h.update(struct.pack(">Q", current_time))
h.update(iv)
h.update(ciphertext)
hmac = h.digest()
return base64.urlsafe_b64encode(
b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext + hmac
)
def decrypt(self, data, ttl=None, current_time=None):
if isinstance(data, six.text_type):
raise TypeError(
"Unicode-objects must be encoded before decryption"
)
if current_time is None:
current_time = int(time.time())
try:
data = base64.urlsafe_b64decode(data)
except (TypeError, binascii.Error):
raise InvalidToken
assert six.indexbytes(data, 0) == 0x80
timestamp = data[1:9]
iv = data[9:25]
ciphertext = data[25:-32]
if ttl is not None:
if struct.unpack(">Q", timestamp)[0] + ttl < current_time:
raise InvalidToken
h = HMAC(self.signing_key, digestmod=hashes.SHA256)
h.update(data[:-32])
hmac = h.digest()
if not lib.constant_time_compare(hmac, len(hmac), data[-32:], 32):
raise InvalidToken
decryptor = BlockCipher(
ciphers.AES(self.encryption_key), modes.CBC(iv)
).decryptor()
plaintext_padded = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(ciphers.AES.block_size).unpadder()
unpadded = unpadder.update(plaintext_padded)
try:
unpadded += unpadder.finalize()
except ValueError:
raise InvalidToken
return unpadded
|