Added typing for more of ciphers (#5738)

This commit is contained in:
Alex Gaynor 2021-02-01 22:43:36 -05:00 committed by GitHub
parent e6352d5ef2
commit da8d490ed2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 49 additions and 32 deletions

View file

@ -118,12 +118,12 @@ class _CipherContext(object):
self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
self._ctx = ctx
def update(self, data):
def update(self, data: bytes) -> bytes:
buf = bytearray(len(data) + self._block_size_bytes - 1)
n = self.update_into(data, buf)
return bytes(buf[:n])
def update_into(self, data, buf):
def update_into(self, data: bytes, buf) -> int:
total_data_len = len(data)
if len(buf) < (total_data_len + self._block_size_bytes - 1):
raise ValueError(
@ -151,7 +151,7 @@ class _CipherContext(object):
return total_out
def finalize(self):
def finalize(self) -> bytes:
if (
self._operation == self._DECRYPT
and isinstance(self._mode, modes.ModeWithAuthenticationTag)
@ -202,7 +202,7 @@ class _CipherContext(object):
self._backend.openssl_assert(res == 1)
return self._backend._ffi.buffer(buf)[: outlen[0]]
def finalize_with_tag(self, tag):
def finalize_with_tag(self, tag: bytes) -> bytes:
if len(tag) < self._mode._min_tag_length:
raise ValueError(
"Authentication tag must be {} bytes or longer.".format(
@ -216,7 +216,7 @@ class _CipherContext(object):
self._tag = tag
return self.finalize()
def authenticate_additional_data(self, data):
def authenticate_additional_data(self, data: bytes) -> None:
outlen = self._backend._ffi.new("int *")
res = self._backend._lib.EVP_CipherUpdate(
self._ctx,

View file

@ -3,6 +3,7 @@
# for complete details.
import abc
import typing
# This exists to break an import cycle. It is normally accessible from the
@ -11,13 +12,19 @@ import abc
class CipherAlgorithm(metaclass=abc.ABCMeta):
@abc.abstractproperty
def name(self):
def name(self) -> str:
"""
A string naming this mode (e.g. "AES", "Camellia").
"""
@abc.abstractproperty
def key_size(self):
def key_sizes(self) -> typing.FrozenSet[int]:
"""
Valid key sizes for this algorithm in bits
"""
@abc.abstractproperty
def key_size(self) -> int:
"""
The size of the key being used as an integer in bits (e.g. 128, 256).
"""
@ -25,7 +32,7 @@ class CipherAlgorithm(metaclass=abc.ABCMeta):
class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
@abc.abstractproperty
def block_size(self):
def block_size(self) -> int:
"""
The size of a block as an integer in bits (e.g. 64, 128).
"""

View file

@ -136,14 +136,14 @@ class AESCCM(object):
backend, self, nonce, data, associated_data, self._tag_length
)
def _validate_lengths(self, nonce, data_len):
def _validate_lengths(self, nonce: bytes, data_len: int):
# For information about computing this, see
# https://tools.ietf.org/html/rfc3610#section-2.1
l_val = 15 - len(nonce)
if 2 ** (8 * l_val) < data_len:
raise ValueError("Data too long for nonce")
def _check_params(self, nonce, data, associated_data):
def _check_params(self, nonce: bytes, data: bytes, associated_data: bytes):
utils._check_byteslike("nonce", nonce)
utils._check_bytes("data", data)
utils._check_bytes("associated_data", associated_data)

View file

@ -11,7 +11,7 @@ from cryptography.hazmat.primitives.ciphers import (
from cryptography.hazmat.primitives.ciphers.modes import ModeWithNonce
def _verify_key_size(algorithm, key):
def _verify_key_size(algorithm: CipherAlgorithm, key: bytes):
# Verify that the key is instance of bytes
utils._check_byteslike("key", key)

View file

@ -4,6 +4,7 @@
import abc
import typing
from cryptography import utils
from cryptography.exceptions import (
@ -21,7 +22,7 @@ from cryptography.hazmat.primitives.ciphers import modes
class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
@abc.abstractproperty
def block_size(self):
def block_size(self) -> int:
"""
The size of a block as an integer in bits (e.g. 64, 128).
"""
@ -29,21 +30,21 @@ class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
class CipherContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update(self, data):
def update(self, data: bytes) -> bytes:
"""
Processes the provided bytes through the cipher and returns the results
as bytes.
"""
@abc.abstractmethod
def update_into(self, data, buf):
def update_into(self, data: bytes, buf) -> int:
"""
Processes the provided bytes and writes the resulting data into the
provided buffer. Returns the number of bytes written.
"""
@abc.abstractmethod
def finalize(self):
def finalize(self) -> bytes:
"""
Returns the results of processing the final block as bytes.
"""
@ -51,7 +52,7 @@ class CipherContext(metaclass=abc.ABCMeta):
class AEADCipherContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def authenticate_additional_data(self, data):
def authenticate_additional_data(self, data: bytes) -> None:
"""
Authenticates the provided bytes.
"""
@ -59,7 +60,7 @@ class AEADCipherContext(metaclass=abc.ABCMeta):
class AEADDecryptionContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def finalize_with_tag(self, tag):
def finalize_with_tag(self, tag: bytes) -> bytes:
"""
Returns the results of processing the final block as bytes and allows
delayed passing of the authentication tag.
@ -68,7 +69,7 @@ class AEADDecryptionContext(metaclass=abc.ABCMeta):
class AEADEncryptionContext(metaclass=abc.ABCMeta):
@abc.abstractproperty
def tag(self):
def tag(self) -> bytes:
"""
Returns tag bytes. This is only available after encryption is
finalized.
@ -76,7 +77,12 @@ class AEADEncryptionContext(metaclass=abc.ABCMeta):
class Cipher(object):
def __init__(self, algorithm, mode, backend=None):
def __init__(
self,
algorithm: CipherAlgorithm,
mode: typing.Optional[modes.Mode],
backend=None,
):
backend = _get_backend(backend)
if not isinstance(backend, CipherBackend):
raise UnsupportedAlgorithm(
@ -126,17 +132,17 @@ class _CipherContext(object):
def __init__(self, ctx):
self._ctx = ctx
def update(self, data):
def update(self, data: bytes) -> bytes:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
return self._ctx.update(data)
def update_into(self, data, buf):
def update_into(self, data: bytes, buf) -> int:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
return self._ctx.update_into(data, buf)
def finalize(self):
def finalize(self) -> bytes:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
data = self._ctx.finalize()
@ -155,7 +161,7 @@ class _AEADCipherContext(object):
self._tag = None
self._updated = False
def _check_limit(self, data_size):
def _check_limit(self, data_size: int):
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
self._updated = True
@ -167,15 +173,15 @@ class _AEADCipherContext(object):
)
)
def update(self, data):
def update(self, data: bytes) -> bytes:
self._check_limit(len(data))
return self._ctx.update(data)
def update_into(self, data, buf):
def update_into(self, data: bytes, buf) -> int:
self._check_limit(len(data))
return self._ctx.update_into(data, buf)
def finalize(self):
def finalize(self) -> bytes:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
data = self._ctx.finalize()
@ -183,7 +189,7 @@ class _AEADCipherContext(object):
self._ctx = None
return data
def finalize_with_tag(self, tag):
def finalize_with_tag(self, tag: bytes) -> bytes:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
data = self._ctx.finalize_with_tag(tag)
@ -191,7 +197,7 @@ class _AEADCipherContext(object):
self._ctx = None
return data
def authenticate_additional_data(self, data):
def authenticate_additional_data(self, data: bytes) -> None:
if self._ctx is None:
raise AlreadyFinalized("Context was already finalized.")
if self._updated:
@ -211,9 +217,10 @@ class _AEADCipherContext(object):
@utils.register_interface(AEADEncryptionContext)
class _AEADEncryptionContext(_AEADCipherContext):
@property
def tag(self):
def tag(self) -> bytes:
if self._ctx is not None:
raise NotYetFinalized(
"You must finalize encryption before " "getting the tag."
)
assert self._tag is not None
return self._tag

View file

@ -12,7 +12,8 @@ from cryptography.hazmat.primitives.ciphers.modes import Mode
class DummyCipherAlgorithm(CipherAlgorithm):
name = "dummy-cipher"
block_size = 128
key_size = None
key_size = 256
key_sizes = frozenset([256])
class DummyMode(Mode):

View file

@ -45,7 +45,9 @@ class TestCipher(object):
def test_instantiate_with_non_algorithm(self, backend):
algorithm = object()
with pytest.raises(TypeError):
Cipher(algorithm, mode=None, backend=backend)
Cipher(
algorithm, mode=None, backend=backend # type: ignore[arg-type]
)
@pytest.mark.requires_backend_interface(interface=CipherBackend)

View file

@ -205,7 +205,7 @@ def test_invalid_backend():
pretend_backend = object()
with raises_unsupported_algorithm(_Reasons.BACKEND_MISSING_INTERFACE):
ciphers.Cipher(AES(b"AAAAAAAAAAAAAAAA"), modes.ECB, pretend_backend)
ciphers.Cipher(AES(b"AAAAAAAAAAAAAAAA"), modes.ECB(), pretend_backend)
@pytest.mark.supported(