mirror of
https://github.com/saymrwulf/cryptography.git
synced 2026-05-14 20:37:55 +00:00
add support for CipherContext.update_nonce (#10437)
* add support for CipherContext.reset_nonce This only supports ChaCha20 and ciphers in CTR mode. * expand tests to reset to different nonces
This commit is contained in:
parent
8a7f27be3d
commit
2b371f418b
7 changed files with 205 additions and 2 deletions
|
|
@ -60,6 +60,9 @@ Changelog
|
|||
``datetime`` objects.
|
||||
* Added
|
||||
:func:`~cryptography.hazmat.primitives.asymmetric.rsa.rsa_recover_private_exponent`
|
||||
* Added :meth:`~cryptography.hazmat.primitives.ciphers.CipherContext.reset_nonce`
|
||||
for altering the ``nonce`` of a cipher context without initializing a new
|
||||
instance. See the docs for additional restrictions.
|
||||
|
||||
.. _v42-0-8:
|
||||
|
||||
|
|
|
|||
|
|
@ -693,6 +693,27 @@ Interfaces
|
|||
:meth:`update` and :meth:`finalize` will raise an
|
||||
:class:`~cryptography.exceptions.AlreadyFinalized` exception.
|
||||
|
||||
.. method:: reset_nonce(nonce)
|
||||
|
||||
.. versionadded:: 43.0.0
|
||||
|
||||
This method allows changing the nonce for an already existing context.
|
||||
Normally the nonce is set when the context is created and internally
|
||||
incremented as data as passed. However, in some scenarios the same key
|
||||
is used repeatedly but the nonce changes non-sequentially (e.g. ``QUIC``),
|
||||
which requires updating the context with the new nonce.
|
||||
|
||||
This method only works for contexts using
|
||||
:class:`~cryptography.hazmat.primitives.ciphers.algorithms.ChaCha20` or
|
||||
:class:`~cryptography.hazmat.primitives.ciphers.modes.CTR` mode.
|
||||
|
||||
:param nonce: The nonce to update the context with.
|
||||
:type data: :term:`bytes-like`
|
||||
:raises cryptography.exceptions.UnsupportedAlgorithm: If the
|
||||
algorithm does not support updating the nonce.
|
||||
:raises ValueError: If the nonce is not the correct length for the
|
||||
algorithm.
|
||||
|
||||
.. class:: AEADCipherContext
|
||||
|
||||
When calling ``encryptor`` or ``decryptor`` on a ``Cipher`` object
|
||||
|
|
|
|||
|
|
@ -33,6 +33,14 @@ class CipherContext(metaclass=abc.ABCMeta):
|
|||
Returns the results of processing the final block as bytes.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset_nonce(self, nonce: bytes) -> None:
|
||||
"""
|
||||
Resets the nonce for the cipher context to the provided value.
|
||||
Raises an exception if it does not support reset or if the
|
||||
provided nonce does not have a valid length.
|
||||
"""
|
||||
|
||||
|
||||
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
|
||||
@abc.abstractmethod
|
||||
|
|
|
|||
|
|
@ -13,6 +13,8 @@ use pyo3::IntoPy;
|
|||
pub(crate) struct CipherContext {
|
||||
ctx: openssl::cipher_ctx::CipherCtx,
|
||||
py_mode: pyo3::PyObject,
|
||||
py_algorithm: pyo3::PyObject,
|
||||
side: openssl::symm::Mode,
|
||||
}
|
||||
|
||||
impl CipherContext {
|
||||
|
|
@ -113,9 +115,44 @@ impl CipherContext {
|
|||
Ok(CipherContext {
|
||||
ctx,
|
||||
py_mode: mode.into(),
|
||||
py_algorithm: algorithm.into(),
|
||||
side,
|
||||
})
|
||||
}
|
||||
|
||||
fn reset_nonce(&mut self, py: pyo3::Python<'_>, nonce: CffiBuf<'_>) -> CryptographyResult<()> {
|
||||
if !self
|
||||
.py_mode
|
||||
.bind(py)
|
||||
.is_instance(&types::MODE_WITH_NONCE.get(py)?)?
|
||||
&& !self
|
||||
.py_algorithm
|
||||
.bind(py)
|
||||
.is_instance(&types::CHACHA20.get(py)?)?
|
||||
{
|
||||
return Err(CryptographyError::from(
|
||||
exceptions::UnsupportedAlgorithm::new_err((
|
||||
"This algorithm or mode does not support resetting the nonce.",
|
||||
exceptions::Reasons::UNSUPPORTED_CIPHER,
|
||||
)),
|
||||
));
|
||||
}
|
||||
if nonce.as_bytes().len() != self.ctx.iv_length() {
|
||||
return Err(CryptographyError::from(
|
||||
pyo3::exceptions::PyValueError::new_err(format!(
|
||||
"Nonce must be {} bytes long",
|
||||
self.ctx.iv_length()
|
||||
)),
|
||||
));
|
||||
}
|
||||
let init_op = match self.side {
|
||||
openssl::symm::Mode::Encrypt => openssl::cipher_ctx::CipherCtxRef::encrypt_init,
|
||||
openssl::symm::Mode::Decrypt => openssl::cipher_ctx::CipherCtxRef::decrypt_init,
|
||||
};
|
||||
init_op(&mut self.ctx, None, None, Some(nonce.as_bytes()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update<'p>(
|
||||
&mut self,
|
||||
py: pyo3::Python<'p>,
|
||||
|
|
@ -236,6 +273,10 @@ impl PyCipherContext {
|
|||
get_mut_ctx(self.ctx.as_mut())?.update(py, buf.as_bytes())
|
||||
}
|
||||
|
||||
fn reset_nonce(&mut self, py: pyo3::Python<'_>, nonce: CffiBuf<'_>) -> CryptographyResult<()> {
|
||||
get_mut_ctx(self.ctx.as_mut())?.reset_nonce(py, nonce)
|
||||
}
|
||||
|
||||
fn update_into(
|
||||
&mut self,
|
||||
py: pyo3::Python<'_>,
|
||||
|
|
@ -340,6 +381,10 @@ impl PyAEADEncryptionContext {
|
|||
})?
|
||||
.clone_ref(py))
|
||||
}
|
||||
|
||||
fn reset_nonce(&mut self, py: pyo3::Python<'_>, nonce: CffiBuf<'_>) -> CryptographyResult<()> {
|
||||
get_mut_ctx(self.ctx.as_mut())?.reset_nonce(py, nonce)
|
||||
}
|
||||
}
|
||||
|
||||
#[pyo3::pymethods]
|
||||
|
|
@ -468,6 +513,10 @@ impl PyAEADDecryptionContext {
|
|||
self.ctx = None;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn reset_nonce(&mut self, py: pyo3::Python<'_>, nonce: CffiBuf<'_>) -> CryptographyResult<()> {
|
||||
get_mut_ctx(self.ctx.as_mut())?.reset_nonce(py, nonce)
|
||||
}
|
||||
}
|
||||
|
||||
#[pyo3::pyfunction]
|
||||
|
|
|
|||
|
|
@ -8,11 +8,12 @@ import os
|
|||
|
||||
import pytest
|
||||
|
||||
from cryptography.exceptions import AlreadyFinalized, _Reasons
|
||||
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, base, modes
|
||||
|
||||
from ...doubles import DummyMode
|
||||
from ...utils import load_nist_vectors
|
||||
from ...utils import load_nist_vectors, raises_unsupported_algorithm
|
||||
from .utils import _load_all_params, generate_encrypt_test
|
||||
|
||||
|
||||
|
|
@ -305,3 +306,61 @@ def test_alternate_aes_classes(mode, alg_cls, backend):
|
|||
dec = cipher.decryptor()
|
||||
pt = dec.update(ct) + dec.finalize()
|
||||
assert pt == data
|
||||
|
||||
|
||||
def test_reset_nonce(backend):
|
||||
data = b"helloworld" * 10
|
||||
nonce = b"\x00" * 16
|
||||
nonce_alt = b"\xee" * 16
|
||||
cipher = base.Cipher(
|
||||
algorithms.AES(b"\x00" * 16),
|
||||
modes.CTR(nonce),
|
||||
)
|
||||
cipher_alt = base.Cipher(
|
||||
algorithms.AES(b"\x00" * 16),
|
||||
modes.CTR(nonce_alt),
|
||||
)
|
||||
enc = cipher.encryptor()
|
||||
ct1 = enc.update(data)
|
||||
assert len(ct1) == len(data)
|
||||
for _ in range(2):
|
||||
enc.reset_nonce(nonce)
|
||||
assert enc.update(data) == ct1
|
||||
# Reset the nonce to a different value
|
||||
# and check it matches with a different context
|
||||
enc_alt = cipher_alt.encryptor()
|
||||
ct2 = enc_alt.update(data)
|
||||
enc.reset_nonce(nonce_alt)
|
||||
assert enc.update(data) == ct2
|
||||
enc_alt.finalize()
|
||||
enc.finalize()
|
||||
with pytest.raises(AlreadyFinalized):
|
||||
enc.reset_nonce(nonce)
|
||||
dec = cipher.decryptor()
|
||||
assert dec.update(ct1) == data
|
||||
for _ in range(2):
|
||||
dec.reset_nonce(nonce)
|
||||
assert dec.update(ct1) == data
|
||||
# Reset the nonce to a different value
|
||||
# and check it matches with a different context
|
||||
dec_alt = cipher_alt.decryptor()
|
||||
dec.reset_nonce(nonce_alt)
|
||||
assert dec.update(ct2) == dec_alt.update(ct2)
|
||||
dec_alt.finalize()
|
||||
dec.finalize()
|
||||
with pytest.raises(AlreadyFinalized):
|
||||
dec.reset_nonce(nonce)
|
||||
|
||||
|
||||
def test_reset_nonce_invalid_mode(backend):
|
||||
iv = b"\x00" * 16
|
||||
c = base.Cipher(
|
||||
algorithms.AES(b"\x00" * 16),
|
||||
modes.CBC(iv),
|
||||
)
|
||||
enc = c.encryptor()
|
||||
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
|
||||
enc.reset_nonce(iv)
|
||||
dec = c.decryptor()
|
||||
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
|
||||
dec.reset_nonce(iv)
|
||||
|
|
|
|||
|
|
@ -8,10 +8,11 @@ import os
|
|||
|
||||
import pytest
|
||||
|
||||
from cryptography.exceptions import _Reasons
|
||||
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, base, modes
|
||||
|
||||
from ...utils import load_nist_vectors
|
||||
from ...utils import load_nist_vectors, raises_unsupported_algorithm
|
||||
from .utils import generate_aead_test
|
||||
|
||||
|
||||
|
|
@ -230,3 +231,16 @@ class TestAESModeGCM:
|
|||
dec = cipher.decryptor()
|
||||
pt = dec.update(ct) + dec.finalize_with_tag(enc.tag)
|
||||
assert pt == data
|
||||
|
||||
def test_reset_nonce_invalid_mode(self, backend):
|
||||
nonce = b"\x00" * 12
|
||||
c = base.Cipher(
|
||||
algorithms.AES(b"\x00" * 16),
|
||||
modes.GCM(nonce),
|
||||
)
|
||||
enc = c.encryptor()
|
||||
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
|
||||
enc.reset_nonce(nonce)
|
||||
dec = c.decryptor()
|
||||
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
|
||||
dec.reset_nonce(nonce)
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import struct
|
|||
|
||||
import pytest
|
||||
|
||||
from cryptography.exceptions import AlreadyFinalized
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||
|
||||
from ...utils import load_nist_vectors
|
||||
|
|
@ -90,3 +91,51 @@ class TestChaCha20:
|
|||
ct_partial_3 = enc_partial.update(pt[len_partial * 2 :])
|
||||
|
||||
assert ct_full == ct_partial_1 + ct_partial_2 + ct_partial_3
|
||||
|
||||
def test_reset_nonce(self, backend):
|
||||
data = b"helloworld" * 10
|
||||
key = b"\x00" * 32
|
||||
nonce = b"\x00" * 16
|
||||
nonce_alt = b"\xee" * 16
|
||||
cipher = Cipher(algorithms.ChaCha20(key, nonce), None)
|
||||
cipher_alt = Cipher(algorithms.ChaCha20(key, nonce_alt), None)
|
||||
enc = cipher.encryptor()
|
||||
ct1 = enc.update(data)
|
||||
assert len(ct1) == len(data)
|
||||
for _ in range(2):
|
||||
enc.reset_nonce(nonce)
|
||||
assert enc.update(data) == ct1
|
||||
# Reset the nonce to a different value
|
||||
# and check it matches with a different context
|
||||
enc_alt = cipher_alt.encryptor()
|
||||
ct2 = enc_alt.update(data)
|
||||
enc.reset_nonce(nonce_alt)
|
||||
assert enc.update(data) == ct2
|
||||
enc_alt.finalize()
|
||||
enc.finalize()
|
||||
with pytest.raises(AlreadyFinalized):
|
||||
enc.reset_nonce(nonce)
|
||||
dec = cipher.decryptor()
|
||||
assert dec.update(ct1) == data
|
||||
for _ in range(2):
|
||||
dec.reset_nonce(nonce)
|
||||
assert dec.update(ct1) == data
|
||||
# Reset the nonce to a different value
|
||||
# and check it matches with a different context
|
||||
dec_alt = cipher_alt.decryptor()
|
||||
dec.reset_nonce(nonce_alt)
|
||||
assert dec.update(ct2) == dec_alt.update(ct2)
|
||||
dec_alt.finalize()
|
||||
dec.finalize()
|
||||
with pytest.raises(AlreadyFinalized):
|
||||
dec.reset_nonce(nonce)
|
||||
|
||||
def test_nonce_reset_invalid_length(self, backend):
|
||||
key = b"\x00" * 32
|
||||
nonce = b"\x00" * 16
|
||||
cipher = Cipher(algorithms.ChaCha20(key, nonce), None)
|
||||
enc = cipher.encryptor()
|
||||
with pytest.raises(ValueError):
|
||||
enc.reset_nonce(nonce[:-1])
|
||||
with pytest.raises(ValueError):
|
||||
enc.reset_nonce(nonce + b"\x00")
|
||||
|
|
|
|||
Loading…
Reference in a new issue