Add support for encrypting S/MIME messages (#10889)

* Add support for encrypting S/MIME messages

* Move PKCS7 decrypt test function to Rust

* Use symmetric encryption function from PKCS12

* Remove debug file write from tests

* Remove unneeded backend parameter

* docs and changelog
This commit is contained in:
Facundo Tuesca 2024-07-18 17:52:09 +02:00 committed by GitHub
parent ccb3a3277c
commit 0faaffc2f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 632 additions and 9 deletions

View file

@ -66,6 +66,8 @@ Changelog
* :class:`~cryptography.x509.NameAttribute` now raises an exception when
attempting to create a common name whose length is shorter or longer than
:rfc:`5280` permits.
* Added basic support for PKCS7 encryption (including SMIME) via
:class:`~cryptography.hazmat.primitives.serialization.pkcs7.PKCS7EnvelopeBuilder`.
.. _v42-0-8:

View file

@ -1095,6 +1095,37 @@ contain certificates, CRLs, and much more. PKCS7 files commonly have a ``p7b``,
-----END CERTIFICATE-----
""".strip()
ca_cert_rsa = b"""
-----BEGIN CERTIFICATE-----
MIIExzCCAq+gAwIBAgIJAOcS06ClbtbJMA0GCSqGSIb3DQEBCwUAMBoxGDAWBgNV
BAMMD2NyeXB0b2dyYXBoeSBDQTAeFw0yMDA5MTQyMTQwNDJaFw00ODAxMzEyMTQw
NDJaMBoxGDAWBgNVBAMMD2NyeXB0b2dyYXBoeSBDQTCCAiIwDQYJKoZIhvcNAQEB
BQADggIPADCCAgoCggIBANBIheRc1HT4MzV5GvUbDk9CFU6DTomRApNqRmizriRq
m6OY4Ht3d71BXog6/IBkqAnZ4/XJQ40G4sVDb52k11oPvfJ/F5pc+6UqPBL+QGzY
GkJoubAqXFpI6ow0qayFNQLv0T9o4yh0QQOoGvgCmv91qmitLrZNXu4U9S76G+Di
GST+QyMkMxj+VsGRsRRBufV1urcnvFWjU6Q2+cr2cp0mMAG96NTyIskYiJ8vL03W
z4DX4klO4X47fPmDnU/OMn4SbvMZ896j1L0J04S+uVThTkxQWcFcqXhX5qM8kzcj
JUmybFlbf150j3WiucW48K/j7fJ0x9q3iUo4Gva0coScglJWcgo/BBCwFDw8NVba
7npxSRMiaS3qTv0dEFcRnvByc+7hyGxxlWdTE9tHisUI1eZVk9P9ziqNOZKscY8Z
X1+/C4M9X69Y7A8I74F5dO27IRycEgOrSo2z1NhfSwbqJr9a2TBtRsFinn8rjKBI
zNn0E5p9jO1WjxtkcjHfXXpLN8FFMvoYI9l/K+ZWDm9sboaF8jrgozSc004AFemA
H79mmCGVRKXn1vDAo4DLC6p3NiBFYQcYbW9V+beGD6srsF6xJtuY/UwtPROLWSzu
CCrZ/4BlmpNsR0ehIFFvzEKjX6rR2yp3YKlguDbMBMKMpfSGxAFwcZ7OiaxR20UH
AgMBAAGjEDAOMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIBADSveDS4
y2V/N6Li2n9ChGNdCMr/45M0cl+GpL55aA36AWYMRLv0wip7MWV3yOj4mkjGBlTE
awKHH1FtetsE6B4a7M2hHhOXyXE60uUdptEx6ckGrJ1iyqu5cQUX1P+VnXbmOxfF
bl+Ugzjbgirx239rA4ezkDRuOvKcCbDOFV/gw3ZHfJ/IQeRXIQRl/y51wcnFUvFM
JEESYiijeDbEcY8r1/phmVQL0CO7WLMmTxlFj4X/TR3MTZWJQIap9GiLs5+n3QiO
jsZ3GuFOomB8oTebYkXniwbNu5hgLP/seRQzGA7B9VDZryAhCtvGgjtQh0eW2Qxt
sgmDJGOPKnKT3O5U0v3+IPLEYpe8JSzgAhhh6H1rAJRUNwP2gRcO4eOUJSkdl218
fRNT0ILzosuWxwprER9ciMQF8q0JJKMhcfHRMH0S5mWVJAIkj68KY05oCy2zNyYa
oruopKSWXe0Bzr40znm40P7xIkui2BGQMlDPpbCaEfLsLqyctfbdmMlxac/QgIfY
TltrbqmI3MNy5uqGViGFpWPCB+kD8EsJF9nlKJXlu/i55qgUr/2/2CdeWlZDBP8A
1fdzmpYpWnwhE0KobzLS2z3AwDxiY/RSWUfypLZA0K/lpaEtYB6UHMDZ0/8WqgZV
gNucCuty0cA4Kf7eX1TlAKVwH8hTkVmJc2rX
-----END CERTIFICATE-----
""".strip()
.. class:: PKCS7SignatureBuilder
@ -1174,11 +1205,72 @@ contain certificates, CRLs, and much more. PKCS7 files commonly have a ``p7b``,
:returns bytes: The signed PKCS7 message.
.. class:: PKCS7EnvelopeBuilder
The PKCS7 envelope builder can create encrypted S/MIME messages,
which are commonly used in email. S/MIME has multiple versions,
but this implements a subset of :rfc:`5751`, also known as S/MIME
Version 3.2.
.. versionadded:: 43.0.0
.. doctest::
>>> from cryptography import x509
>>> from cryptography.hazmat.primitives import serialization
>>> from cryptography.hazmat.primitives.serialization import pkcs7
>>> cert = x509.load_pem_x509_certificate(ca_cert_rsa)
>>> options = [pkcs7.PKCS7Options.Text]
>>> pkcs7.PKCS7EnvelopeBuilder().set_data(
... b"data to encrypt"
... ).add_recipient(
... cert
... ).encrypt(
... serialization.Encoding.SMIME, options
... )
b'...'
.. method:: set_data(data)
:param data: The data to be encrypted.
:type data: :term:`bytes-like`
.. method:: add_recipient(certificate)
Add a recipient for the message. Recipients will be able to use their private keys
to decrypt the message. This method may be called multiple times to add as many recipients
as desired.
:param certificate: A :class:`~cryptography.x509.Certificate` for an intended
recipient of the encrypted message. Only certificates with public RSA keys
are currently supported.
.. method:: encrypt(encoding, options)
The message is encrypted using AES-128-CBC. The encryption key used is included in
the envelope, encrypted using the recipient's public RSA key. If multiple recipients
are specified, the key is encrypted once with each recipient's public key, and all
encrypted keys are included in the envelope (one per recipient).
:param encoding: :attr:`~cryptography.hazmat.primitives.serialization.Encoding.PEM`,
:attr:`~cryptography.hazmat.primitives.serialization.Encoding.DER`,
or :attr:`~cryptography.hazmat.primitives.serialization.Encoding.SMIME`.
:param options: A list of
:class:`~cryptography.hazmat.primitives.serialization.pkcs7.PKCS7Options`. For
this operation only
:attr:`~cryptography.hazmat.primitives.serialization.pkcs7.PKCS7Options.Text` and
:attr:`~cryptography.hazmat.primitives.serialization.pkcs7.PKCS7Options.Binary`
are supported.
:returns bytes: The enveloped PKCS7 message.
.. class:: PKCS7Options
.. versionadded:: 3.2
An enumeration of options for PKCS7 signature creation.
An enumeration of options for PKCS7 signature and envelope creation.
.. attribute:: Text

View file

@ -12,6 +12,11 @@ def serialize_certificates(
certs: list[x509.Certificate],
encoding: serialization.Encoding,
) -> bytes: ...
def encrypt_and_serialize(
builder: pkcs7.PKCS7EnvelopeBuilder,
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def sign_and_serialize(
builder: pkcs7.PKCS7SignatureBuilder,
encoding: serialization.Encoding,

View file

@ -13,6 +13,13 @@ class TestCertificate:
subject_value_tags: list[int]
def test_parse_certificate(data: bytes) -> TestCertificate: ...
def pkcs7_decrypt(
encoding: serialization.Encoding,
msg: bytes,
pkey: serialization.pkcs7.PKCS7PrivateKeyTypes,
cert_recipient: x509.Certificate,
options: list[pkcs7.PKCS7Options],
) -> bytes: ...
def pkcs7_verify(
encoding: serialization.Encoding,
sig: bytes,

View file

@ -12,6 +12,7 @@ import io
import typing
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import pkcs7 as rust_pkcs7
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
@ -177,7 +178,92 @@ class PKCS7SignatureBuilder:
return rust_pkcs7.sign_and_serialize(self, encoding, options)
def _smime_encode(
class PKCS7EnvelopeBuilder:
def __init__(
self,
*,
_data: bytes | None = None,
_recipients: list[x509.Certificate] | None = None,
):
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)
if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()):
raise UnsupportedAlgorithm(
"RSA with PKCS1 v1.5 padding is not supported by this version"
" of OpenSSL.",
_Reasons.UNSUPPORTED_PADDING,
)
self._data = _data
self._recipients = _recipients if _recipients is not None else []
def set_data(self, data: bytes) -> PKCS7EnvelopeBuilder:
_check_byteslike("data", data)
if self._data is not None:
raise ValueError("data may only be set once")
return PKCS7EnvelopeBuilder(_data=data, _recipients=self._recipients)
def add_recipient(
self,
certificate: x509.Certificate,
) -> PKCS7EnvelopeBuilder:
if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")
if not isinstance(certificate.public_key(), rsa.RSAPublicKey):
raise TypeError("Only RSA keys are supported at this time.")
return PKCS7EnvelopeBuilder(
_data=self._data,
_recipients=[
*self._recipients,
certificate,
],
)
def encrypt(
self,
encoding: serialization.Encoding,
options: typing.Iterable[PKCS7Options],
) -> bytes:
if len(self._recipients) == 0:
raise ValueError("Must have at least one recipient")
if self._data is None:
raise ValueError("You must add data to encrypt")
options = list(options)
if not all(isinstance(x, PKCS7Options) for x in options):
raise ValueError("options must be from the PKCS7Options enum")
if encoding not in (
serialization.Encoding.PEM,
serialization.Encoding.DER,
serialization.Encoding.SMIME,
):
raise ValueError(
"Must be PEM, DER, or SMIME from the Encoding enum"
)
# Only allow options that make sense for encryption
if any(
opt not in [PKCS7Options.Text, PKCS7Options.Binary]
for opt in options
):
raise ValueError(
"Only the following options are supported for encryption: "
"Text, Binary"
)
elif PKCS7Options.Text in options and PKCS7Options.Binary in options:
# OpenSSL accepts both options at the same time, but ignores Text.
# We fail defensively to avoid unexpected outputs.
raise ValueError(
"Cannot use Binary and Text options at the same time"
)
return rust_pkcs7.encrypt_and_serialize(self, encoding, options)
def _smime_signed_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
# This function works pretty hard to replicate what OpenSSL does
@ -225,6 +311,23 @@ def _smime_encode(
return fp.getvalue()
def _smime_enveloped_encode(data: bytes) -> bytes:
m = email.message.Message()
m.add_header("MIME-Version", "1.0")
m.add_header("Content-Disposition", "attachment", filename="smime.p7m")
m.add_header(
"Content-Type",
"application/pkcs7-mime",
smime_type="enveloped-data",
name="smime.p7m",
)
m.add_header("Content-Transfer-Encoding", "base64")
m.set_payload(email.base64mime.body_encode(data, maxlinelen=65))
return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))
class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.

View file

@ -136,6 +136,16 @@ pub enum AlgorithmParameters<'a> {
#[defined_by(oid::HMAC_WITH_SHA256_OID)]
HmacWithSha256(asn1::Null),
// Used only in PKCS#7 AlgorithmIdentifiers
// https://datatracker.ietf.org/doc/html/rfc3565#section-4.1
//
// From RFC 3565 section 4.1:
// The AlgorithmIdentifier parameters field MUST be present, and the
// parameters field MUST contain a AES-IV:
//
// AES-IV ::= OCTET STRING (SIZE(16))
#[defined_by(oid::AES_128_CBC_OID)]
Aes128Cbc([u8; 16]),
#[defined_by(oid::AES_256_CBC_OID)]
Aes256Cbc([u8; 16]),

View file

@ -6,6 +6,7 @@ use crate::{certificate, common, csr, name};
pub const PKCS7_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 1);
pub const PKCS7_SIGNED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 2);
pub const PKCS7_ENVELOPED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 3);
pub const PKCS7_ENCRYPTED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 6);
#[derive(asn1::Asn1Write)]
@ -18,6 +19,8 @@ pub struct ContentInfo<'a> {
#[derive(asn1::Asn1DefinedByWrite)]
pub enum Content<'a> {
#[defined_by(PKCS7_ENVELOPED_DATA_OID)]
EnvelopedData(asn1::Explicit<Box<EnvelopedData<'a>>, 0>),
#[defined_by(PKCS7_SIGNED_DATA_OID)]
SignedData(asn1::Explicit<Box<SignedData<'a>>, 0>),
#[defined_by(PKCS7_DATA_OID)]
@ -56,6 +59,21 @@ pub struct SignerInfo<'a> {
pub unauthenticated_attributes: Option<csr::Attributes<'a>>,
}
#[derive(asn1::Asn1Write)]
pub struct EnvelopedData<'a> {
pub version: u8,
pub recipient_infos: asn1::SetOfWriter<'a, RecipientInfo<'a>>,
pub encrypted_content_info: EncryptedContentInfo<'a>,
}
#[derive(asn1::Asn1Write)]
pub struct RecipientInfo<'a> {
pub version: u8,
pub issuer_and_serial_number: IssuerAndSerialNumber<'a>,
pub key_encryption_algorithm: common::AlgorithmIdentifier<'a>,
pub encrypted_key: &'a [u8],
}
#[derive(asn1::Asn1Write)]
pub struct IssuerAndSerialNumber<'a> {
pub issuer: name::Name<'a>,

View file

@ -79,7 +79,7 @@ impl PKCS12Certificate {
}
}
fn symmetric_encrypt(
pub(crate) fn symmetric_encrypt(
py: pyo3::Python<'_>,
algorithm: pyo3::Bound<'_, pyo3::PyAny>,
mode: pyo3::Bound<'_, pyo3::PyAny>,

View file

@ -6,7 +6,9 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;
use cryptography_x509::common::{AlgorithmIdentifier, AlgorithmParameters};
use cryptography_x509::csr::Attribute;
use cryptography_x509::pkcs7::PKCS7_DATA_OID;
use cryptography_x509::{common, oid, pkcs7};
use once_cell::sync::Lazy;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
@ -18,6 +20,7 @@ use pyo3::IntoPy;
use crate::asn1::encode_der_data;
use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::pkcs12::symmetric_encrypt;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use crate::x509::certificate::load_der_x509_certificate;
use crate::{exceptions, types, x509};
@ -75,6 +78,90 @@ fn serialize_certificates<'p>(
encode_der_data(py, "PKCS7".to_string(), content_info_bytes, encoding)
}
#[pyo3::pyfunction]
fn encrypt_and_serialize<'p>(
py: pyo3::Python<'p>,
builder: &pyo3::Bound<'p, pyo3::PyAny>,
encoding: &pyo3::Bound<'p, pyo3::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let raw_data: CffiBuf<'p> = builder.getattr(pyo3::intern!(py, "_data"))?.extract()?;
let text_mode = options.contains(types::PKCS7_TEXT.get(py)?)?;
let data_with_header = if options.contains(types::PKCS7_BINARY.get(py)?)? {
Cow::Borrowed(raw_data.as_bytes())
} else {
smime_canonicalize(raw_data.as_bytes(), text_mode).0
};
// The message is encrypted with AES-128-CBC, which the S/MIME v3.2 RFC
// specifies as MUST support (https://datatracker.ietf.org/doc/html/rfc5751#section-2.7)
let key = types::OS_URANDOM.get(py)?.call1((16,))?;
let aes128_algorithm = types::AES128.get(py)?.call1((&key,))?;
let iv = types::OS_URANDOM.get(py)?.call1((16,))?;
let cbc_mode = types::CBC.get(py)?.call1((&iv,))?;
let encrypted_content = symmetric_encrypt(py, aes128_algorithm, cbc_mode, &data_with_header)?;
let py_recipients: Vec<pyo3::Bound<'p, x509::certificate::Certificate>> = builder
.getattr(pyo3::intern!(py, "_recipients"))?
.extract()?;
let mut recipient_infos = vec![];
let padding = types::PKCS1V15.get(py)?.call0()?;
let ka_bytes = cryptography_keepalive::KeepAlive::new();
for cert in py_recipients.iter() {
// Currently, keys are encrypted with RSA (PKCS #1 v1.5), which the S/MIME v3.2 RFC
// specifies as MUST support (https://datatracker.ietf.org/doc/html/rfc5751#section-2.3)
let encrypted_key = cert
.call_method0(pyo3::intern!(py, "public_key"))?
.call_method1(pyo3::intern!(py, "encrypt"), (&key, &padding))?
.extract::<pyo3::pybacked::PyBackedBytes>()?;
recipient_infos.push(pkcs7::RecipientInfo {
version: 0,
issuer_and_serial_number: pkcs7::IssuerAndSerialNumber {
issuer: cert.get().raw.borrow_dependent().tbs_cert.issuer.clone(),
serial_number: cert.get().raw.borrow_dependent().tbs_cert.serial,
},
key_encryption_algorithm: AlgorithmIdentifier {
oid: asn1::DefinedByMarker::marker(),
params: AlgorithmParameters::Rsa(Some(())),
},
encrypted_key: ka_bytes.add(encrypted_key),
});
}
let enveloped_data = pkcs7::EnvelopedData {
version: 0,
recipient_infos: asn1::SetOfWriter::new(&recipient_infos),
encrypted_content_info: pkcs7::EncryptedContentInfo {
content_type: PKCS7_DATA_OID,
content_encryption_algorithm: AlgorithmIdentifier {
oid: asn1::DefinedByMarker::marker(),
params: AlgorithmParameters::Aes128Cbc(iv.extract()?),
},
encrypted_content: Some(&encrypted_content),
},
};
let content_info = pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::EnvelopedData(asn1::Explicit::new(Box::new(enveloped_data))),
};
let ci_bytes = asn1::write_single(&content_info)?;
if encoding.is(&types::ENCODING_SMIME.get(py)?) {
Ok(types::SMIME_ENVELOPED_ENCODE
.get(py)?
.call1((&*ci_bytes,))?
.extract()?)
} else {
// Handles the DER, PEM, and error cases
encode_der_data(py, "PKCS7".to_string(), ci_bytes, encoding)
}
}
#[pyo3::pyfunction]
fn sign_and_serialize<'p>(
py: pyo3::Python<'p>,
@ -256,7 +343,7 @@ fn sign_and_serialize<'p>(
.map(|d| OIDS_TO_MIC_NAME[&d.oid()])
.collect::<Vec<_>>()
.join(",");
Ok(types::SMIME_ENCODE
Ok(types::SMIME_SIGNED_ENCODE
.get(py)?
.call1((&*data_without_header, &*ci_bytes, mic_algs, text_mode))?
.extract()?)
@ -412,8 +499,8 @@ fn load_der_pkcs7_certificates<'p>(
pub(crate) mod pkcs7_mod {
#[pymodule_export]
use super::{
load_der_pkcs7_certificates, load_pem_pkcs7_certificates, serialize_certificates,
sign_and_serialize,
encrypt_and_serialize, load_der_pkcs7_certificates, load_pem_pkcs7_certificates,
serialize_certificates, sign_and_serialize,
};
}

View file

@ -103,8 +103,55 @@ fn pkcs7_verify(
Ok(())
}
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
#[pyo3::pyfunction]
#[pyo3(signature = (encoding, msg, pkey, cert_recipient, options))]
fn pkcs7_decrypt<'p>(
py: pyo3::Python<'p>,
encoding: pyo3::Bound<'p, pyo3::PyAny>,
msg: CffiBuf<'p>,
pkey: pyo3::Bound<'p, pyo3::PyAny>,
cert_recipient: pyo3::Bound<'p, PyCertificate>,
options: pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let p7 = if encoding.is(&types::ENCODING_DER.get(py)?) {
openssl::pkcs7::Pkcs7::from_der(msg.as_bytes())?
} else if encoding.is(&types::ENCODING_PEM.get(py)?) {
openssl::pkcs7::Pkcs7::from_pem(msg.as_bytes())?
} else {
openssl::pkcs7::Pkcs7::from_smime(msg.as_bytes())?.0
};
let mut flags = openssl::pkcs7::Pkcs7Flags::empty();
if options.contains(types::PKCS7_TEXT.get(py)?)? {
flags |= openssl::pkcs7::Pkcs7Flags::TEXT;
}
let cert_der = asn1::write_single(cert_recipient.get().raw.borrow_dependent())?;
let cert_ossl = openssl::x509::X509::from_der(&cert_der)?;
let der = types::ENCODING_DER.get(py)?;
let pkcs8 = types::PRIVATE_FORMAT_PKCS8.get(py)?;
let no_encryption = types::NO_ENCRYPTION.get(py)?.call0()?;
let pkey_bytes = pkey
.call_method1(
pyo3::intern!(py, "private_bytes"),
(der, pkcs8, no_encryption),
)?
.extract::<pyo3::pybacked::PyBackedBytes>()?;
let pkey_ossl = openssl::pkey::PKey::private_key_from_der(&pkey_bytes)?;
let result = p7.decrypt(&pkey_ossl, &cert_ossl, flags)?;
Ok(pyo3::types::PyBytes::new_bound(py, &result))
}
#[pyo3::pymodule]
pub(crate) mod test_support {
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
#[pymodule_export]
use super::pkcs7_decrypt;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
#[pymodule_export]
use super::pkcs7_verify;

View file

@ -339,9 +339,14 @@ pub static PKCS7_DETACHED_SIGNATURE: LazyPyImport = LazyPyImport::new(
&["PKCS7Options", "DetachedSignature"],
);
pub static SMIME_ENCODE: LazyPyImport = LazyPyImport::new(
pub static SMIME_ENVELOPED_ENCODE: LazyPyImport = LazyPyImport::new(
"cryptography.hazmat.primitives.serialization.pkcs7",
&["_smime_encode"],
&["_smime_enveloped_encode"],
);
pub static SMIME_SIGNED_ENCODE: LazyPyImport = LazyPyImport::new(
"cryptography.hazmat.primitives.serialization.pkcs7",
&["_smime_signed_encode"],
);
pub static PKCS12KEYANDCERTIFICATES: LazyPyImport = LazyPyImport::new(

View file

@ -117,7 +117,7 @@ def _load_cert_key():
only_if=lambda backend: backend.pkcs7_supported(),
skip_message="Requires OpenSSL with PKCS7 support",
)
class TestPKCS7Builder:
class TestPKCS7SignatureBuilder:
def test_invalid_data(self, backend):
builder = pkcs7.PKCS7SignatureBuilder()
with pytest.raises(TypeError):
@ -834,6 +834,242 @@ class TestPKCS7Builder:
)
def _load_rsa_cert_key():
key = load_vectors_from_file(
os.path.join("x509", "custom", "ca", "rsa_key.pem"),
lambda pemfile: serialization.load_pem_private_key(
pemfile.read(), None, unsafe_skip_rsa_key_validation=True
),
mode="rb",
)
cert = load_vectors_from_file(
os.path.join("x509", "custom", "ca", "rsa_ca.pem"),
loader=lambda pemfile: x509.load_pem_x509_certificate(pemfile.read()),
mode="rb",
)
return cert, key
@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported()
and backend.rsa_encryption_supported(padding.PKCS1v15()),
skip_message="Requires OpenSSL with PKCS7 support and PKCS1 v1.5 padding "
"support",
)
class TestPKCS7EnvelopeBuilder:
def test_invalid_data(self, backend):
builder = pkcs7.PKCS7EnvelopeBuilder()
with pytest.raises(TypeError):
builder.set_data("not bytes") # type: ignore[arg-type]
def test_set_data_twice(self, backend):
builder = pkcs7.PKCS7EnvelopeBuilder().set_data(b"test")
with pytest.raises(ValueError):
builder.set_data(b"test")
def test_encrypt_no_recipient(self, backend):
builder = pkcs7.PKCS7EnvelopeBuilder().set_data(b"test")
with pytest.raises(ValueError):
builder.encrypt(serialization.Encoding.SMIME, [])
def test_encrypt_no_data(self, backend):
cert, _ = _load_rsa_cert_key()
builder = pkcs7.PKCS7EnvelopeBuilder().add_recipient(cert)
with pytest.raises(ValueError):
builder.encrypt(serialization.Encoding.SMIME, [])
def test_unsupported_encryption(self, backend):
cert_non_rsa, _ = _load_cert_key()
with pytest.raises(TypeError):
pkcs7.PKCS7EnvelopeBuilder().add_recipient(cert_non_rsa)
def test_not_a_cert(self, backend):
with pytest.raises(TypeError):
pkcs7.PKCS7EnvelopeBuilder().add_recipient(
b"notacert", # type: ignore[arg-type]
)
def test_encrypt_invalid_options(self, backend):
cert, _ = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(b"test").add_recipient(cert)
)
with pytest.raises(ValueError):
builder.encrypt(
serialization.Encoding.SMIME,
[b"invalid"], # type: ignore[list-item]
)
def test_encrypt_invalid_encoding(self, backend):
cert, _ = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(b"test").add_recipient(cert)
)
with pytest.raises(ValueError):
builder.encrypt(serialization.Encoding.Raw, [])
@pytest.mark.parametrize(
"invalid_options",
[
[pkcs7.PKCS7Options.NoAttributes],
[pkcs7.PKCS7Options.NoCapabilities],
[pkcs7.PKCS7Options.NoCerts],
[pkcs7.PKCS7Options.DetachedSignature],
[pkcs7.PKCS7Options.Binary, pkcs7.PKCS7Options.Text],
],
)
def test_encrypt_invalid_encryption_options(
self, backend, invalid_options
):
cert, _ = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(b"test").add_recipient(cert)
)
with pytest.raises(ValueError):
builder.encrypt(serialization.Encoding.DER, invalid_options)
@pytest.mark.parametrize(
"options",
[
[pkcs7.PKCS7Options.Text],
[pkcs7.PKCS7Options.Binary],
],
)
def test_smime_encrypt_smime_encoding(self, backend, options):
data = b"hello world\n"
cert, private_key = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(data).add_recipient(cert)
)
enveloped = builder.encrypt(serialization.Encoding.SMIME, options)
assert b"MIME-Version: 1.0\n" in enveloped
assert b"Content-Transfer-Encoding: base64\n" in enveloped
message = email.parser.BytesParser().parsebytes(enveloped)
assert message.get_content_disposition() == "attachment"
assert message.get_filename() == "smime.p7m"
assert message.get_content_type() == "application/pkcs7-mime"
assert message.get_param("smime-type") == "enveloped-data"
assert message.get_param("name") == "smime.p7m"
payload = message.get_payload(decode=True)
assert isinstance(payload, bytes)
# We want to know if we've serialized something that has the parameters
# we expect, so we match on specific byte strings of OIDs & DER values.
# OID 2.16.840.1.101.3.4.1.2 (aes128-CBC)
assert b"\x06\x09\x60\x86\x48\x01\x65\x03\x04\x01\x02" in payload
# OID 1.2.840.113549.1.1.1 (rsaEncryption (PKCS #1))
assert b"\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01" in payload
# cryptography CA (the recipient's Common Name)
assert (
b"\x0c\x0f\x63\x72\x79\x70\x74\x6f\x67\x72\x61\x70\x68\x79"
b"\x20\x43\x41"
) in payload
decrypted_bytes = test_support.pkcs7_decrypt(
serialization.Encoding.SMIME,
enveloped,
private_key,
cert,
options,
)
# New lines are canonicalized to '\r\n' when not using Binary
expected_data = (
data
if pkcs7.PKCS7Options.Binary in options
else data.replace(b"\n", b"\r\n")
)
assert decrypted_bytes == expected_data
@pytest.mark.parametrize(
"options",
[
[pkcs7.PKCS7Options.Text],
[pkcs7.PKCS7Options.Binary],
],
)
def test_smime_encrypt_der_encoding(self, backend, options):
data = b"hello world\n"
cert, private_key = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(data).add_recipient(cert)
)
enveloped = builder.encrypt(serialization.Encoding.DER, options)
# We want to know if we've serialized something that has the parameters
# we expect, so we match on specific byte strings of OIDs & DER values.
# OID 2.16.840.1.101.3.4.1.2 (aes128-CBC)
assert b"\x06\x09\x60\x86\x48\x01\x65\x03\x04\x01\x02" in enveloped
# OID 1.2.840.113549.1.1.1 (rsaEncryption (PKCS #1))
assert b"\x06\x09\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01" in enveloped
# cryptography CA (the recipient's Common Name)
assert (
b"\x0c\x0f\x63\x72\x79\x70\x74\x6f\x67\x72\x61\x70\x68\x79"
b"\x20\x43\x41"
) in enveloped
decrypted_bytes = test_support.pkcs7_decrypt(
serialization.Encoding.DER,
enveloped,
private_key,
cert,
options,
)
# New lines are canonicalized to '\r\n' when not using Binary
expected_data = (
data
if pkcs7.PKCS7Options.Binary in options
else data.replace(b"\n", b"\r\n")
)
assert decrypted_bytes == expected_data
@pytest.mark.parametrize(
"options",
[
[pkcs7.PKCS7Options.Text],
[pkcs7.PKCS7Options.Binary],
],
)
def test_smime_encrypt_pem_encoding(self, backend, options):
data = b"hello world\n"
cert, private_key = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder().set_data(data).add_recipient(cert)
)
enveloped = builder.encrypt(serialization.Encoding.PEM, options)
decrypted_bytes = test_support.pkcs7_decrypt(
serialization.Encoding.PEM,
enveloped,
private_key,
cert,
options,
)
# New lines are canonicalized to '\r\n' when not using Binary
expected_data = (
data
if pkcs7.PKCS7Options.Binary in options
else data.replace(b"\n", b"\r\n")
)
assert decrypted_bytes == expected_data
def test_smime_encrypt_multiple_recipients(self, backend):
data = b"hello world\n"
cert, private_key = _load_rsa_cert_key()
builder = (
pkcs7.PKCS7EnvelopeBuilder()
.set_data(data)
.add_recipient(cert)
.add_recipient(cert)
)
enveloped = builder.encrypt(serialization.Encoding.DER, [])
# cryptography CA (the recipient's Common Name)
common_name_bytes = (
b"\x0c\x0f\x63\x72\x79\x70\x74\x6f\x67\x72\x61"
b"\x70\x68\x79\x20\x43\x41"
)
assert enveloped.count(common_name_bytes) == 2
@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported(),
skip_message="Requires OpenSSL with PKCS7 support",
@ -921,3 +1157,14 @@ class TestPKCS7Unsupported:
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
pkcs7.load_pem_pkcs7_certificates(b"nonsense")
@pytest.mark.supported(
only_if=lambda backend: backend.pkcs7_supported()
and not backend.rsa_encryption_supported(padding.PKCS1v15()),
skip_message="Requires OpenSSL with no PKCS1 v1.5 padding support",
)
class TestPKCS7EnvelopeBuilderUnsupported:
def test_envelope_builder_unsupported(self, backend):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
pkcs7.PKCS7EnvelopeBuilder()