mirror of
https://github.com/saymrwulf/cryptography.git
synced 2026-05-14 20:37:55 +00:00
Merge pull request #707 from public/consume-errors
Get rid of handle_errors
This commit is contained in:
commit
09cd8e6cd8
2 changed files with 55 additions and 67 deletions
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
from cryptography import utils
|
||||
|
|
@ -34,6 +35,10 @@ from cryptography.hazmat.primitives.ciphers.modes import (
|
|||
)
|
||||
|
||||
|
||||
_OpenSSLError = collections.namedtuple("_OpenSSLError",
|
||||
["code", "lib", "func", "reason"])
|
||||
|
||||
|
||||
@utils.register_interface(CipherBackend)
|
||||
@utils.register_interface(HashBackend)
|
||||
@utils.register_interface(HMACBackend)
|
||||
|
|
@ -228,43 +233,25 @@ class Backend(object):
|
|||
self._lib.ERR_error_string_n(code, err_buf, 256)
|
||||
return self._ffi.string(err_buf, 256)[:]
|
||||
|
||||
def _handle_error(self, mode):
|
||||
code = self._lib.ERR_get_error()
|
||||
if not code and isinstance(mode, GCM):
|
||||
raise InvalidTag
|
||||
assert code != 0
|
||||
def _consume_errors(self):
|
||||
errors = []
|
||||
while True:
|
||||
code = self._lib.ERR_get_error()
|
||||
if code == 0:
|
||||
break
|
||||
|
||||
# consume any remaining errors on the stack
|
||||
ignored_code = None
|
||||
while ignored_code != 0:
|
||||
ignored_code = self._lib.ERR_get_error()
|
||||
lib = self._lib.ERR_GET_LIB(code)
|
||||
func = self._lib.ERR_GET_FUNC(code)
|
||||
reason = self._lib.ERR_GET_REASON(code)
|
||||
|
||||
# raise the first error we found
|
||||
return self._handle_error_code(code)
|
||||
errors.append(_OpenSSLError(code, lib, func, reason))
|
||||
return errors
|
||||
|
||||
def _handle_error_code(self, code):
|
||||
lib = self._lib.ERR_GET_LIB(code)
|
||||
func = self._lib.ERR_GET_FUNC(code)
|
||||
reason = self._lib.ERR_GET_REASON(code)
|
||||
|
||||
if lib == self._lib.ERR_LIB_EVP:
|
||||
if func == self._lib.EVP_F_EVP_ENCRYPTFINAL_EX:
|
||||
if reason == self._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH:
|
||||
raise ValueError(
|
||||
"The length of the provided data is not a multiple of "
|
||||
"the block length"
|
||||
)
|
||||
elif func == self._lib.EVP_F_EVP_DECRYPTFINAL_EX:
|
||||
if reason == self._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH:
|
||||
raise ValueError(
|
||||
"The length of the provided data is not a multiple of "
|
||||
"the block length"
|
||||
)
|
||||
|
||||
raise InternalError(
|
||||
def _unknown_error(self, error):
|
||||
return InternalError(
|
||||
"Unknown error code {0} from OpenSSL, "
|
||||
"you should probably file a bug. {1}".format(
|
||||
code, self._err_string(code)
|
||||
error.code, self._err_string(error.code)
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -464,7 +451,28 @@ class _CipherContext(object):
|
|||
outlen = self._backend._ffi.new("int *")
|
||||
res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
|
||||
if res == 0:
|
||||
self._backend._handle_error(self._mode)
|
||||
errors = self._backend._consume_errors()
|
||||
|
||||
if not errors and isinstance(self._mode, GCM):
|
||||
raise InvalidTag
|
||||
|
||||
assert errors
|
||||
|
||||
if errors[0][1:] == (
|
||||
self._backend._lib.ERR_LIB_EVP,
|
||||
self._backend._lib.EVP_F_EVP_ENCRYPTFINAL_EX,
|
||||
self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
|
||||
) or errors[0][1:] == (
|
||||
self._backend._lib.ERR_LIB_EVP,
|
||||
self._backend._lib.EVP_F_EVP_DECRYPTFINAL_EX,
|
||||
self._backend._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH
|
||||
):
|
||||
raise ValueError(
|
||||
"The length of the provided data is not a multiple of "
|
||||
"the block length."
|
||||
)
|
||||
else:
|
||||
raise self._backend._unknown_error(errors[0])
|
||||
|
||||
if (isinstance(self._mode, GCM) and
|
||||
self._operation == self._ENCRYPT):
|
||||
|
|
|
|||
|
|
@ -71,46 +71,17 @@ class TestOpenSSL(object):
|
|||
with pytest.raises(UnsupportedAlgorithm):
|
||||
cipher.encryptor()
|
||||
|
||||
def test_handle_unknown_error(self):
|
||||
with pytest.raises(InternalError):
|
||||
backend._handle_error_code(0)
|
||||
|
||||
backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0,
|
||||
b"test_openssl.py", -1)
|
||||
with pytest.raises(InternalError):
|
||||
backend._handle_error(None)
|
||||
|
||||
backend._lib.ERR_put_error(
|
||||
backend._lib.ERR_LIB_EVP,
|
||||
backend._lib.EVP_F_EVP_ENCRYPTFINAL_EX,
|
||||
0,
|
||||
b"test_openssl.py",
|
||||
-1
|
||||
)
|
||||
with pytest.raises(InternalError):
|
||||
backend._handle_error(None)
|
||||
|
||||
backend._lib.ERR_put_error(
|
||||
backend._lib.ERR_LIB_EVP,
|
||||
backend._lib.EVP_F_EVP_DECRYPTFINAL_EX,
|
||||
0,
|
||||
b"test_openssl.py",
|
||||
-1
|
||||
)
|
||||
with pytest.raises(InternalError):
|
||||
backend._handle_error(None)
|
||||
|
||||
def test_handle_multiple_errors(self):
|
||||
def test_consume_errors(self):
|
||||
for i in range(10):
|
||||
backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0,
|
||||
b"test_openssl.py", -1)
|
||||
|
||||
assert backend._lib.ERR_peek_error() != 0
|
||||
|
||||
with pytest.raises(InternalError):
|
||||
backend._handle_error(None)
|
||||
errors = backend._consume_errors()
|
||||
|
||||
assert backend._lib.ERR_peek_error() == 0
|
||||
assert len(errors) == 10
|
||||
|
||||
def test_openssl_error_string(self):
|
||||
backend._lib.ERR_put_error(
|
||||
|
|
@ -121,8 +92,8 @@ class TestOpenSSL(object):
|
|||
-1
|
||||
)
|
||||
|
||||
with pytest.raises(InternalError) as exc:
|
||||
backend._handle_error(None)
|
||||
errors = backend._consume_errors()
|
||||
exc = backend._unknown_error(errors[0])
|
||||
|
||||
assert (
|
||||
"digital envelope routines:"
|
||||
|
|
@ -147,6 +118,15 @@ class TestOpenSSL(object):
|
|||
b"data not multiple of block length"
|
||||
)
|
||||
|
||||
def test_unknown_error_in_cipher_finalize(self):
|
||||
cipher = Cipher(AES(b"\0" * 16), CBC(b"\0" * 16), backend=backend)
|
||||
enc = cipher.encryptor()
|
||||
enc.update(b"\0")
|
||||
backend._lib.ERR_put_error(0, 0, 1,
|
||||
b"test_openssl.py", -1)
|
||||
with pytest.raises(InternalError):
|
||||
enc.finalize()
|
||||
|
||||
def test_derive_pbkdf2_raises_unsupported_on_old_openssl(self):
|
||||
if backend.pbkdf2_hmac_supported(hashes.SHA256()):
|
||||
pytest.skip("Requires an older OpenSSL")
|
||||
|
|
|
|||
Loading…
Reference in a new issue