parse certificates with pure rust (#6147)

* parse certificates with pure rust

* fix coverage

* various review comments

* save the buffer

* more feedback
This commit is contained in:
Paul Kehrer 2021-07-25 15:03:14 -07:00 committed by GitHub
parent 6030181109
commit 87f43fb77e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 730 additions and 301 deletions

View file

@ -11,6 +11,13 @@ Changelog
* Changed the :ref:`version scheme <api-stability:versioning>`. This will
result in us incrementing the major version more frequently, but does not
change our existing backwards compatibility policy.
* **BACKWARDS INCOMPATIBLE:** The X.509 certificate parser no longer allows
negative serial numbers. :rfc:`5280` has always prohibited these.
* **BACKWARDS INCOMPATIBLE:** Invalid ASN.1 found during certificate parsing
will raise an error on initial parse rather than when the invalid field is
accessed.
* **BACKWARDS INCOMPATIBLE:** Values passed to the X.509 PEM parser must be
a single PEM payload and will error on extraneous data.
* Added support for
:class:`~cryptography.hazmat.primitives.hashes.SM3` and
:class:`~cryptography.hazmat.primitives.ciphers.algorithms.SM4`,

View file

@ -500,18 +500,6 @@ A specific ``backend`` may provide one or more of these interfaces.
A backend with methods for working with X.509 objects.
.. method:: load_pem_x509_certificate(data)
:param bytes data: PEM formatted certificate data.
:returns: An instance of :class:`~cryptography.x509.Certificate`.
.. method:: load_der_x509_certificate(data)
:param bytes data: DER formatted certificate data.
:returns: An instance of :class:`~cryptography.x509.Certificate`.
.. method:: load_pem_x509_csr(data)
.. versionadded:: 0.9

View file

@ -280,18 +280,6 @@ class DERSerializationBackend(metaclass=abc.ABCMeta):
class X509Backend(metaclass=abc.ABCMeta):
@abc.abstractmethod
def load_pem_x509_certificate(self, data: bytes) -> "Certificate":
"""
Load an X.509 certificate from PEM encoded data.
"""
@abc.abstractmethod
def load_der_x509_certificate(self, data: bytes) -> "Certificate":
"""
Load an X.509 certificate from DER encoded data.
"""
@abc.abstractmethod
def load_der_x509_csr(self, data: bytes) -> "CertificateSigningRequest":
"""

View file

@ -75,7 +75,6 @@ from cryptography.hazmat.backends.openssl.x448 import (
_X448PublicKey,
)
from cryptography.hazmat.backends.openssl.x509 import (
_Certificate,
_CertificateRevocationList,
_CertificateSigningRequest,
_RevokedCertificate,
@ -383,17 +382,11 @@ class Backend(BackendInterface):
)
def _register_x509_ext_parsers(self):
self._certificate_extension_parser = _X509ExtensionParser(
self,
ext_count=self._lib.X509_get_ext_count,
get_ext=self._lib.X509_get_ext,
rust_callback=rust_x509.parse_x509_extension,
)
self._csr_extension_parser = _X509ExtensionParser(
self,
ext_count=self._lib.sk_X509_EXTENSION_num,
get_ext=self._lib.sk_X509_EXTENSION_value,
rust_callback=rust_x509.parse_x509_extension,
rust_callback=rust_x509.parse_csr_extension,
)
self._revoked_cert_extension_parser = _X509ExtensionParser(
self,
@ -471,6 +464,7 @@ class Backend(BackendInterface):
def _bn_to_int(self, bn):
assert bn != self._ffi.NULL
self.openssl_assert(not self._lib.BN_is_negative(bn))
bn_num_bytes = self._lib.BN_num_bytes(bn)
bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
@ -478,8 +472,6 @@ class Backend(BackendInterface):
# A zero length means the BN has value 0
self.openssl_assert(bin_len >= 0)
val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
if self._lib.BN_is_negative(bn):
val = -val
return val
def _int_to_bn(self, num, bn=None):
@ -957,7 +949,7 @@ class Backend(BackendInterface):
builder: x509.CertificateBuilder,
private_key: PRIVATE_KEY_TYPES,
algorithm: typing.Optional[hashes.HashAlgorithm],
) -> _Certificate:
) -> x509.Certificate:
if not isinstance(builder, x509.CertificateBuilder):
raise TypeError("Builder type mismatch.")
if builder._public_key is None:
@ -1028,7 +1020,7 @@ class Backend(BackendInterface):
errors = self._consume_errors_with_text()
raise ValueError("Signing failed", errors)
return _Certificate(self, x509_cert)
return self._ossl2cert(x509_cert)
def _evp_md_x509_null_if_eddsa(self, private_key, algorithm):
if isinstance(
@ -1333,31 +1325,19 @@ class Backend(BackendInterface):
self._handle_key_loading_error()
def load_pem_x509_certificate(self, data: bytes) -> _Certificate:
mem_bio = self._bytes_to_bio(data)
x509 = self._lib.PEM_read_bio_X509(
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
)
if x509 == self._ffi.NULL:
self._consume_errors()
raise ValueError(
"Unable to load certificate. See https://cryptography.io/en/"
"latest/faq.html#why-can-t-i-import-my-pem-file for more"
" details."
)
x509 = self._ffi.gc(x509, self._lib.X509_free)
return _Certificate(self, x509)
def load_der_x509_certificate(self, data: bytes) -> _Certificate:
def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
data = cert.public_bytes(serialization.Encoding.DER)
mem_bio = self._bytes_to_bio(data)
x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
if x509 == self._ffi.NULL:
self._consume_errors()
raise ValueError("Unable to load certificate")
self.openssl_assert(x509 != self._ffi.NULL)
x509 = self._ffi.gc(x509, self._lib.X509_free)
return _Certificate(self, x509)
return x509
def _ossl2cert(self, x509: typing.Any) -> x509.Certificate:
bio = self._create_mem_bio_gc()
res = self._lib.i2d_X509_bio(bio, x509)
self.openssl_assert(res == 1)
return rust_x509.load_der_x509_certificate(self._read_mem_bio(bio))
def load_pem_x509_crl(self, data: bytes) -> _CertificateRevocationList:
mem_bio = self._bytes_to_bio(data)
@ -1661,7 +1641,9 @@ class Backend(BackendInterface):
ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free)
cert, issuer, algorithm = builder._request
evp_md = self._evp_md_non_null_from_algorithm(algorithm)
certid = self._lib.OCSP_cert_to_id(evp_md, cert._x509, issuer._x509)
ossl_cert = self._cert2ossl(cert)
ossl_issuer = self._cert2ossl(issuer)
certid = self._lib.OCSP_cert_to_id(evp_md, ossl_cert, ossl_issuer)
self.openssl_assert(certid != self._ffi.NULL)
onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid)
self.openssl_assert(onereq != self._ffi.NULL)
@ -1687,10 +1669,12 @@ class Backend(BackendInterface):
evp_md = self._evp_md_non_null_from_algorithm(
builder._response._algorithm
)
ossl_cert = self._cert2ossl(builder._response._cert)
ossl_issuer = self._cert2ossl(builder._response._issuer)
certid = self._lib.OCSP_cert_to_id(
evp_md,
builder._response._cert._x509,
builder._response._issuer._x509,
ossl_cert,
ossl_issuer,
)
self.openssl_assert(certid != self._ffi.NULL)
certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free)
@ -1732,9 +1716,13 @@ class Backend(BackendInterface):
if responder_encoding is ocsp.OCSPResponderEncoding.HASH:
flags |= self._lib.OCSP_RESPID_KEY
# This list is to keep the x509 values alive until end of function
ossl_certs = []
if builder._certs is not None:
for cert in builder._certs:
res = self._lib.OCSP_basic_add1_cert(basic, cert._x509)
ossl_cert = self._cert2ossl(cert)
ossl_certs.append(ossl_cert)
res = self._lib.OCSP_basic_add1_cert(basic, ossl_cert)
self.openssl_assert(res == 1)
self._create_x509_extensions(
@ -1745,9 +1733,10 @@ class Backend(BackendInterface):
gc=True,
)
ossl_cert = self._cert2ossl(responder_cert)
res = self._lib.OCSP_basic_sign(
basic,
responder_cert._x509,
ossl_cert,
private_key._evp_pkey,
evp_md,
self._ffi.NULL,
@ -2523,7 +2512,7 @@ class Backend(BackendInterface):
if x509_ptr[0] != self._ffi.NULL:
x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
cert = _Certificate(self, x509)
cert = self._ossl2cert(x509)
if sk_x509_ptr[0] != self._ffi.NULL:
sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
@ -2541,7 +2530,8 @@ class Backend(BackendInterface):
x509 = self._lib.sk_X509_value(sk_x509, i)
self.openssl_assert(x509 != self._ffi.NULL)
x509 = self._ffi.gc(x509, self._lib.X509_free)
additional_certificates.append(_Certificate(self, x509))
addl_cert = self._ossl2cert(x509)
additional_certificates.append(addl_cert)
return (key, cert, additional_certificates)
@ -2580,17 +2570,23 @@ class Backend(BackendInterface):
sk_x509 = self._lib.sk_X509_new_null()
sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
# This list is to keep the x509 values alive until end of function
ossl_cas = []
for ca in cas:
res = self._lib.sk_X509_push(sk_x509, ca._x509)
ossl_ca = self._cert2ossl(ca)
ossl_cas.append(ossl_ca)
res = self._lib.sk_X509_push(sk_x509, ossl_ca)
backend.openssl_assert(res >= 1)
with self._zeroed_null_terminated_buf(password) as password_buf:
with self._zeroed_null_terminated_buf(name) as name_buf:
ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
evp_pkey = key._evp_pkey if key else self._ffi.NULL
p12 = self._lib.PKCS12_create(
password_buf,
name_buf,
key._evp_pkey if key else self._ffi.NULL,
cert._x509 if cert else self._ffi.NULL,
evp_pkey,
ossl_cert,
sk_x509,
nid_key,
nid_cert,
@ -2664,7 +2660,8 @@ class Backend(BackendInterface):
# refcount. On 1.1.0+ it returns 1 for success.
self.openssl_assert(res >= 1)
x509 = self._ffi.gc(x509, self._lib.X509_free)
certs.append(_Certificate(self, x509))
cert = self._ossl2cert(x509)
certs.append(cert)
return certs
@ -2678,8 +2675,12 @@ class Backend(BackendInterface):
else:
certs = self._lib.sk_X509_new_null()
certs = self._ffi.gc(certs, self._lib.sk_X509_free)
# This list is to keep the x509 values alive until end of function
ossl_certs = []
for cert in builder._additional_certs:
res = self._lib.sk_X509_push(certs, cert._x509)
ossl_cert = self._cert2ossl(cert)
ossl_certs.append(ossl_cert)
res = self._lib.sk_X509_push(certs, ossl_cert)
self.openssl_assert(res >= 1)
if pkcs7.PKCS7Options.DetachedSignature in options:
@ -2711,9 +2712,10 @@ class Backend(BackendInterface):
signer_flags |= self._lib.PKCS7_NOCERTS
for certificate, private_key, hash_algorithm in builder._signers:
ossl_cert = self._cert2ossl(certificate)
md = self._evp_md_non_null_from_algorithm(hash_algorithm)
p7signerinfo = self._lib.PKCS7_sign_add_signer(
p7, certificate._x509, private_key._evp_pkey, md, signer_flags
p7, ossl_cert, private_key._evp_pkey, md, signer_flags
)
self.openssl_assert(p7signerinfo != self._ffi.NULL)

View file

@ -169,11 +169,7 @@ def _asn1_string_to_ascii(backend, asn1_string):
def _asn1_string_to_utf8(backend, asn1_string) -> str:
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {}".format(asn1_string.type)
)
backend.openssl_assert(res >= 0)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])

View file

@ -15,7 +15,6 @@ from cryptography.hazmat.backends.openssl.decode_asn1 import (
_obj2txt,
_parse_asn1_generalized_time,
)
from cryptography.hazmat.backends.openssl.x509 import _Certificate
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.x509.ocsp import (
OCSPCertStatus,
@ -178,11 +177,7 @@ class _OCSPResponse(OCSPResponse):
for i in range(num):
x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i)
self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL)
cert = _Certificate(self._backend, x509_ptr)
# We need to keep the OCSP response that the certificate came from
# alive until the Certificate object itself goes out of scope, so
# we give it a private reference.
cert._ocsp_resp_ref = self
cert = self._backend._ossl2cert(x509_ptr)
certs.append(cert)
return certs

View file

@ -6,6 +6,7 @@
import datetime
import operator
import typing
import warnings
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
@ -26,145 +27,15 @@ from cryptography.x509.base import PUBLIC_KEY_TYPES
from cryptography.x509.name import _ASN1Type
class _Certificate(x509.Certificate):
# Keep-alive reference used by OCSP
_ocsp_resp_ref: typing.Any
def __init__(self, backend, x509_cert):
self._backend = backend
self._x509 = x509_cert
version = self._backend._lib.X509_get_version(self._x509)
if version == 0:
self._version = x509.Version.v1
elif version == 2:
self._version = x509.Version.v3
else:
raise x509.InvalidVersion(
"{} is not a valid X509 version".format(version), version
# This exists for pyOpenSSL compatibility and SHOULD NOT BE USED
# WE WILL REMOVE THIS VERY SOON.
def _Certificate(backend, x509) -> x509.Certificate: # noqa: N802
warnings.warn(
"This version of cryptography contains a temporary pyOpenSSL "
"fallback path. Upgrade pyOpenSSL now.",
utils.DeprecatedIn35,
)
def __repr__(self):
return "<Certificate(subject={}, ...)>".format(self.subject)
def __eq__(self, other: object) -> bool:
if not isinstance(other, _Certificate):
return NotImplemented
res = self._backend._lib.X509_cmp(self._x509, other._x509)
return res == 0
def __ne__(self, other: object) -> bool:
return not self == other
def __hash__(self) -> int:
return hash(self.public_bytes(serialization.Encoding.DER))
def __deepcopy__(self, memo):
return self
def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
h = hashes.Hash(algorithm, self._backend)
h.update(self.public_bytes(serialization.Encoding.DER))
return h.finalize()
version = utils.read_only_property("_version")
@property
def serial_number(self) -> int:
asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
return _asn1_integer_to_int(self._backend, asn1_int)
def public_key(self) -> PUBLIC_KEY_TYPES:
pkey = self._backend._lib.X509_get_pubkey(self._x509)
if pkey == self._backend._ffi.NULL:
# Remove errors from the stack.
self._backend._consume_errors()
raise ValueError("Certificate public key is of an unknown type")
pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
return self._backend._evp_pkey_to_public_key(pkey)
@property
def not_valid_before(self) -> datetime.datetime:
asn1_time = self._backend._lib.X509_get0_notBefore(self._x509)
return _parse_asn1_time(self._backend, asn1_time)
@property
def not_valid_after(self) -> datetime.datetime:
asn1_time = self._backend._lib.X509_get0_notAfter(self._x509)
return _parse_asn1_time(self._backend, asn1_time)
@property
def issuer(self) -> x509.Name:
issuer = self._backend._lib.X509_get_issuer_name(self._x509)
self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
return _decode_x509_name(self._backend, issuer)
@property
def subject(self) -> x509.Name:
subject = self._backend._lib.X509_get_subject_name(self._x509)
self._backend.openssl_assert(subject != self._backend._ffi.NULL)
return _decode_x509_name(self._backend, subject)
@property
def signature_hash_algorithm(
self,
) -> typing.Optional[hashes.HashAlgorithm]:
oid = self.signature_algorithm_oid
try:
return x509._SIG_OIDS_TO_HASH[oid]
except KeyError:
raise UnsupportedAlgorithm(
"Signature algorithm OID:{} not recognized".format(oid)
)
@property
def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
alg = self._backend._ffi.new("X509_ALGOR **")
self._backend._lib.X509_get0_signature(
self._backend._ffi.NULL, alg, self._x509
)
self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
oid = _obj2txt(self._backend, alg[0].algorithm)
return x509.ObjectIdentifier(oid)
@utils.cached_property
def extensions(self) -> x509.Extensions:
return self._backend._certificate_extension_parser.parse(self._x509)
@property
def signature(self) -> bytes:
sig = self._backend._ffi.new("ASN1_BIT_STRING **")
self._backend._lib.X509_get0_signature(
sig, self._backend._ffi.NULL, self._x509
)
self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
return _asn1_string_to_bytes(self._backend, sig[0])
@property
def tbs_certificate_bytes(self) -> bytes:
pp = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
self._backend.openssl_assert(res > 0)
pp = self._backend._ffi.gc(
pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
)
return self._backend._ffi.buffer(pp[0], res)[:]
def public_bytes(self, encoding: serialization.Encoding) -> bytes:
bio = self._backend._create_mem_bio_gc()
if encoding is serialization.Encoding.PEM:
res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
elif encoding is serialization.Encoding.DER:
res = self._backend._lib.i2d_X509_bio(bio, self._x509)
else:
raise TypeError("encoding must be an item from the Encoding enum")
self._backend.openssl_assert(res == 1)
return self._backend._read_mem_bio(bio)
return backend._ossl2cert(x509)
class _RevokedCertificate(x509.RevokedCertificate):

View file

@ -1,13 +1,20 @@
from cryptography.x509 import (
ExtensionType,
PrecertificateSignedCertificateTimestamps,
)
import datetime
import typing
def parse_x509_extension(der_oid: bytes, ext_data: bytes) -> ExtensionType: ...
def parse_crl_entry_ext(der_oid: bytes, data: bytes) -> ExtensionType: ...
def parse_crl_extension(der_oid: bytes, ext_data: bytes) -> ExtensionType: ...
from cryptography import x509
def parse_csr_extension(
der_oid: bytes, ext_data: bytes
) -> x509.ExtensionType: ...
def parse_crl_entry_ext(der_oid: bytes, data: bytes) -> x509.ExtensionType: ...
def parse_crl_extension(
der_oid: bytes, ext_data: bytes
) -> x509.ExtensionType: ...
def load_pem_x509_certificate(data: bytes) -> x509.Certificate: ...
def load_der_x509_certificate(data: bytes) -> x509.Certificate: ...
def encode_precertificate_signed_certificate_timestamps(
extension: PrecertificateSignedCertificateTimestamps,
extension: x509.PrecertificateSignedCertificateTimestamps,
) -> bytes: ...
class Sct: ...
class Certificate: ...

View file

@ -23,6 +23,7 @@ class CryptographyDeprecationWarning(UserWarning):
PersistentlyDeprecated2017 = CryptographyDeprecationWarning
PersistentlyDeprecated2019 = CryptographyDeprecationWarning
DeprecatedIn34 = CryptographyDeprecationWarning
DeprecatedIn35 = CryptographyDeprecationWarning
def _check_bytes(name: str, value: bytes) -> None:

View file

@ -11,6 +11,7 @@ import typing
from cryptography import utils
from cryptography.hazmat.backends import _get_backend
from cryptography.hazmat.backends.interfaces import Backend
from cryptography.hazmat.bindings._rust import x509 as rust_x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
dsa,
@ -189,6 +190,10 @@ class Certificate(metaclass=abc.ABCMeta):
"""
# Runtime isinstance checks need this since the rust class is not a subclass.
Certificate.register(rust_x509.Certificate)
class RevokedCertificate(metaclass=abc.ABCMeta):
@abc.abstractproperty
def serial_number(self) -> int:
@ -413,18 +418,18 @@ class CertificateSigningRequest(metaclass=abc.ABCMeta):
"""
# Backend argument preserved for API compatibility, but ignored.
def load_pem_x509_certificate(
data: bytes, backend: typing.Optional[Backend] = None
data: bytes, backend: typing.Any = None
) -> Certificate:
backend = _get_backend(backend)
return backend.load_pem_x509_certificate(data)
return rust_x509.load_pem_x509_certificate(data)
# Backend argument preserved for API compatibility, but ignored.
def load_der_x509_certificate(
data: bytes, backend: typing.Optional[Backend] = None
data: bytes, backend: typing.Any = None
) -> Certificate:
backend = _get_backend(backend)
return backend.load_der_x509_certificate(data)
return rust_x509.load_der_x509_certificate(data)
def load_pem_x509_csr(

33
src/rust/Cargo.lock generated
View file

@ -41,6 +41,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
version = "1.2.1"
@ -71,6 +77,7 @@ dependencies = [
"chrono",
"lazy_static",
"ouroboros",
"pem",
"pyo3",
]
@ -220,6 +227,17 @@ dependencies = [
"proc-macro-hack",
]
[[package]]
name = "pem"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb"
dependencies = [
"base64",
"once_cell",
"regex",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -325,6 +343,21 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "scopeguard"
version = "1.1.0"

View file

@ -9,6 +9,7 @@ publish = false
lazy_static = "1"
pyo3 = { version = "0.14.1" }
asn1 = { version = "0.6", default-features = false, features = ["derive"] }
pem = "0.8"
chrono = { version = "0.4", default-features = false, features = ["alloc"] }
ouroboros = "0.10"

View file

@ -2,6 +2,7 @@
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.
use crate::x509::Name;
use pyo3::class::basic::CompareOp;
use pyo3::conversion::ToPyObject;
@ -22,6 +23,15 @@ impl From<pyo3::PyErr> for PyAsn1Error {
}
}
impl From<pem::PemError> for PyAsn1Error {
fn from(e: pem::PemError) -> PyAsn1Error {
PyAsn1Error::Py(pyo3::exceptions::PyValueError::new_err(format!(
"Unable to load PEM file. See https://cryptography.io/en/latest/faq.html#why-can-t-i-import-my-pem-file for more details. {:?}",
e
)))
}
}
impl From<PyAsn1Error> for pyo3::PyErr {
fn from(e: PyAsn1Error) -> pyo3::PyErr {
match e {
@ -178,14 +188,6 @@ struct TbsCertificate<'a> {
_extensions: Option<asn1::Sequence<'a>>,
}
pub(crate) type Name<'a> = asn1::SequenceOf<'a, asn1::SetOf<'a, AttributeTypeValue<'a>>>;
#[derive(asn1::Asn1Read)]
pub(crate) struct AttributeTypeValue<'a> {
pub(crate) type_id: asn1::ObjectIdentifier<'a>,
pub(crate) value: asn1::Tlv<'a>,
}
#[derive(asn1::Asn1Read)]
struct Validity<'a> {
not_before: asn1::Tlv<'a>,

View file

@ -2,7 +2,7 @@
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.
use crate::asn1::{big_asn1_uint_to_py, AttributeTypeValue, Name, PyAsn1Error, PyAsn1Result};
use crate::asn1::{big_asn1_uint_to_py, PyAsn1Error, PyAsn1Result};
use chrono::{Datelike, Timelike};
use pyo3::conversion::ToPyObject;
use pyo3::types::IntoPyDict;
@ -43,6 +43,502 @@ lazy_static::lazy_static! {
static ref CP_USER_NOTICE_OID: asn1::ObjectIdentifier<'static> = asn1::ObjectIdentifier::from_string("1.3.6.1.5.5.7.2.2").unwrap();
}
#[derive(asn1::Asn1Read, asn1::Asn1Write)]
struct RawCertificate<'a> {
tbs_cert: TbsCertificate<'a>,
signature_alg: AlgorithmIdentifier<'a>,
signature: asn1::BitString<'a>,
}
#[derive(asn1::Asn1Read, asn1::Asn1Write)]
struct TbsCertificate<'a> {
#[explicit(0)]
#[default(0)]
version: u8,
serial: asn1::BigUint<'a>,
_signature_alg: asn1::Sequence<'a>,
issuer: Name<'a>,
validity: Validity,
subject: Name<'a>,
spki: asn1::Sequence<'a>,
#[implicit(1)]
_issuer_unique_id: Option<asn1::BitString<'a>>,
#[implicit(2)]
_subject_unique_id: Option<asn1::BitString<'a>>,
#[explicit(3)]
extensions: Option<Extensions<'a>>,
}
pub(crate) type Name<'a> = asn1::SequenceOf<'a, asn1::SetOf<'a, AttributeTypeValue<'a>>>;
#[derive(asn1::Asn1Read, asn1::Asn1Write)]
pub(crate) struct AttributeTypeValue<'a> {
pub(crate) type_id: asn1::ObjectIdentifier<'a>,
pub(crate) value: asn1::Tlv<'a>,
}
#[derive(asn1::Asn1Read, asn1::Asn1Write)]
enum Time {
UtcTime(asn1::UtcTime),
GeneralizedTime(asn1::GeneralizedTime),
}
impl Time {
fn as_chrono(&self) -> &chrono::DateTime<chrono::Utc> {
match self {
Time::UtcTime(data) => data.as_chrono(),
Time::GeneralizedTime(data) => data.as_chrono(),
}
}
}
#[derive(asn1::Asn1Read, asn1::Asn1Write)]
pub(crate) struct Validity {
not_before: Time,
not_after: Time,
}
#[ouroboros::self_referencing]
struct OwnedRawCertificate {
data: Vec<u8>,
#[borrows(data)]
#[covariant]
value: RawCertificate<'this>,
}
#[pyo3::prelude::pyclass]
struct Certificate {
raw: OwnedRawCertificate,
cached_extensions: Option<pyo3::PyObject>,
}
#[pyo3::prelude::pyproto]
impl pyo3::class::basic::PyObjectProtocol for Certificate {
fn __hash__(&self) -> u64 {
let mut hasher = DefaultHasher::new();
self.raw.borrow_data().hash(&mut hasher);
hasher.finish()
}
fn __richcmp__(
&self,
other: pyo3::pycell::PyRef<Certificate>,
op: pyo3::class::basic::CompareOp,
) -> pyo3::PyResult<bool> {
match op {
pyo3::class::basic::CompareOp::Eq => {
Ok(self.raw.borrow_data() == other.raw.borrow_data())
}
pyo3::class::basic::CompareOp::Ne => {
Ok(self.raw.borrow_data() != other.raw.borrow_data())
}
_ => Err(pyo3::exceptions::PyTypeError::new_err(
"Certificates cannot be ordered",
)),
}
}
fn __repr__(&self) -> pyo3::PyResult<String> {
let mut repr = String::from("<Certificate(subject=<Name(");
for rdn in self.raw.borrow_value().tbs_cert.subject.clone() {
for attribute in rdn {
let mut attr = attribute.type_id.to_string();
attr.push('=');
attr.push_str(std::str::from_utf8(attribute.value.data()).unwrap());
repr.push_str(&attr);
repr.push_str(", ");
}
}
repr.push_str(")>, ...)>");
Ok(repr)
}
}
#[pyo3::prelude::pymethods]
impl Certificate {
fn __deepcopy__(
slf: pyo3::pycell::PyRef<'_, Self>,
_memo: pyo3::PyObject,
) -> pyo3::pycell::PyRef<'_, Self> {
slf
}
fn public_key<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
// This makes an unnecessary copy. It'd be nice to get rid of it.
let serialized = pyo3::types::PyBytes::new(
py,
&asn1::write_single(&self.raw.borrow_value().tbs_cert.spki),
);
py.import("cryptography.hazmat.primitives.serialization")?
.getattr("load_der_public_key")?
.call1((serialized,))
}
fn fingerprint<'p>(
&self,
py: pyo3::Python<'p>,
algorithm: pyo3::PyObject,
) -> pyo3::PyResult<&'p pyo3::PyAny> {
let hasher = py
.import("cryptography.hazmat.primitives.hashes")?
.getattr("Hash")?
.call1((algorithm,))?;
// This makes an unnecessary copy. It'd be nice to get rid of it.
let serialized =
pyo3::types::PyBytes::new(py, &asn1::write_single(&self.raw.borrow_value()));
hasher.call_method1("update", (serialized,))?;
hasher.call_method0("finalize")
}
fn public_bytes<'p>(
&self,
py: pyo3::Python<'p>,
encoding: &pyo3::PyAny,
) -> pyo3::PyResult<&'p pyo3::types::PyBytes> {
let encoding_class = py
.import("cryptography.hazmat.primitives.serialization")?
.getattr("Encoding")?;
let result = asn1::write_single(self.raw.borrow_value());
if encoding == encoding_class.getattr("DER")? {
Ok(pyo3::types::PyBytes::new(py, &result))
} else if encoding == encoding_class.getattr("PEM")? {
let pem = pem::encode_config(
&pem::Pem {
tag: "CERTIFICATE".to_string(),
contents: result,
},
pem::EncodeConfig {
line_ending: pem::LineEnding::LF,
},
)
.into_bytes();
Ok(pyo3::types::PyBytes::new(py, &pem))
} else {
Err(pyo3::exceptions::PyTypeError::new_err(
"encoding must be an item from the Encoding enum",
))
}
}
#[getter]
fn serial_number<'p>(&self, py: pyo3::Python<'p>) -> Result<&'p pyo3::PyAny, PyAsn1Error> {
Ok(big_asn1_uint_to_py(
py,
self.raw.borrow_value().tbs_cert.serial,
)?)
}
#[getter]
fn version<'p>(&self, py: pyo3::Python<'p>) -> Result<&'p pyo3::PyAny, PyAsn1Error> {
let version = &self.raw.borrow_value().tbs_cert.version;
cert_version(py, *version)
}
#[getter]
fn issuer<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
parse_name(py, &self.raw.borrow_value().tbs_cert.issuer)
}
#[getter]
fn subject<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
parse_name(py, &self.raw.borrow_value().tbs_cert.subject)
}
#[getter]
fn tbs_certificate_bytes<'p>(
&self,
py: pyo3::Python<'p>,
) -> Result<&'p pyo3::types::PyBytes, PyAsn1Error> {
let result = asn1::write_single(&self.raw.borrow_value().tbs_cert);
Ok(pyo3::types::PyBytes::new(py, &result))
}
#[getter]
fn signature<'p>(&self, py: pyo3::Python<'p>) -> Result<&'p pyo3::types::PyBytes, PyAsn1Error> {
Ok(pyo3::types::PyBytes::new(
py,
self.raw.borrow_value().signature.as_bytes(),
))
}
#[getter]
fn not_valid_before<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
let chrono = &self
.raw
.borrow_value()
.tbs_cert
.validity
.not_before
.as_chrono();
chrono_to_py(py, chrono)
}
#[getter]
fn not_valid_after<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
let chrono = &self
.raw
.borrow_value()
.tbs_cert
.validity
.not_after
.as_chrono();
chrono_to_py(py, chrono)
}
#[getter]
fn signature_hash_algorithm<'p>(
&self,
py: pyo3::Python<'p>,
) -> Result<&'p pyo3::PyAny, PyAsn1Error> {
let sig_oids_to_hash = py
.import("cryptography.x509")?
.getattr("_SIG_OIDS_TO_HASH")?;
let hash_alg = sig_oids_to_hash.get_item(self.signature_algorithm_oid(py)?);
match hash_alg {
Ok(data) => Ok(data),
Err(_) => Err(PyAsn1Error::from(pyo3::PyErr::from_instance(
py.import("cryptography.exceptions")?.call_method1(
"UnsupportedAlgorithm",
(format!(
"Signature algorithm OID: {} not recognized",
self.raw.borrow_value().signature_alg.oid.to_string()
),),
)?,
))),
}
}
#[getter]
fn signature_algorithm_oid<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
py.import("cryptography.x509")?.call_method1(
"ObjectIdentifier",
(self.raw.borrow_value().signature_alg.oid.to_string(),),
)
}
#[getter]
fn extensions(&mut self, py: pyo3::Python<'_>) -> pyo3::PyResult<pyo3::PyObject> {
let x509_module = py.import("cryptography.x509")?;
parse_and_cache_extensions(
py,
&mut self.cached_extensions,
&self.raw.borrow_value().tbs_cert.extensions,
|oid, ext_data| {
if oid == &*SUBJECT_ALTERNATIVE_NAME_OID {
let gn_seq =
asn1::parse_single::<asn1::SequenceOf<'_, GeneralName<'_>>>(ext_data)?;
let sans = parse_general_names(py, gn_seq)?;
Ok(Some(
x509_module
.getattr("SubjectAlternativeName")?
.call1((sans,))?,
))
} else if oid == &*ISSUER_ALTERNATIVE_NAME_OID {
let gn_seq =
asn1::parse_single::<asn1::SequenceOf<'_, GeneralName<'_>>>(ext_data)?;
let ians = parse_general_names(py, gn_seq)?;
Ok(Some(
x509_module
.getattr("IssuerAlternativeName")?
.call1((ians,))?,
))
} else if oid == &*TLS_FEATURE_OID {
let tls_feature_type_to_enum = py
.import("cryptography.x509.extensions")?
.getattr("_TLS_FEATURE_TYPE_TO_ENUM")?;
let features = pyo3::types::PyList::empty(py);
for feature in asn1::parse_single::<asn1::SequenceOf<'_, u64>>(ext_data)? {
let py_feature =
tls_feature_type_to_enum.get_item(feature.to_object(py))?;
features.append(py_feature)?;
}
Ok(Some(x509_module.getattr("TLSFeature")?.call1((features,))?))
} else if oid == &*SUBJECT_KEY_IDENTIFIER_OID {
let identifier = asn1::parse_single::<&[u8]>(ext_data)?;
Ok(Some(
x509_module
.getattr("SubjectKeyIdentifier")?
.call1((identifier,))?,
))
} else if oid == &*EXTENDED_KEY_USAGE_OID {
let ekus = pyo3::types::PyList::empty(py);
for oid in asn1::parse_single::<asn1::SequenceOf<'_, asn1::ObjectIdentifier<'_>>>(
ext_data,
)? {
let oid_obj =
x509_module.call_method1("ObjectIdentifier", (oid.to_string(),))?;
ekus.append(oid_obj)?;
}
Ok(Some(
x509_module.getattr("ExtendedKeyUsage")?.call1((ekus,))?,
))
} else if oid == &*KEY_USAGE_OID {
let kus = asn1::parse_single::<asn1::BitString<'_>>(ext_data)?;
let digital_signature = kus.has_bit_set(0);
let content_comitment = kus.has_bit_set(1);
let key_encipherment = kus.has_bit_set(2);
let data_encipherment = kus.has_bit_set(3);
let key_agreement = kus.has_bit_set(4);
let key_cert_sign = kus.has_bit_set(5);
let crl_sign = kus.has_bit_set(6);
let encipher_only = kus.has_bit_set(7);
let decipher_only = kus.has_bit_set(8);
Ok(Some(x509_module.getattr("KeyUsage")?.call1((
digital_signature,
content_comitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only,
))?))
} else if oid == &*AUTHORITY_INFORMATION_ACCESS_OID {
let ads = parse_access_descriptions(py, ext_data)?;
Ok(Some(
x509_module
.getattr("AuthorityInformationAccess")?
.call1((ads,))?,
))
} else if oid == &*SUBJECT_INFORMATION_ACCESS_OID {
let ads = parse_access_descriptions(py, ext_data)?;
Ok(Some(
x509_module
.getattr("SubjectInformationAccess")?
.call1((ads,))?,
))
} else if oid == &*CERTIFICATE_POLICIES_OID {
let cp = parse_cp(py, ext_data)?;
Ok(Some(
x509_module.call_method1("CertificatePolicies", (cp,))?,
))
} else if oid == &*POLICY_CONSTRAINTS_OID {
let pc = asn1::parse_single::<PolicyConstraints>(ext_data)?;
Ok(Some(x509_module.getattr("PolicyConstraints")?.call1((
pc.require_explicit_policy,
pc.inhibit_policy_mapping,
))?))
} else if oid == &*PRECERT_POISON_OID {
asn1::parse_single::<()>(ext_data)?;
Ok(Some(x509_module.getattr("PrecertPoison")?.call0()?))
} else if oid == &*OCSP_NO_CHECK_OID {
asn1::parse_single::<()>(ext_data)?;
Ok(Some(x509_module.getattr("OCSPNoCheck")?.call0()?))
} else if oid == &*INHIBIT_ANY_POLICY_OID {
let bignum = asn1::parse_single::<asn1::BigUint<'_>>(ext_data)?;
let pynum = big_asn1_uint_to_py(py, bignum)?;
Ok(Some(
x509_module.getattr("InhibitAnyPolicy")?.call1((pynum,))?,
))
} else if oid == &*BASIC_CONSTRAINTS_OID {
let bc = asn1::parse_single::<BasicConstraints>(ext_data)?;
Ok(Some(
x509_module
.getattr("BasicConstraints")?
.call1((bc.ca, bc.path_length))?,
))
} else if oid == &*AUTHORITY_KEY_IDENTIFIER_OID {
Ok(Some(parse_authority_key_identifier(py, ext_data)?))
} else if oid == &*CRL_DISTRIBUTION_POINTS_OID {
let dp = parse_distribution_points(py, ext_data)?;
Ok(Some(
x509_module.getattr("CRLDistributionPoints")?.call1((dp,))?,
))
} else if oid == &*FRESHEST_CRL_OID {
let dp = parse_distribution_points(py, ext_data)?;
Ok(Some(x509_module.getattr("FreshestCRL")?.call1((dp,))?))
} else if oid == &*PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS_OID {
let contents = asn1::parse_single::<&[u8]>(ext_data)?;
let scts = parse_scts(py, contents, LogEntryType::PreCertificate)?;
Ok(Some(
x509_module
.getattr("PrecertificateSignedCertificateTimestamps")?
.call1((scts,))?,
))
} else if oid == &*NAME_CONSTRAINTS_OID {
let nc = asn1::parse_single::<NameConstraints<'_>>(ext_data)?;
let permitted_subtrees = match nc.permitted_subtrees {
Some(data) => parse_general_subtrees(py, data)?,
None => py.None(),
};
let excluded_subtrees = match nc.excluded_subtrees {
Some(data) => parse_general_subtrees(py, data)?,
None => py.None(),
};
Ok(Some(
x509_module
.getattr("NameConstraints")?
.call1((permitted_subtrees, excluded_subtrees))?,
))
} else {
Ok(None)
}
},
)
}
// This getter exists for compatibility with pyOpenSSL and will be removed.
// DO NOT RELY ON IT. WE WILL BREAK YOU WHEN WE FEEL LIKE IT.
#[getter]
fn _x509<'p>(
slf: pyo3::pycell::PyRef<'_, Self>,
py: pyo3::Python<'p>,
) -> Result<&'p pyo3::PyAny, PyAsn1Error> {
let cryptography_warning = py.import("cryptography.utils")?.getattr("DeprecatedIn35")?;
let warnings = py.import("warnings")?;
warnings.call_method1(
"warn",
(
"This version of cryptography contains a temporary pyOpenSSL fallback path. Upgrade pyOpenSSL now.",
cryptography_warning,
),
)?;
let backend = py
.import("cryptography.hazmat.backends.openssl.backend")?
.getattr("backend")?;
Ok(backend.call_method1("_cert2ossl", (slf,))?)
}
}
fn cert_version(py: pyo3::Python<'_>, version: u8) -> Result<&pyo3::PyAny, PyAsn1Error> {
let x509_module = py.import("cryptography.x509")?;
match version {
0 => Ok(x509_module.getattr("Version")?.get_item("v1")?),
2 => Ok(x509_module.getattr("Version")?.get_item("v3")?),
_ => Err(PyAsn1Error::from(pyo3::PyErr::from_instance(
x509_module
.getattr("InvalidVersion")?
.call1((format!("{} is not a valid X509 version", version), version))?,
))),
}
}
#[pyo3::prelude::pyfunction]
fn load_pem_x509_certificate(py: pyo3::Python<'_>, data: &[u8]) -> PyAsn1Result<Certificate> {
let parsed = pem::parse(data)?;
if parsed.tag != "CERTIFICATE" {
return Err(PyAsn1Error::from(pyo3::exceptions::PyValueError::new_err(
"Valid PEM but no BEGIN CERTIFICATE/END CERTIFICATE delimiters. Are you sure this is a certificate?"
)));
}
load_der_x509_certificate(py, &parsed.contents)
}
#[pyo3::prelude::pyfunction]
fn load_der_x509_certificate(py: pyo3::Python<'_>, data: &[u8]) -> PyAsn1Result<Certificate> {
let raw = OwnedRawCertificate::try_new(data.to_vec(), |data| asn1::parse_single(data))?;
// Parse cert version immediately so we can raise error on parse if it is invalid.
cert_version(py, raw.borrow_value().tbs_cert.version)?;
Ok(Certificate {
raw,
cached_extensions: None,
})
}
pub(crate) fn parse_and_cache_extensions<
'p,
F: Fn(&asn1::ObjectIdentifier<'_>, &[u8]) -> Result<Option<&'p pyo3::PyAny>, PyAsn1Error>,
@ -848,7 +1344,7 @@ pub(crate) fn parse_scts(
}
#[pyo3::prelude::pyfunction]
fn parse_x509_extension(py: pyo3::Python<'_>, der_oid: &[u8], ext_data: &[u8]) -> PyAsn1Result {
fn parse_csr_extension(py: pyo3::Python<'_>, der_oid: &[u8], ext_data: &[u8]) -> PyAsn1Result {
let oid = asn1::ObjectIdentifier::from_der(der_oid).unwrap();
let x509_module = py.import("cryptography.x509")?;
@ -945,9 +1441,6 @@ fn parse_x509_extension(py: pyo3::Python<'_>, der_oid: &[u8], ext_data: &[u8]) -
.getattr("PolicyConstraints")?
.call1((pc.require_explicit_policy, pc.inhibit_policy_mapping))?
.to_object(py))
} else if oid == *PRECERT_POISON_OID {
asn1::parse_single::<()>(ext_data)?;
Ok(x509_module.getattr("PrecertPoison")?.call0()?.to_object(py))
} else if oid == *OCSP_NO_CHECK_OID {
asn1::parse_single::<()>(ext_data)?;
Ok(x509_module.getattr("OCSPNoCheck")?.call0()?.to_object(py))
@ -977,13 +1470,6 @@ fn parse_x509_extension(py: pyo3::Python<'_>, der_oid: &[u8], ext_data: &[u8]) -
.getattr("FreshestCRL")?
.call1((parse_distribution_points(py, ext_data)?,))?
.to_object(py))
} else if oid == *PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS_OID {
let contents = asn1::parse_single::<&[u8]>(ext_data)?;
let scts = parse_scts(py, contents, LogEntryType::PreCertificate)?;
Ok(x509_module
.getattr("PrecertificateSignedCertificateTimestamps")?
.call1((scts,))?
.to_object(py))
} else if oid == *NAME_CONSTRAINTS_OID {
let nc = asn1::parse_single::<NameConstraints<'_>>(ext_data)?;
let permitted_subtrees = match nc.permitted_subtrees {
@ -1116,12 +1602,15 @@ fn parse_crl_extension(py: pyo3::Python<'_>, der_oid: &[u8], ext_data: &[u8]) ->
pub(crate) fn create_submodule(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelude::PyModule> {
let submod = pyo3::prelude::PyModule::new(py, "x509")?;
submod.add_wrapped(pyo3::wrap_pyfunction!(parse_x509_extension))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(load_der_x509_certificate))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(load_pem_x509_certificate))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(parse_csr_extension))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(parse_crl_entry_ext))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(parse_crl_extension))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(
encode_precertificate_signed_certificate_timestamps
))?;
submod.add_class::<Certificate>()?;
submod.add_class::<Sct>()?;
Ok(submod)

View file

@ -11,7 +11,7 @@ import textwrap
import pytest
from cryptography import x509
from cryptography import utils, x509
from cryptography.exceptions import InternalError, _Reasons
from cryptography.hazmat.backends.openssl import decode_asn1, encode_asn1
from cryptography.hazmat.backends.openssl.backend import Backend, backend
@ -609,30 +609,6 @@ class TestRSAPEMSerialization(object):
)
class TestGOSTCertificate(object):
def test_numeric_string_x509_name_entry(self):
cert = _load_cert(
os.path.join("x509", "e-trust.ru.der"),
x509.load_der_x509_certificate,
backend,
)
if backend._lib.CRYPTOGRAPHY_IS_LIBRESSL:
with pytest.raises(ValueError) as exc:
cert.subject
# We assert on the message in this case because if the certificate
# fails to load it will also raise a ValueError and this test could
# erroneously pass.
assert str(exc.value) == "Unsupported ASN1 string type. Type: 18"
else:
assert (
cert.subject.get_attributes_for_oid(
x509.ObjectIdentifier("1.2.643.3.131.1.1")
)[0].value
== "007710474375"
)
@pytest.mark.skipif(
backend._lib.Cryptography_HAS_EVP_PKEY_DHX == 1,
reason="Requires OpenSSL without EVP_PKEY_DHX (< 1.0.2)",
@ -709,3 +685,19 @@ class TestOpenSSLDHSerialization(object):
)
with pytest.raises(ValueError):
loader_func(key_bytes, backend)
def test_pyopenssl_cert_fallback():
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
x509_ossl = None
with pytest.warns(utils.CryptographyDeprecationWarning):
x509_ossl = cert._x509
assert x509_ossl is not None
from cryptography.hazmat.backends.openssl.x509 import _Certificate
with pytest.warns(utils.CryptographyDeprecationWarning):
_Certificate(backend, x509_ossl)

View file

@ -104,8 +104,12 @@ def _pkcs7_verify(encoding, sig, msg, certs, options, backend):
store = backend._lib.X509_STORE_new()
backend.openssl_assert(store != backend._ffi.NULL)
store = backend._ffi.gc(store, backend._lib.X509_STORE_free)
# This list is to keep the x509 values alive until end of function
ossl_certs = []
for cert in certs:
res = backend._lib.X509_STORE_add_cert(store, cert._x509)
ossl_cert = backend._cert2ossl(cert)
ossl_certs.append(ossl_cert)
res = backend._lib.X509_STORE_add_cert(store, ossl_cert)
backend.openssl_assert(res == 1)
if msg is None:
res = backend._lib.PKCS7_verify(

View file

@ -60,7 +60,7 @@ class FakeGeneralName(object):
value = utils.read_only_property("_value")
def _load_cert(filename, loader, backend):
def _load_cert(filename, loader, backend=None):
cert = load_vectors_from_file(
filename=filename,
loader=lambda pemfile: loader(pemfile.read(), backend),
@ -645,12 +645,12 @@ class TestRSACertificate(object):
)
def test_negative_serial_number(self, backend):
cert = _load_cert(
with pytest.raises(ValueError, match="TbsCertificate::serial"):
_load_cert(
os.path.join("x509", "custom", "negative_serial.pem"),
x509.load_pem_x509_certificate,
backend,
)
assert cert.serial_number == -18008675309
def test_alternate_rsa_with_sha1_oid(self, backend):
cert = _load_cert(
@ -1039,6 +1039,20 @@ class TestRSACertificate(object):
assert cert != cert2
assert cert != object()
def test_ordering_unsupported(self, backend):
cert = _load_cert(
os.path.join("x509", "custom", "post2000utctime.pem"),
x509.load_pem_x509_certificate,
backend,
)
cert2 = _load_cert(
os.path.join("x509", "custom", "post2000utctime.pem"),
x509.load_pem_x509_certificate,
backend,
)
with pytest.raises(TypeError, match="cannot be ordered"):
cert > cert2
def test_hash(self, backend):
cert1 = _load_cert(
os.path.join("x509", "custom", "post2000utctime.pem"),
@ -1073,9 +1087,17 @@ class TestRSACertificate(object):
assert cert.version is x509.Version.v1
def test_invalid_pem(self, backend):
with pytest.raises(ValueError):
with pytest.raises(ValueError, match="Unable to load"):
x509.load_pem_x509_certificate(b"notacert", backend)
crl = load_vectors_from_file(
filename=os.path.join("x509", "custom", "crl_empty.pem"),
loader=lambda pemfile: pemfile.read(),
mode="rb",
)
with pytest.raises(ValueError, match="Valid PEM but no"):
x509.load_pem_x509_certificate(crl, backend)
def test_invalid_der(self, backend):
with pytest.raises(ValueError):
x509.load_der_x509_certificate(b"notacert", backend)
@ -1183,9 +1205,9 @@ class TestRSACertificate(object):
backend,
)
assert repr(cert) == (
"<Certificate(subject=<Name(OU=GT48742965,OU=See www.rapidssl.com"
"/resources/cps (c)14,OU=Domain Control Validated - RapidSSL(R),"
"CN=www.cryptography.io)>, ...)>"
"<Certificate(subject=<Name(2.5.4.11=GT48742965, 2.5.4.11=See www."
"rapidssl.com/resources/cps (c)14, 2.5.4.11=Domain Control Validat"
"ed - RapidSSL(R), 2.5.4.3=www.cryptography.io, )>, ...)>"
)
def test_parse_tls_feature_extension(self, backend):
@ -3204,6 +3226,7 @@ class TestCertificateBuilder(object):
subject_private_key.public_key()
)
# Cert
cert = (
x509.CertificateBuilder()
.subject_name(
@ -3224,6 +3247,19 @@ class TestCertificateBuilder(object):
assert ext.critical is False
assert ext.value == add_ext
# CSR
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "US")])
)
.add_extension(add_ext, False)
.sign(subject_private_key, hashes.SHA256())
)
ext = csr.extensions.get_extension_for_class(type(add_ext))
assert ext.critical is False
assert ext.value == add_ext
def test_build_ca_request_with_path_length_none(self, backend):
private_key = RSA_KEY_2048.private_key(backend)
@ -4178,6 +4214,20 @@ class TestDSACertificateRequest(object):
)
class TestGOSTCertificate(object):
def test_numeric_string_x509_name_entry(self):
cert = _load_cert(
os.path.join("x509", "e-trust.ru.der"),
x509.load_der_x509_certificate,
)
assert (
cert.subject.get_attributes_for_oid(
x509.ObjectIdentifier("1.2.643.3.131.1.1")
)[0].value
== "007710474375"
)
class TestECDSACertificate(object):
def test_load_ecdsa_cert(self, backend):
_skip_curve_unsupported(backend, ec.SECP384R1())
@ -4348,15 +4398,13 @@ class TestOtherCertificate(object):
cert.public_key()
def test_bad_time_in_validity(self, backend):
cert = _load_cert(
with pytest.raises(ValueError, match="Validity::not_after"):
_load_cert(
os.path.join("x509", "badasn1time.pem"),
x509.load_pem_x509_certificate,
backend,
)
with pytest.raises(ValueError, match="19020701025736Z"):
cert.not_valid_after
class TestNameAttribute(object):
EXPECTED_TYPES = [