From 085d1e44c6fe0480a4f67c34caf747943f4a2f20 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Sun, 25 Oct 2020 06:11:48 -0700 Subject: [PATCH] allow additional certificates to be added to a pkcs7 (#5498) * allow additional certificates to be added to a pkcs7 * be more verbose about what these additional certs might be used for * missing test --- .../primitives/asymmetric/serialization.rst | 8 +++ .../hazmat/backends/openssl/backend.py | 11 ++++- .../hazmat/primitives/serialization/pkcs7.py | 11 ++++- tests/hazmat/primitives/test_pkcs7.py | 49 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/docs/hazmat/primitives/asymmetric/serialization.rst b/docs/hazmat/primitives/asymmetric/serialization.rst index cc6d2bae5..bb278d8e1 100644 --- a/docs/hazmat/primitives/asymmetric/serialization.rst +++ b/docs/hazmat/primitives/asymmetric/serialization.rst @@ -647,6 +647,14 @@ contain certificates, CRLs, and much more. PKCS7 files commonly have a ``p7b``, :class:`~cryptography.hazmat.primitives.hashes.SHA384`, or :class:`~cryptography.hazmat.primitives.hashes.SHA512`. + .. method:: add_certificate(certificate) + + Add an additional certificate (typically used to help build a + verification chain) to the PKCS7 structure. This method may + be called multiple times to add as many certificates as desired. + + :param certificate: The :class:`~cryptography.x509.Certificate` to add. + .. method:: sign(encoding, options, backend=None) :param encoding: :attr:`~cryptography.hazmat.primitives.serialization.Encoding.PEM`, diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 81e4289e8..76600dc08 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -2695,6 +2695,15 @@ class Backend(object): init_flags = self._lib.PKCS7_PARTIAL final_flags = 0 + if len(builder._additional_certs) == 0: + certs = self._ffi.NULL + else: + certs = self._lib.sk_X509_new_null() + certs = self._ffi.gc(certs, self._lib.sk_X509_free) + for cert in builder._additional_certs: + res = self._lib.sk_X509_push(certs, cert._x509) + self.openssl_assert(res >= 1) + if pkcs7.PKCS7Options.DetachedSignature in options: # Don't embed the data in the PKCS7 structure init_flags |= self._lib.PKCS7_DETACHED @@ -2705,7 +2714,7 @@ class Backend(object): p7 = self._lib.PKCS7_sign( self._ffi.NULL, self._ffi.NULL, - self._ffi.NULL, + certs, self._ffi.NULL, init_flags, ) diff --git a/src/cryptography/hazmat/primitives/serialization/pkcs7.py b/src/cryptography/hazmat/primitives/serialization/pkcs7.py index b33cb7094..275d77083 100644 --- a/src/cryptography/hazmat/primitives/serialization/pkcs7.py +++ b/src/cryptography/hazmat/primitives/serialization/pkcs7.py @@ -24,9 +24,10 @@ def load_der_pkcs7_certificates(data): class PKCS7SignatureBuilder(object): - def __init__(self, data=None, signers=[]): + def __init__(self, data=None, signers=[], additional_certs=[]): self._data = data self._signers = signers + self._additional_certs = additional_certs def set_data(self, data): _check_byteslike("data", data) @@ -63,6 +64,14 @@ class PKCS7SignatureBuilder(object): self._signers + [(certificate, private_key, hash_algorithm)], ) + def add_certificate(self, certificate): + if not isinstance(certificate, x509.Certificate): + raise TypeError("certificate must be a x509.Certificate") + + return PKCS7SignatureBuilder( + self._data, self._signers, self._additional_certs + [certificate] + ) + def sign(self, encoding, options, backend=None): if len(self._signers) == 0: raise ValueError("Must have at least one signer") diff --git a/tests/hazmat/primitives/test_pkcs7.py b/tests/hazmat/primitives/test_pkcs7.py index f60467c29..03a928352 100644 --- a/tests/hazmat/primitives/test_pkcs7.py +++ b/tests/hazmat/primitives/test_pkcs7.py @@ -607,3 +607,52 @@ class TestPKCS7Builder(object): options, backend, ) + + def test_add_additional_cert_not_a_cert(self, backend): + with pytest.raises(TypeError): + pkcs7.PKCS7SignatureBuilder().add_certificate(b"notacert") + + def test_add_additional_cert(self, backend): + data = b"hello world" + cert, key = _load_cert_key() + rsa_cert = load_vectors_from_file( + os.path.join("x509", "custom", "ca", "rsa_ca.pem"), + loader=lambda pemfile: x509.load_pem_x509_certificate( + pemfile.read() + ), + mode="rb", + ) + builder = ( + pkcs7.PKCS7SignatureBuilder() + .set_data(data) + .add_signer(cert, key, hashes.SHA384()) + .add_certificate(rsa_cert) + ) + options = [] + sig = builder.sign(serialization.Encoding.DER, options) + assert ( + sig.count(rsa_cert.public_bytes(serialization.Encoding.DER)) == 1 + ) + + def test_add_multiple_additional_certs(self, backend): + data = b"hello world" + cert, key = _load_cert_key() + rsa_cert = load_vectors_from_file( + os.path.join("x509", "custom", "ca", "rsa_ca.pem"), + loader=lambda pemfile: x509.load_pem_x509_certificate( + pemfile.read() + ), + mode="rb", + ) + builder = ( + pkcs7.PKCS7SignatureBuilder() + .set_data(data) + .add_signer(cert, key, hashes.SHA384()) + .add_certificate(rsa_cert) + .add_certificate(rsa_cert) + ) + options = [] + sig = builder.sign(serialization.Encoding.DER, options) + assert ( + sig.count(rsa_cert.public_bytes(serialization.Encoding.DER)) == 2 + )