aboutsummaryrefslogtreecommitdiffstats
path: root/cryptography/bindings/openssl/api.py
blob: ab8aa3b62f61709b5ee7a53f24be399af4ed14bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, division, print_function

import sys

import cffi

from cryptography.primitives import interfaces


class API(object):
    """
    OpenSSL API wrapper.

    Builds the cffi bindings from the modules listed in ``_modules`` and
    provides helpers for creating, feeding and finalizing an EVP encryption
    context.
    """
    _modules = [
        "bignum",
        "crypto",
        "evp",
        "opensslv",
    ]

    def __init__(self):
        """
        Declare the bindings, compile them against libcrypto and register
        all OpenSSL algorithms.
        """
        self.ffi = cffi.FFI()
        includes = []
        functions = []
        for name in self._modules:
            module_name = "cryptography.bindings.openssl." + name
            __import__(module_name)
            module = sys.modules[module_name]
            self.ffi.cdef(module.TYPES)
            self.ffi.cdef(module.FUNCTIONS)
            self.ffi.cdef(module.MACROS)

            functions.append(module.FUNCTIONS)
            includes.append(module.INCLUDES)

        # We include functions here so that if we got any of their definitions
        # wrong, the underlying C compiler will explode. In C you are allowed
        # to re-declare a function if it has the same signature. That is:
        #   int foo(int);
        #   int foo(int);
        # is legal, but the following will fail to compile:
        #   int foo(int);
        #   int foo(short);
        self.lib = self.ffi.verify(
            source="\n".join(includes + functions),
            libraries=["crypto"],
        )

        self.lib.OpenSSL_add_all_algorithms()

    def _check(self, ok, message):
        """
        Raise ``AssertionError`` with ``message`` unless ``ok`` is true.

        This is used instead of the ``assert`` statement so that OpenSSL
        return codes are still checked when Python runs with ``-O``, which
        strips ``assert`` statements.
        """
        if not ok:
            raise AssertionError(message)

    def openssl_version_text(self):
        """
        Friendly string name of linked OpenSSL.

        Example: OpenSSL 1.0.1e 11 Feb 2013
        """
        return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")

    def create_block_cipher_context(self, cipher, mode):
        """
        Create an EVP cipher context initialized for encryption.

        :param cipher: Object providing ``name``, ``key_size`` and ``key``.
        :param mode: Object providing ``name``; when it is a
            ``ModeWithInitializationVector`` its ``initialization_vector``
            is passed to OpenSSL, otherwise NULL is passed.
        :returns: The ``EVP_CIPHER_CTX *`` with padding disabled. It is
            registered with ``ffi.gc`` so ``EVP_CIPHER_CTX_cleanup`` runs
            when it is garbage collected.
        :raises AssertionError: If OpenSSL reports a failure or does not
            know the computed cipher name.
        """
        ctx = self.ffi.new("EVP_CIPHER_CTX *")
        res = self.lib.EVP_CIPHER_CTX_init(ctx)
        self._check(res != 0, "EVP_CIPHER_CTX_init failed")
        ctx = self.ffi.gc(ctx, self.lib.EVP_CIPHER_CTX_cleanup)
        # TODO: compute name using a better algorithm
        ciphername = "{0}-{1}-{2}".format(
            cipher.name, cipher.key_size, mode.name
        )
        evp_cipher = self.lib.EVP_get_cipherbyname(ciphername.encode("ascii"))
        self._check(
            evp_cipher != self.ffi.NULL,
            "EVP_get_cipherbyname returned NULL for {0!r}".format(ciphername),
        )
        if isinstance(mode, interfaces.ModeWithInitializationVector):
            iv_nonce = mode.initialization_vector
        else:
            iv_nonce = self.ffi.NULL

        # TODO: Sometimes this needs to be a DecryptInit, when?
        res = self.lib.EVP_EncryptInit_ex(
            ctx, evp_cipher, self.ffi.NULL, cipher.key, iv_nonce
        )
        self._check(res != 0, "EVP_EncryptInit_ex failed")

        # We purposely disable padding here as it's handled higher up in the
        # API.
        self.lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
        return ctx

    def update_encrypt_context(self, ctx, plaintext):
        """
        Feed ``plaintext`` into ``ctx`` and return the bytes OpenSSL wrote.

        :param ctx: A context from ``create_block_cipher_context``.
        :param plaintext: The data to encrypt.
        :returns: The first ``outlen`` bytes of the output buffer.
        :raises AssertionError: If ``EVP_EncryptUpdate`` reports a failure.
        """
        cipher = self.lib.EVP_CIPHER_CTX_cipher(ctx)
        block_size = self.lib.EVP_CIPHER_block_size(cipher)
        # EVP_EncryptUpdate may emit up to inl + block_size - 1 bytes because
        # data left over from a previous call is flushed together with the
        # new input, so a buffer of only len(plaintext) can be overrun.
        buf = self.ffi.new(
            "unsigned char[]", len(plaintext) + block_size - 1
        )
        outlen = self.ffi.new("int *")
        res = self.lib.EVP_EncryptUpdate(
            ctx, buf, outlen, plaintext, len(plaintext)
        )
        self._check(res != 0, "EVP_EncryptUpdate failed")
        return self.ffi.buffer(buf)[:outlen[0]]

    def finalize_encrypt_context(self, ctx):
        """
        Finish the encryption in ``ctx`` and clean the context up.

        :param ctx: A context from ``create_block_cipher_context``.
        :returns: The bytes written by ``EVP_EncryptFinal_ex``.
        :raises AssertionError: If finalization or cleanup reports a
            failure.
        """
        cipher = self.lib.EVP_CIPHER_CTX_cipher(ctx)
        block_size = self.lib.EVP_CIPHER_block_size(cipher)
        buf = self.ffi.new("unsigned char[]", block_size)
        outlen = self.ffi.new("int *")
        res = self.lib.EVP_EncryptFinal_ex(ctx, buf, outlen)
        self._check(res != 0, "EVP_EncryptFinal_ex failed")
        res = self.lib.EVP_CIPHER_CTX_cleanup(ctx)
        self._check(res != 0, "EVP_CIPHER_CTX_cleanup failed")
        return self.ffi.buffer(buf)[:outlen[0]]


# Module-level API instance. Constructing it runs API.__init__, so importing
# this module declares and verifies the cffi bindings and calls
# OpenSSL_add_all_algorithms().
api = API()