mirror of
https://github.com/saymrwulf/pytorch.git
synced 2026-05-15 21:00:47 +00:00
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/28815 Add the unittest import ghstack-source-id: 92789329 Test Plan: CI Differential Revision: D18191989 fbshipit-source-id: c54e0309e21156c33e4fec01bfba17a1c30463c9
323 lines
15 KiB
Python
323 lines
15 KiB
Python
import torch
|
|
import torch.cuda
|
|
import torch.jit
|
|
import numpy as np
|
|
from hypothesis import given
|
|
from hypothesis import strategies as st
|
|
import hypothesis_utils as hu
|
|
from hypothesis_utils import no_deadline
|
|
from common_utils import run_tests, TestCase
|
|
from torch.quantization import FakeQuantize
|
|
from torch.quantization import default_observer, default_per_channel_weight_observer
|
|
import io
|
|
import unittest
|
|
|
|
# Reference method for fake quantize
|
|
def _fake_quantize_per_tensor_affine_reference(X, scale, zero_point, quant_min, quant_max):
    """Reference per-tensor affine fake quantization, computed on CPU.

    ``X`` is moved to CPU, multiplied by ``1.0 / scale``, shifted by
    ``zero_point`` and rounded; the result is clamped to
    ``[quant_min, quant_max]`` and mapped back by subtracting ``zero_point``
    and multiplying by ``scale``.

    Returns:
        The fake-quantized tensor.
    """
    inv_scale = 1.0 / scale
    quantized = torch.round(X.cpu() * inv_scale + zero_point)
    clamped = torch.clamp(quantized, quant_min, quant_max)
    return (clamped - zero_point) * scale
|
|
|
|
# Reference method for the gradient of the fake quantize operator
|
|
def _fake_quantize_per_tensor_affine_grad_reference(dY, X, scale, zero_point, quant_min, quant_max):
    """Reference gradient of per-tensor affine fake quantization, on CPU.

    The incoming gradient ``dY`` is passed through wherever the rounded,
    unclamped quantized value of ``X`` lies inside
    ``[quant_min, quant_max]`` (inclusive) and is zero everywhere else.

    Returns:
        A CPU tensor shaped like ``dY`` holding the masked gradient.
    """
    upstream = dY.cpu()
    levels = torch.round(X.cpu() * (1.0 / scale) + zero_point)
    # Elements whose quantized value was not clamped let the gradient through.
    passthrough = (levels >= quant_min) & (levels <= quant_max)
    grad = torch.zeros_like(upstream)
    grad[passthrough] = upstream[passthrough]
    return grad
|
|
|
|
# Helper function used to simulate per-channel fake-quant against any axis
|
|
def _permute_to_axis_zero(X, axis):
    """Swap dimension ``axis`` of ``X`` with dimension 0.

    Returns:
        A tuple ``(permuted, order)`` where ``permuted`` is ``X`` with the
        two dimensions exchanged and ``order`` is the list of dimension
        indices that was passed to ``permute``. Because the permutation is a
        plain swap, applying ``order`` again restores the original layout.
    """
    order = list(range(X.dim()))
    # Assignment runs left to right: order[axis] is written before order[0].
    order[axis], order[0] = 0, axis
    return X.permute(tuple(order)), order
