diff options
author | Alex Gaynor <alex.gaynor@gmail.com> | 2016-01-12 07:23:15 -0500 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2016-01-12 07:23:15 -0500 |
commit | 4b88c2d091f844c03f083ab2d6964f3980982419 (patch) | |
tree | a94b2292f0290fbd002ad2be9abceff42b0c5787 | |
parent | 0c2feae1b35131f270a17af3a8573b5cffb54e6c (diff) | |
parent | 54baf9bd14cda0e122e9b9477d1152ee81f61195 (diff) | |
download | cryptography-4b88c2d091f844c03f083ab2d6964f3980982419.tar.gz cryptography-4b88c2d091f844c03f083ab2d6964f3980982419.tar.bz2 cryptography-4b88c2d091f844c03f083ab2d6964f3980982419.zip |
Merge pull request #2663 from reaperhulk/move-more
move more functions out of the openssl backend class
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 50 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/decode_asn1.py | 89 | ||||
-rw-r--r-- | src/cryptography/hazmat/backends/openssl/x509.py | 25 |
3 files changed, 87 insertions, 77 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 57d36acd..4a8fda99 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function import calendar import collections -import datetime import itertools from contextlib import contextmanager @@ -1658,55 +1657,6 @@ class Backend(object): self.openssl_assert(res == 1) return self._read_mem_bio(bio) - def _asn1_integer_to_int(self, asn1_int): - bn = self._lib.ASN1_INTEGER_to_BN(asn1_int, self._ffi.NULL) - self.openssl_assert(bn != self._ffi.NULL) - bn = self._ffi.gc(bn, self._lib.BN_free) - return self._bn_to_int(bn) - - def _asn1_string_to_bytes(self, asn1_string): - return self._ffi.buffer(asn1_string.data, asn1_string.length)[:] - - def _asn1_string_to_ascii(self, asn1_string): - return self._asn1_string_to_bytes(asn1_string).decode("ascii") - - def _asn1_string_to_utf8(self, asn1_string): - buf = self._ffi.new("unsigned char **") - res = self._lib.ASN1_STRING_to_UTF8(buf, asn1_string) - self.openssl_assert(res >= 0) - self.openssl_assert(buf[0] != self._ffi.NULL) - buf = self._ffi.gc( - buf, lambda buffer: self._lib.OPENSSL_free(buffer[0]) - ) - return self._ffi.buffer(buf[0], res)[:].decode('utf8') - - def _asn1_to_der(self, asn1_type): - buf = self._ffi.new("unsigned char **") - res = self._lib.i2d_ASN1_TYPE(asn1_type, buf) - self.openssl_assert(res >= 0) - self.openssl_assert(buf[0] != self._ffi.NULL) - buf = self._ffi.gc( - buf, lambda buffer: self._lib.OPENSSL_free(buffer[0]) - ) - return self._ffi.buffer(buf[0], res)[:] - - def _parse_asn1_time(self, asn1_time): - self.openssl_assert(asn1_time != self._ffi.NULL) - generalized_time = self._lib.ASN1_TIME_to_generalizedtime( - asn1_time, self._ffi.NULL - ) - self.openssl_assert(generalized_time != self._ffi.NULL) - generalized_time = self._ffi.gc( - generalized_time, self._lib.ASN1_GENERALIZEDTIME_free - ) - return self._parse_asn1_generalized_time(generalized_time) - - def _parse_asn1_generalized_time(self, generalized_time): - time = self._asn1_string_to_ascii( - self._ffi.cast("ASN1_STRING *", generalized_time) - ) - return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ") - class GetCipherByName(object): def __init__(self, fmt): diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py index c0cc01a8..42d6c858 100644 --- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py +++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function +import datetime import ipaddress from email.utils import parseaddr @@ -35,7 +36,7 @@ def _decode_x509_name_entry(backend, x509_name_entry): backend.openssl_assert(obj != backend._ffi.NULL) data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry) backend.openssl_assert(data != backend._ffi.NULL) - value = backend._asn1_string_to_utf8(data) + value = _asn1_string_to_utf8(backend, data) oid = _obj2txt(backend, obj) return x509.NameAttribute(x509.ObjectIdentifier(oid), value) @@ -64,7 +65,7 @@ def _decode_general_names(backend, gns): def _decode_general_name(backend, gn): if gn.type == backend._lib.GEN_DNS: - data = backend._asn1_string_to_bytes(gn.d.dNSName) + data = _asn1_string_to_bytes(backend, gn.d.dNSName) if not data: decoded = u"" elif data.startswith(b"*."): @@ -83,7 +84,7 @@ def _decode_general_name(backend, gn): return x509.DNSName(decoded) elif gn.type == backend._lib.GEN_URI: - data = backend._asn1_string_to_ascii(gn.d.uniformResourceIdentifier) + data = _asn1_string_to_ascii(backend, gn.d.uniformResourceIdentifier) parsed = urllib_parse.urlparse(data) if parsed.hostname: hostname = idna.decode(parsed.hostname) @@ -110,7 +111,7 @@ def _decode_general_name(backend, gn): oid = _obj2txt(backend, gn.d.registeredID) return x509.RegisteredID(x509.ObjectIdentifier(oid)) elif gn.type == backend._lib.GEN_IPADD: - data = backend._asn1_string_to_bytes(gn.d.iPAddress) + data = _asn1_string_to_bytes(backend, gn.d.iPAddress) data_len = len(data) if data_len == 8 or data_len == 32: # This is an IPv4 or IPv6 Network and not a single IP. This @@ -141,7 +142,7 @@ def _decode_general_name(backend, gn): _decode_x509_name(backend, gn.d.directoryName) ) elif gn.type == backend._lib.GEN_EMAIL: - data = backend._asn1_string_to_ascii(gn.d.rfc822Name) + data = _asn1_string_to_ascii(backend, gn.d.rfc822Name) name, address = parseaddr(data) parts = address.split(u"@") if name or not address: @@ -160,7 +161,7 @@ def _decode_general_name(backend, gn): ) elif gn.type == backend._lib.GEN_OTHERNAME: type_id = _obj2txt(backend, gn.d.otherName.type_id) - value = backend._asn1_to_der(gn.d.otherName.value) + value = _asn1_to_der(backend, gn.d.otherName.value) return x509.OtherName(x509.ObjectIdentifier(type_id), value) else: # x400Address or ediPartyName @@ -179,7 +180,7 @@ def _decode_ocsp_no_check(backend, ext): def _decode_crl_number(backend, ext): asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext) asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) - return x509.CRLNumber(backend._asn1_integer_to_int(asn1_int)) + return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int)) class _X509ExtensionParser(object): @@ -285,10 +286,12 @@ def _decode_user_notice(backend, un): notice_reference = None if un.exptext != backend._ffi.NULL: - explicit_text = backend._asn1_string_to_utf8(un.exptext) + explicit_text = _asn1_string_to_utf8(backend, un.exptext) if un.noticeref != backend._ffi.NULL: - organization = backend._asn1_string_to_utf8(un.noticeref.organization) + organization = _asn1_string_to_utf8( + backend, un.noticeref.organization + ) num = backend._lib.sk_ASN1_INTEGER_num( un.noticeref.noticenos @@ -298,7 +301,7 @@ def _decode_user_notice(backend, un): asn1_int = backend._lib.sk_ASN1_INTEGER_value( un.noticeref.noticenos, i ) - notice_num = backend._asn1_integer_to_int(asn1_int) + notice_num = _asn1_integer_to_int(backend, asn1_int) notice_numbers.append(notice_num) notice_reference = x509.NoticeReference( @@ -320,7 +323,7 @@ def _decode_basic_constraints(backend, bc_st): if basic_constraints.pathlen == backend._ffi.NULL: path_length = None else: - path_length = backend._asn1_integer_to_int(basic_constraints.pathlen) + path_length = _asn1_integer_to_int(backend, basic_constraints.pathlen) return x509.BasicConstraints(ca, path_length) @@ -353,8 +356,8 @@ def _decode_authority_key_identifier(backend, akid): ) if akid.serial != backend._ffi.NULL: - authority_cert_serial_number = backend._asn1_integer_to_int( - akid.serial + authority_cert_serial_number = _asn1_integer_to_int( + backend, akid.serial ) return x509.AuthorityKeyIdentifier( @@ -561,7 +564,7 @@ def _decode_crl_distribution_points(backend, cdps): def _decode_inhibit_any_policy(backend, asn1_int): asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int) asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) - skip_certs = backend._asn1_integer_to_int(asn1_int) + skip_certs = _asn1_integer_to_int(backend, asn1_int) return x509.InhibitAnyPolicy(skip_certs) @@ -624,7 +627,7 @@ def _decode_invalidity_date(backend, inv_date): generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free ) return x509.InvalidityDate( - backend._parse_asn1_generalized_time(generalized_time) + _parse_asn1_generalized_time(backend, generalized_time) ) @@ -654,6 +657,62 @@ def _decode_cert_issuer(backend, ext): return x509.CertificateIssuer(_decode_general_names(backend, gns)) +def _asn1_to_der(backend, asn1_type): + buf = backend._ffi.new("unsigned char **") + res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf) + backend.openssl_assert(res >= 0) + backend.openssl_assert(buf[0] != backend._ffi.NULL) + buf = backend._ffi.gc( + buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) + ) + return backend._ffi.buffer(buf[0], res)[:] + + +def _asn1_integer_to_int(backend, asn1_int): + bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL) + backend.openssl_assert(bn != backend._ffi.NULL) + bn = backend._ffi.gc(bn, backend._lib.BN_free) + return backend._bn_to_int(bn) + + +def _asn1_string_to_bytes(backend, asn1_string): + return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:] + + +def _asn1_string_to_ascii(backend, asn1_string): + return _asn1_string_to_bytes(backend, asn1_string).decode("ascii") + + +def _asn1_string_to_utf8(backend, asn1_string): + buf = backend._ffi.new("unsigned char **") + res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string) + backend.openssl_assert(res >= 0) + backend.openssl_assert(buf[0] != backend._ffi.NULL) + buf = backend._ffi.gc( + buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) + ) + return backend._ffi.buffer(buf[0], res)[:].decode('utf8') + + +def _parse_asn1_time(backend, asn1_time): + backend.openssl_assert(asn1_time != backend._ffi.NULL) + generalized_time = backend._lib.ASN1_TIME_to_generalizedtime( + asn1_time, backend._ffi.NULL + ) + backend.openssl_assert(generalized_time != backend._ffi.NULL) + generalized_time = backend._ffi.gc( + generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free + ) + return _parse_asn1_generalized_time(backend, generalized_time) + + +def _parse_asn1_generalized_time(backend, generalized_time): + time = _asn1_string_to_ascii( + backend, backend._ffi.cast("ASN1_STRING *", generalized_time) + ) + return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ") + + _EXTENSION_HANDLERS = { ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints, ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier, diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py index 529ffa3e..a6f7d69e 100644 --- a/src/cryptography/hazmat/backends/openssl/x509.py +++ b/src/cryptography/hazmat/backends/openssl/x509.py @@ -11,7 +11,8 @@ from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.backends.openssl.decode_asn1 import ( _CERTIFICATE_EXTENSION_PARSER, _CRL_EXTENSION_PARSER, _CSR_EXTENSION_PARSER, _REVOKED_CERTIFICATE_EXTENSION_PARSER, - _decode_x509_name, _obj2txt, + _asn1_integer_to_int, _asn1_string_to_bytes, _decode_x509_name, _obj2txt, + _parse_asn1_time ) from cryptography.hazmat.primitives import hashes, serialization @@ -59,7 +60,7 @@ class _Certificate(object): def serial(self): asn1_int = self._backend._lib.X509_get_serialNumber(self._x509) self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL) - return self._backend._asn1_integer_to_int(asn1_int) + return _asn1_integer_to_int(self._backend, asn1_int) def public_key(self): pkey = self._backend._lib.X509_get_pubkey(self._x509) @@ -75,12 +76,12 @@ class _Certificate(object): @property def not_valid_before(self): asn1_time = self._backend._lib.X509_get_notBefore(self._x509) - return self._backend._parse_asn1_time(asn1_time) + return _parse_asn1_time(self._backend, asn1_time) @property def not_valid_after(self): asn1_time = self._backend._lib.X509_get_notAfter(self._x509) - return self._backend._parse_asn1_time(asn1_time) + return _parse_asn1_time(self._backend, asn1_time) @property def issuer(self): @@ -110,7 +111,7 @@ class _Certificate(object): @property def signature(self): - return self._backend._asn1_string_to_bytes(self._x509.signature) + return _asn1_string_to_bytes(self._backend, self._x509.signature) @property def tbs_certificate_bytes(self): @@ -154,12 +155,12 @@ class _RevokedCertificate(object): def serial_number(self): asn1_int = self._x509_revoked.serialNumber self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL) - return self._backend._asn1_integer_to_int(asn1_int) + return _asn1_integer_to_int(self._backend, asn1_int) @property def revocation_date(self): - return self._backend._parse_asn1_time( - self._x509_revoked.revocationDate) + return _parse_asn1_time( + self._backend, self._x509_revoked.revocationDate) @property def extensions(self): @@ -215,17 +216,17 @@ class _CertificateRevocationList(object): def next_update(self): nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl) self._backend.openssl_assert(nu != self._backend._ffi.NULL) - return self._backend._parse_asn1_time(nu) + return _parse_asn1_time(self._backend, nu) @property def last_update(self): lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl) self._backend.openssl_assert(lu != self._backend._ffi.NULL) - return self._backend._parse_asn1_time(lu) + return _parse_asn1_time(self._backend, lu) @property def signature(self): - return self._backend._asn1_string_to_bytes(self._x509_crl.signature) + return _asn1_string_to_bytes(self._backend, self._x509_crl.signature) @property def tbs_certlist_bytes(self): @@ -360,4 +361,4 @@ class _CertificateSigningRequest(object): @property def signature(self): - return self._backend._asn1_string_to_bytes(self._x509_req.signature) + return _asn1_string_to_bytes(self._backend, self._x509_req.signature) |