diff options
-rw-r--r-- | src/cryptography/hazmat/bindings/openssl/binding.py | 125 |
1 files changed, 58 insertions, 67 deletions
diff --git a/src/cryptography/hazmat/bindings/openssl/binding.py b/src/cryptography/hazmat/bindings/openssl/binding.py index f85429d8..17dad4c1 100644 --- a/src/cryptography/hazmat/bindings/openssl/binding.py +++ b/src/cryptography/hazmat/bindings/openssl/binding.py @@ -9,6 +9,58 @@ import threading from cryptography.hazmat.bindings._openssl import ffi, lib +_osrandom_engine_id = ffi.new("const char[]", b"osrandom") +_osrandom_engine_name = ffi.new("const char[]", b"osrandom_engine") + +@ffi.callback("int (*)(unsigned char *, int)", error=0) +def osrandom_rand_bytes(buf, size): + signed = ffi.cast("char*", buf) + result = os.urandom(size) + signed[0:size] = result + return 1 + + +@ffi.callback("int (*)(unsigned char *, int)", error=0) +def osrandom_pseudo_rand_bytes(buf, size): + result = osrandom_rand_bytes(buf, size) + if result == 0: + return -1 + else: + return result + + +@ffi.callback("int (*)(void)") +def osrandom_rand_status(): + return 1 + + +def _register_osrandom_engine(): + looked_up_engine = lib.ENGINE_by_id(_osrandom_engine_id) + if looked_up_engine != ffi.NULL: + return 2 + + method = ffi.new( + "RAND_METHOD*", dict(bytes=osrandom_rand_bytes, + pseudorand=osrandom_pseudo_rand_bytes, + status=osrandom_rand_status) + ) + engine = lib.ENGINE_new() + try: + result = lib.ENGINE_set_id(engine, _osrandom_engine_id) + assert result == 1 + result = lib.ENGINE_set_name(engine, _osrandom_engine_name) + assert result == 1 + result = lib.ENGINE_set_RAND(engine, method) + assert result == 1 + result = lib.ENGINE_add(engine) + assert result == 1 + finally: + result = lib.ENGINE_free(engine) + assert result == 1 + looked_up_engine = lib.ENGINE_by_id(_osrandom_engine_id) + assert looked_up_engine != ffi.NULL + return 1 + class Binding(object): """ @@ -22,9 +74,11 @@ class Binding(object): _rand_method = None _init_lock = threading.Lock() _lock_init_lock = threading.Lock() - _osrandom_engine_id = ffi.new("const char[]", b"osrandom") - _osrandom_engine_name = ffi.new("const char[]", b"osrandom_engine") - _retained = [] + + # aliases for the convenience of tests. + _osrandom_engine_id = _osrandom_engine_id + _osrandom_engine_name = _osrandom_engine_name + _register_osrandom_engine = staticmethod(_register_osrandom_engine) def __init__(self): self._ensure_ffi_initialized() @@ -37,70 +91,7 @@ class Binding(object): with cls._init_lock: if not cls._lib_loaded: cls._lib_loaded = True - res = cls._register_osrandom_engine() - assert res != 0 - - @classmethod - def _register_osrandom_engine(cls): - if cls._retained: - return 2 - - def retain(it): - cls._retained.append(it) - return it - method = cls.ffi.new("RAND_METHOD*") - retain(method) - method.seed = cls.ffi.NULL - - @retain - @cls.ffi.callback("int (*)(unsigned char *, int)", error=0) - def osrandom_rand_bytes(buf, size): - signed = cls.ffi.cast("char*", buf) - result = os.urandom(size) - signed[0:size] = result - return 1 - - @retain - @cls.ffi.callback("int (*)(unsigned char *, int)", error=0) - def osrandom_pseudo_rand_bytes(buf, size): - result = osrandom_rand_bytes(buf, size) - if result == 0: - return -1 - else: - return result - - @retain - @cls.ffi.callback("int (*)(void)", error=0) - def osrandom_rand_status(): - return 1 - - @retain - @cls.ffi.callback("ENGINE_GEN_INT_FUNC_PTR", error=0) - def osrandom_init(engine): - return 1 - - @retain - @cls.ffi.callback("ENGINE_GEN_INT_FUNC_PTR", error=0) - def osrandom_finish(engine): - return 1 - - method.bytes = osrandom_rand_bytes - method.cleanup = cls.ffi.NULL - method.add = cls.ffi.NULL - method.pseudorand = osrandom_pseudo_rand_bytes - method.status = osrandom_rand_status - - e = cls.lib.ENGINE_new() - result = (cls.lib.ENGINE_set_id(e, cls._osrandom_engine_id) - and cls.lib.ENGINE_set_name(e, cls._osrandom_engine_name) - and cls.lib.ENGINE_set_RAND(e, method) - and cls.lib.ENGINE_set_init_function(e, osrandom_init) - and cls.lib.ENGINE_set_finish_function(e, osrandom_finish) - and cls.lib.ENGINE_add(e)) - if not cls.lib.ENGINE_free(e): - return 0 - assert cls.lib.ENGINE_by_id(cls._osrandom_engine_id) != cls.ffi.NULL - return result + _register_osrandom_engine() @classmethod def init_static_locks(cls): |