|
|
|
|
# Reference method for fake quantize
|
|
def _fake_quantize_per_channel_affine_reference(X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    """Reference per-channel affine fake quantization.

    Dimension ``axis`` of ``X`` is swapped to the front, each slice along it
    is fake-quantized with the scale and zero point at the same index, and
    the result is permuted back to the original dimension order.

    Returns:
        The fake-quantized tensor, laid out like the input ``X``.
    """
    permuted, axis_order = _permute_to_axis_zero(X, axis)
    result = torch.zeros_like(permuted)

    for ch in range(permuted.size(0)):
        scale = per_channel_scale[ch]
        zero_point = per_channel_zero_point[ch]
        quantized = torch.round(permuted[ch] * (1.0 / scale) + zero_point)
        clamped = torch.clamp(quantized, quant_min, quant_max)
        result[ch] = (clamped - zero_point) * scale

    # The permutation is a swap, so the same order undoes it.
    return result.permute(tuple(axis_order))
|
|
|
|
# Reference method for the gradient of the fake quantize operator
|
|
def _fake_quantize_per_channel_affine_grad_reference(dY, X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    """Reference gradient of per-channel affine fake quantization.

    Computes the rounded, unclamped quantized value of every element of
    ``X`` using the scale and zero point of its slice along ``axis``, then
    passes ``dY`` through where that value lies inside
    ``[quant_min, quant_max]`` (inclusive) and zeroes it elsewhere.

    Returns:
        A tensor shaped like ``dY`` holding the masked gradient.
    """
    permuted, axis_order = _permute_to_axis_zero(X, axis)
    levels = torch.zeros_like(permuted)
    for ch in range(permuted.size(0)):
        inv_scale = 1.0 / per_channel_scale[ch]
        levels[ch] = torch.round(permuted[ch] * inv_scale + per_channel_zero_point[ch])
    # Undo the axis swap so the mask lines up with dY.
    levels = levels.permute(tuple(axis_order))
    passthrough = (levels >= quant_min) & (levels <= quant_max)
    grad = torch.zeros_like(dY)
    grad[passthrough] = dY[passthrough]
    return grad
|
|
|
|
def to_tensor(X, device):
    """Build a float32 tensor from ``X`` and place it on ``device``.

    Args:
        X: data accepted by ``torch.tensor``.
        device: device specifier accepted by ``torch.device`` (the tests in
            this file pass ``'cpu'`` or ``'cuda'``).
    """
    target = torch.device(device)
    converted = torch.tensor(X)
    return converted.to(device=target, dtype=torch.float32)
|
|
|
|
# Seed handed to np.random.seed at the start of every hypothesis-driven test.
NP_RANDOM_SEED = 19
# Used as both rtol and atol in the np.testing.assert_allclose comparisons.
tolerance = 1e-6
|
|
|
|
class TestFakeQuantizePerTensor(TestCase):
    """Tests for per-tensor fake quantization.

    Exercises ``torch.fake_quantize_per_tensor_affine`` against the reference
    helpers in this file and against quantize/dequantize, and checks the
    ``FakeQuantize`` module's forward/backward results, state-dict round
    trip, and observer / fake-quant enable switches.
    """
    # NOTE: The hypothesis-driven tests in this class are decorated with
    # no_deadline to prevent spurious failures due to cuda runtime
    # initialization.

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.tensor(shapes=hu.array_shapes(1, 5,),
                       qparams=hu.qparams(dtypes=torch.quint8)))
    def test_forward_per_tensor(self, device, X):
        r"""Tests the forward path of the FakeQuantizePerTensorAffine op.
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        Y = _fake_quantize_per_tensor_affine_reference(X.cpu(), scale, zero_point, quant_min, quant_max)
        Y_prime = torch.fake_quantize_per_tensor_affine(
            X, scale, zero_point, quant_min, quant_max)
        np.testing.assert_allclose(Y, Y_prime.cpu(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.tensor(shapes=hu.array_shapes(1, 5,),
                       qparams=hu.qparams(dtypes=torch.quint8)))
    def test_backward_per_tensor(self, device, X):
        r"""Tests the backward method.
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        X.requires_grad_()
        # Only the gradient is checked here; the forward values are covered
        # by test_forward_per_tensor.
        Y_prime = torch.fake_quantize_per_tensor_affine(
            X, scale, zero_point, quant_min, quant_max)
        dout = torch.rand(X.shape, dtype=torch.float).to(device)
        dX = _fake_quantize_per_tensor_affine_grad_reference(
            dout, X, scale, zero_point, quant_min, quant_max)
        Y_prime.backward(dout)
        np.testing.assert_allclose(dX.cpu(), X.grad.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.tensor(shapes=hu.array_shapes(1, 5,),
                       qparams=hu.qparams(dtypes=torch.quint8)))
    def test_numerical_consistency_per_tensor(self, device, X):
        r"""Comparing numerical consistency between CPU quantize/dequantize op and the CPU fake quantize op
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        # quantize_per_tensor and dequantize are only implemented in CPU
        Y = torch.dequantize(torch.quantize_per_tensor(X.cpu(), scale, zero_point, torch_type))
        Y_prime = torch.fake_quantize_per_tensor_affine(
            X, scale, zero_point, quant_min, quant_max)
        np.testing.assert_allclose(Y, Y_prime.cpu(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.tensor(shapes=hu.array_shapes(1, 5,),
                       qparams=hu.qparams(dtypes=[torch.quint8])),
           )
    def test_fq_module(self, device, X):
        r"""Tests forward and backward of the default fake-quant module
        against the reference helpers, using the module's own scale and
        zero point.
        """
        np.random.seed(NP_RANDOM_SEED)
        # The generated scale / zero point are not used: the module's
        # qparams are read back from it after the forward pass.
        X, (_, _, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        X.requires_grad_()
        fq_module = torch.quantization.default_fake_quant().to(device)
        Y_prime = fq_module(X)
        self.assertIsNotNone(fq_module.scale)
        self.assertIsNotNone(fq_module.zero_point)
        Y = _fake_quantize_per_tensor_affine_reference(X, fq_module.scale, fq_module.zero_point, quant_min, quant_max)
        np.testing.assert_allclose(Y.cpu().detach().numpy(), Y_prime.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

        # Test backward
        dout = torch.rand(X.shape, dtype=torch.float, device=device)
        Y_prime.backward(dout)
        dX = _fake_quantize_per_tensor_affine_grad_reference(dout, X, fq_module.scale, fq_module.zero_point, quant_min, quant_max)
        np.testing.assert_allclose(dX.cpu().numpy(), X.grad.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

    def test_fq_serializable(self):
        r"""Tests that a FakeQuantize state dict survives a torch.save /
        torch.load round trip and can be loaded into a fresh module.
        """
        observer = default_observer
        quant_min = 0
        quant_max = 255
        fq_module = FakeQuantize(observer, quant_min, quant_max)
        X = torch.tensor([-5, -3.5, -2, 0, 3, 5, 7], dtype=torch.float32)
        # Run one forward pass before reading the state dict; the output
        # itself is not needed.
        fq_module(X)
        state_dict = fq_module.state_dict()
        self.assertEqual(state_dict['scale'], 0.094488)
        self.assertEqual(state_dict['zero_point'], 53)
        b = io.BytesIO()
        torch.save(state_dict, b)
        b.seek(0)
        loaded_dict = torch.load(b)
        loaded_fq_module = FakeQuantize(observer, quant_min, quant_max)
        loaded_fq_module.load_state_dict(loaded_dict)
        for key in state_dict:
            self.assertEqual(state_dict[key], loaded_fq_module.state_dict()[key])

        self.assertEqual(loaded_fq_module.calculate_qparams(), fq_module.calculate_qparams())

    def test_fake_quant_control(self):
        r"""Tests the enable/disable switches for fake quant and the observer.
        """
        torch.manual_seed(42)
        X = torch.rand(20, 10, dtype=torch.float32)
        fq_module = torch.quantization.default_fake_quant()
        # Output of fake quant is not identical to input
        Y = fq_module(X)
        self.assertNotEqual(Y, X)
        torch.quantization.disable_fake_quant(fq_module)
        X = torch.rand(20, 10, dtype=torch.float32)
        Y = fq_module(X)
        # Fake quant is disabled, output is identical to input
        self.assertEqual(Y, X)
        scale = fq_module.scale
        zero_point = fq_module.zero_point
        torch.quantization.disable_observer(fq_module)
        torch.quantization.enable_fake_quant(fq_module)
        X = 10.0 * torch.rand(20, 10, dtype=torch.float32) - 5.0
        Y = fq_module(X)
        self.assertNotEqual(Y, X)
        # Observer is disabled, scale and zero-point do not change
        self.assertEqual(fq_module.scale, scale)
        self.assertEqual(fq_module.zero_point, zero_point)
        torch.quantization.enable_observer(fq_module)
        Y = fq_module(X)
        self.assertNotEqual(Y, X)
        # Observer is enabled, scale and zero-point are different
        self.assertNotEqual(fq_module.scale, scale)
        self.assertNotEqual(fq_module.zero_point, zero_point)
|
|
|
|
|
|
|
|
class TestFakeQuantizePerChannel(TestCase):
    """Tests for per-channel fake quantization.

    Exercises ``torch.fake_quantize_per_channel_affine`` against the
    reference helpers in this file and against quantize/dequantize, and
    checks the ``FakeQuantize`` module configured with the per-channel
    weight observer.
    """
    # NOTE: The hypothesis-driven tests in this class are decorated with
    # no_deadline to prevent spurious failures due to cuda runtime
    # initialization.

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.per_channel_tensor(shapes=hu.array_shapes(1, 5,),
                                   qparams=hu.qparams(dtypes=torch.quint8)))
    def test_forward_per_channel(self, device, X):
        r"""Tests the forward path of the FakeQuantizePerChannelAffine op.
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, axis, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        scale = to_tensor(scale, device)
        zero_point = torch.tensor(zero_point).to(dtype=torch.int64, device=device)
        Y = _fake_quantize_per_channel_affine_reference(X.cpu(), scale.cpu(), zero_point.cpu(), axis, quant_min, quant_max)
        Y_prime = torch.fake_quantize_per_channel_affine(
            X, scale, zero_point, axis, quant_min, quant_max)
        np.testing.assert_allclose(Y, Y_prime.cpu(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.per_channel_tensor(shapes=hu.array_shapes(1, 5,),
                                   qparams=hu.qparams(dtypes=torch.quint8)))
    def test_backward_per_channel(self, device, X):
        r"""Tests the backward method.
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, axis, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        scale = to_tensor(scale, device)
        zero_point = torch.tensor(zero_point).to(dtype=torch.int64, device=device)
        X.requires_grad_()
        Y_prime = torch.fake_quantize_per_channel_affine(
            X, scale, zero_point, axis, quant_min, quant_max)
        dout = torch.rand(X.shape, dtype=torch.float).to(device)
        dX = _fake_quantize_per_channel_affine_grad_reference(
            dout, X, scale, zero_point, axis, quant_min, quant_max)
        Y_prime.backward(dout)
        np.testing.assert_allclose(dX.cpu().detach().numpy(), X.grad.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.per_channel_tensor(shapes=hu.array_shapes(1, 5,),
                                   qparams=hu.qparams(dtypes=torch.quint8)))
    @unittest.skip("temporarily disable the test")
    def test_numerical_consistency_per_channel(self, device, X):
        r"""Comparing numerical consistency between CPU quantize/dequantize op and the CPU fake quantize op
        """
        np.random.seed(NP_RANDOM_SEED)
        X, (scale, zero_point, axis, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        scale = to_tensor(scale, device)
        zero_point = torch.tensor(zero_point).to(dtype=torch.int64, device=device)
        # quantize_per_channel and dequantize are only implemented in CPU
        Y = torch.dequantize(torch.quantize_per_channel(X.cpu(), scale.cpu(), zero_point.cpu(), axis, torch_type))
        Y_prime = torch.fake_quantize_per_channel_affine(
            X, scale, zero_point, axis, quant_min, quant_max)
        np.testing.assert_allclose(Y, Y_prime.cpu(), rtol=tolerance, atol=tolerance)

    @no_deadline
    @given(device=st.sampled_from(['cpu', 'cuda'] if torch.cuda.is_available() else ['cpu']),
           X=hu.per_channel_tensor(shapes=hu.array_shapes(2, 5,),
                                   qparams=hu.qparams(dtypes=torch.qint8)))
    def test_fq_module(self, device, X):
        r"""Tests forward and backward of a per-channel FakeQuantize module
        against the reference helpers, using the module's own scale and
        zero point.
        """
        np.random.seed(NP_RANDOM_SEED)
        # The generated scale / zero point are not used: the module's
        # qparams are read back from it after the forward pass.
        X, (_, _, axis, torch_type) = X
        quant_min = torch.iinfo(torch_type).min
        quant_max = torch.iinfo(torch_type).max

        X = to_tensor(X, device)
        X.requires_grad_()
        fq_module = FakeQuantize(default_per_channel_weight_observer, quant_min, quant_max, ch_axis=axis).to(device)
        Y_prime = fq_module(X)
        self.assertIsNotNone(fq_module.scale)
        self.assertIsNotNone(fq_module.zero_point)
        Y = _fake_quantize_per_channel_affine_reference(X, fq_module.scale,
                                                        fq_module.zero_point, axis, quant_min, quant_max)
        np.testing.assert_allclose(Y.cpu().detach().numpy(), Y_prime.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

        # Test backward
        dout = torch.rand(X.shape, dtype=torch.float, device=device)
        Y_prime.backward(dout)
        dX = _fake_quantize_per_channel_affine_grad_reference(dout, X, fq_module.scale,
                                                              fq_module.zero_point, axis, quant_min, quant_max)
        np.testing.assert_allclose(dX.cpu().numpy(), X.grad.cpu().detach().numpy(), rtol=tolerance, atol=tolerance)

    def test_fq_serializable(self):
        r"""Tests that a per-channel FakeQuantize state dict survives a
        torch.save / torch.load round trip.
        """
        observer = default_per_channel_weight_observer
        quant_min = -128
        quant_max = 127
        fq_module = FakeQuantize(observer, quant_min, quant_max)
        X = torch.tensor([[-5, -3.5, -2, 0, 3, 5, 7], [1, 3, 2, 5, 6.5, 8, 10]], dtype=torch.float32)
        # Run one forward pass before reading the state dict; the output
        # itself is not needed.
        fq_module(X)
        state_dict = fq_module.state_dict()
        self.assertEqual(state_dict['scale'], [0.054902, 0.078431])
        self.assertEqual(state_dict['zero_point'], [0, 0])
        b = io.BytesIO()
        torch.save(state_dict, b)
        b.seek(0)
        loaded_dict = torch.load(b)
        for key in state_dict:
            self.assertEqual(state_dict[key], loaded_dict[key])
|
|
|
|
# When executed as a script, hand control to run_tests from common_utils.
if __name__ == '__main__':
    run_tests()
|