[Quant tool] Handle input models with pre-quantized weights (#22633)

### Description
Allows the QDQ quantizer to handle input models that already have some
pre-quantized weights. In this case, the qdq quantizer will properly
skip/handle the pre-quantized weights.

Also handles an operator (e.g., Conv) with a pre-quantized weight and a
float bias. The tool will read the pre-quantized weight's quantization
scale to compute the bias's scale (`bias_scale = input_scale *
weight_scale`).

Input model (pre-quantized Conv weight):

![image](https://github.com/user-attachments/assets/7d2626e4-49ad-47ae-bd0e-6339ac590435)

Output QDQ model (everything is quantized):

![image](https://github.com/user-attachments/assets/393804d3-f042-47bd-895f-3d667fb2ae94)


### Motivation and Context
Customers may use external tools to quantize some weights (e.g., int4
for Conv/MatMul). The qdq quantizer should still be able to quantize the
rest of the model (float weights and activations) in this case.
This commit is contained in:
Adrian Lizarraga 2024-11-14 13:48:46 -08:00 committed by GitHub
parent 562ddce270
commit 0733733307
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 342 additions and 11 deletions

View file

@ -19,7 +19,9 @@ except ImportError:
from .calibrate import TensorData
from .onnx_model import ONNXModel
from .quant_utils import (
DEQUANT_OP_NAME,
ONNX_TYPE_TO_NP_TYPE,
QUANT_OP_NAME,
TENSOR_NAME_QUANT_SUFFIX,
find_by_name,
model_has_infer_metadata,
@ -178,6 +180,9 @@ class BaseQuantizer:
if node.op_type not in self.op_types_to_quantize:
return False
if node.op_type in (DEQUANT_OP_NAME, QUANT_OP_NAME):
return False
if self.nodes_to_exclude is not None and node.name in self.nodes_to_exclude:
return False

View file

@ -195,7 +195,11 @@ class QDQQuantizer(BaseQuantizer):
# The default behavior is that multiple nodes can share a QDQ pair as their inputs.
# In TRT, QDQ pair can`t be shared between nodes, so it will create dedicated QDQ pairs for each node.
self.dedicated_qdq_pair = extra_options.get("DedicatedQDQPair", False)
self.tensor_to_its_receiving_nodes = {}
self.tensor_to_its_receiving_nodes: dict[str, list[onnx.NodeProto]] = {}
# Maps a tensor to the DequantizeLinear node (in the original input model) that outputs the tensor.
# Populated for input models with some pre-quantized weights (typically via a different tool).
self.tensor_to_producing_dq: dict[str, onnx.NodeProto] = {}
# Let user set channel axis for specific op type and it's effective only when per channel quantization is supported and per_channel is True.
self.qdq_op_type_per_channel_support_to_axis = extra_options.get("QDQOpTypePerChannelSupportToAxis", {})
@ -555,6 +559,9 @@ class QDQQuantizer(BaseQuantizer):
if tensor_name not in self.tensor_to_its_receiving_nodes:
self.tensor_to_its_receiving_nodes[tensor_name] = []
self.tensor_to_its_receiving_nodes[tensor_name].append(node)
if node.op_type == DEQUANT_OP_NAME:
for tensor_name in node.output:
self.tensor_to_producing_dq[tensor_name] = node
self.initializer_quant_params = self._calc_initializer_quant_params()
self._adjust_weight_quant_params_for_bias_tensors()
@ -958,6 +965,14 @@ class QDQQuantizer(BaseQuantizer):
if initializer:
self._add_qdq_nodes_for_initializer(initializer)
else:
# Check if this tensor is already a dequantized value. If so, skip it.
# This happens if the original input model already has some pre-quantized weights
# generated by a different tool.
# Ex: (quantized_weight -> DequantizeLinear -> this_tensor)
if tensor_name in self.tensor_to_producing_dq:
del self.tensors_to_quantize[tensor_name]
continue
tensor_qparam_initializers = self._make_tensor_scale_zp_initializers(tensor_name)
if not tensor_qparam_initializers:
raise ValueError(
@ -1009,6 +1024,12 @@ class QDQQuantizer(BaseQuantizer):
if self.is_input_a_initializer(tensor_name):
raise ValueError("Quantization parameter shared mode is not supported for weight yet")
if tensor_name in self.tensor_to_producing_dq:
raise ValueError(
f"Quantization parameter sharing is invalid for tensor {tensor_name} "
"because it has already been quantized"
)
# Need to check if this tensor's quant_type is converted for some consumers.
# If so, create new scale/zp initializers for these consumers.
converted_qparam_inits = None
@ -1147,6 +1168,30 @@ class QDQQuantizer(BaseQuantizer):
return True, axis
def _get_tensor_quantization_scale(self, tensor_name: str, consumer_node_name: str) -> np.ndarray | None:
"""
Returns the quantization scale of a tensor that is consumed by the given node.
:parameter tensor_name: The name of the tensor.
:parameter consumer_node_name: The name of the node that consumes the tensor as input. Necessary in case
the quantization type of the tensor was converted.
Refer: QDQQuantizer::_add_qdq_ops_for_converted_activation.
:returns: The quantization scale or None.
"""
initializers = self.model.initializer()
scale_initializer: onnx.TensorProto | None = None
if tensor_name in self.quantized_value_map:
# Tensor was quantized by this tool, so get scale from initializer created by this tool run.
scale_name = self.quantized_value_map[tensor_name].get_for_consumer(consumer_node_name).scale_name
scale_initializer = find_by_name(scale_name, initializers)
else:
# Tensor was already quantized in original model, so get scale from DQ node that outputs the tensor.
dq_node = self.tensor_to_producing_dq.get(tensor_name, None)
if dq_node:
scale_initializer = find_by_name(dq_node.input[1], initializers)
return tensor_proto_to_array(scale_initializer) if scale_initializer is not None else None
def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str:
"""
Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale
@ -1156,17 +1201,21 @@ class QDQQuantizer(BaseQuantizer):
if bias_name in self.quantized_value_map:
return self.quantized_value_map[bias_name].original.q_name
# get scale for weight
weight_scale_name = self.quantized_value_map[bias_info.weight_name].original.scale_name
weight_scale_initializer = find_by_name(weight_scale_name, self.model.initializer())
weight_scale = tensor_proto_to_array(weight_scale_initializer)
# get scale for weight.
weight_scale = self._get_tensor_quantization_scale(bias_info.weight_name, bias_info.node_name)
if weight_scale is None:
raise ValueError(
f"Unable to get valid quantization scale for weight input '{bias_info.weight_name}' "
f"when quantizing bias '{bias_name}' to int32."
)
# get scale for input
input_scale_name = (
self.quantized_value_map[bias_info.input_name].get_for_consumer(bias_info.node_name).scale_name
)
input_scale_initializer = find_by_name(input_scale_name, self.model.initializer())
input_scale = tensor_proto_to_array(input_scale_initializer)
# get scale for input.
input_scale = self._get_tensor_quantization_scale(bias_info.input_name, bias_info.node_name)
if input_scale is None:
raise ValueError(
f"Unable to get valid quantization scale for input '{bias_info.input_name}' "
f"when quantizing bias '{bias_name}' to int32."
)
(
quantized_bias_name,

View file

@ -20,10 +20,12 @@ from op_test_utils import (
check_op_type_count,
check_op_type_order,
create_clip_node,
get_tensor_consumers_and_producers,
)
from onnxruntime.quantization import QDQQuantizer, QuantFormat, QuantType, quantize_static, write_calibration_table
from onnxruntime.quantization.calibrate import CalibrationMethod, TensorData, TensorsData
from onnxruntime.quantization.quant_utils import quantize_nparray
class TestQDQFormat(unittest.TestCase):
@ -1925,5 +1927,280 @@ class TestAdjustWeightScaleForInt32Bias(unittest.TestCase):
self.assertEqual(len(bias_names), 2)
class TestQDQPrequantWeights(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._tmp_model_dir = tempfile.TemporaryDirectory(prefix="ort.qdq.prequant_weight")
# Note: swap with the commented line if you want to see the models in local test dir.
cls._tmp_dir_path = cls._tmp_model_dir.name
# cls._tmp_dir_path = "."
@classmethod
def tearDownClass(cls):
cls._tmp_model_dir.cleanup()
def build_conv_model(
self,
inp_shape: list[int],
weight_quant_data: np.ndarray,
weight_scale_data: np.ndarray,
weight_zp_data: np.ndarray,
bias_data: np.ndarray,
float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
):
"""
Builds a model with a Conv that has a pre-quantized constant weight input.
"""
input_0 = onnx.helper.make_tensor_value_info("input_0", float_type, inp_shape)
output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
weight_quant = onnx.numpy_helper.from_array(weight_quant_data, "weight_quant")
weight_scale = onnx.numpy_helper.from_array(weight_scale_data, "weight_scale")
weight_zp = onnx.numpy_helper.from_array(weight_zp_data, "weight_zp")
bias = onnx.numpy_helper.from_array(bias_data, "bias")
dq_node = onnx.helper.make_node(
"DequantizeLinear", ["weight_quant", "weight_scale", "weight_zp"], ["weight_dequant"], name="DQ0"
)
conv_node = onnx.helper.make_node("Conv", ["input_0", "weight_dequant", "bias"], ["output_0"], name="Conv0")
graph = onnx.helper.make_graph(
[dq_node, conv_node],
"ConvPreQuantWeight",
[input_0],
[output_0],
initializer=[weight_quant, weight_scale, weight_zp, bias],
)
opset_imports = [onnx.helper.make_opsetid("", 21)]
model = onnx.helper.make_model(graph, opset_imports=opset_imports)
return onnx.shape_inference.infer_shapes(model)
def build_conv_dynamic_weight_model(
self,
input_quant_data: np.ndarray,
input_scale_data: np.ndarray,
input_zp_data: np.ndarray,
weight_shape: list[int],
bias_data: np.ndarray,
float_type: onnx.TensorProto.DataType = onnx.TensorProto.FLOAT,
):
"""
Builds a model with a Conv that has a dynamic float weight input, but a constant
pre-quantized input[0].
"""
dyn_weight = onnx.helper.make_tensor_value_info("dyn_weight", float_type, weight_shape)
output_0 = onnx.helper.make_tensor_value_info("output_0", float_type, None)
input_quant = onnx.numpy_helper.from_array(input_quant_data, "input_quant")
input_scale = onnx.numpy_helper.from_array(input_scale_data, "input_scale")
input_zp = onnx.numpy_helper.from_array(input_zp_data, "input_zp")
bias = onnx.numpy_helper.from_array(bias_data, "bias")
dq_node = onnx.helper.make_node(
"DequantizeLinear", ["input_quant", "input_scale", "input_zp"], ["input_dequant"], name="DQ0"
)
conv_node = onnx.helper.make_node("Conv", ["input_dequant", "dyn_weight", "bias"], ["output_0"], name="Conv0")
graph = onnx.helper.make_graph(
[dq_node, conv_node],
"ConvPreQuantInput_DynamicWeight",
[dyn_weight],
[output_0],
initializer=[input_quant, input_scale, input_zp, bias],
)
opset_imports = [onnx.helper.make_opsetid("", 21)]
model = onnx.helper.make_model(graph, opset_imports=opset_imports)
return onnx.shape_inference.infer_shapes(model)
def test_quantize_with_prequantized_weights(self):
"""
Test quantization of Conv with pre-quantized weights.
"""
rng = np.random.default_rng(123)
test_configs = [onnx.TensorProto.FLOAT, onnx.TensorProto.FLOAT16]
for float_type in test_configs:
with self.subTest(float_type=float_type):
label = f"_{onnx.TensorProto.DataType.Name(float_type)}"
float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_weight{label}.onnx")
qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_weight{label}.qdq.onnx")
inp_shape = [1, 2, 100, 100]
weight_shape = [2, 2, 20, 20]
np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)
# range = 2.0, scale = 2/254, zp = 0
weight_scale_data = np.array(2 / 254, dtype=np_dtype)
weight_zp_data = np.array(0, dtype=np.int8)
weight_data = np.linspace(-1.0, 1.0, num=1600, dtype=np_dtype).reshape(weight_shape)
weight_quant_data = quantize_nparray(
onnx.TensorProto.INT8, weight_data, weight_scale_data, weight_zp_data
)
bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
float_model = self.build_conv_model(
inp_shape, weight_quant_data, weight_scale_data, weight_zp_data, bias_data, float_type
)
onnx.checker.check_model(float_model, True)
onnx.save_model(float_model, float_model_path)
# Check that the input model only has a pre-quantized weight and save its scale/zero-point
# to check that it doesn't change after quantization.
float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
check_op_type_count(self, float_model_path, **float_node_counts)
conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node_original, None)
_, producers_original = get_tensor_consumers_and_producers(float_model)
weight_dq_node_original = producers_original.get(conv_node_original.input[1], None)
initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
scale_name_original = weight_dq_node_original.input[1]
scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
zp_name_original = weight_dq_node_original.input[2]
zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])
input_data_list = [
{"input_0": rng.uniform(-10.0, 10.0, inp_shape).astype(np_dtype)},
]
data_reader = TestDataFeeds(input_data_list)
quantize_static(
float_model_path,
qdq_model_path,
data_reader,
quant_format=QuantFormat.QDQ,
activation_type=QuantType.QUInt8,
weight_type=QuantType.QInt8,
op_types_to_quantize=["Conv"],
)
# The final model should have everything quantized
qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
check_op_type_count(self, qdq_model_path, **qdq_node_counts)
# Check that the pre-quantized weight still has the same scale/zp after quantization
qdq_model = onnx.load_model(qdq_model_path)
conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node, None)
_, producers = get_tensor_consumers_and_producers(qdq_model)
weight_dq_node = producers.get(conv_node.input[1], None)
initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
scale_name = weight_dq_node.input[1]
self.assertEqual(scale_name, scale_name_original)
scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
self.assertEqual(scale_val, scale_val_original)
zp_name = weight_dq_node.input[2]
self.assertEqual(zp_name, zp_name_original)
zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
self.assertEqual(zp_val, zp_val_original)
def test_quantize_with_prequantized_input(self):
"""
Test quantization of Conv with pre-quantized input and dynamic weight.
"""
rng = np.random.default_rng(123)
test_configs = [
(onnx.TensorProto.FLOAT, False),
(onnx.TensorProto.FLOAT16, False),
(onnx.TensorProto.FLOAT, True),
(onnx.TensorProto.FLOAT16, True),
]
for float_type, convert_weight_qtype in test_configs:
with self.subTest(float_type=float_type):
convert_label = "_convert_qtype" if convert_weight_qtype else ""
label = f"_{onnx.TensorProto.DataType.Name(float_type)}{convert_label}"
float_model_path = os.path.join(self._tmp_dir_path, f"conv.f32.prequant_input{label}.onnx")
qdq_model_path = os.path.join(self._tmp_dir_path, f"conv.prequant_input{label}.qdq.onnx")
inp_shape = [1, 2, 40, 40]
weight_shape = [2, 2, 20, 20]
np_dtype = onnx.helper.tensor_dtype_to_np_dtype(float_type)
# range = 3.0, scale = 3/255, zp = 127
input_scale_data = np.array(3 / 255, dtype=np_dtype)
input_zp_data = np.array(127, dtype=np.uint8)
input_data = np.linspace(-1.5, 1.5, num=3200, dtype=np_dtype).reshape(inp_shape)
input_quant_data = quantize_nparray(onnx.TensorProto.UINT8, input_data, input_scale_data, input_zp_data)
bias_data = np.array([-10.0, 10.0], dtype=np_dtype)
float_model = self.build_conv_dynamic_weight_model(
input_quant_data, input_scale_data, input_zp_data, weight_shape, bias_data, float_type
)
onnx.checker.check_model(float_model, True)
onnx.save_model(float_model, float_model_path)
# Check that the input model only has a pre-quantized input and save its scale/zero-point
# to check that it doesn't change after quantization.
float_node_counts = {"QuantizeLinear": 0, "DequantizeLinear": 1}
check_op_type_count(self, float_model_path, **float_node_counts)
conv_node_original = next((node for node in float_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node_original, None)
_, producers_original = get_tensor_consumers_and_producers(float_model)
input_dq_node_original = producers_original.get(conv_node_original.input[0], None)
initializers_original = {initializer.name: initializer for initializer in float_model.graph.initializer}
scale_name_original = input_dq_node_original.input[1]
scale_val_original = onnx.numpy_helper.to_array(initializers_original[scale_name_original])
zp_name_original = input_dq_node_original.input[2]
zp_val_original = onnx.numpy_helper.to_array(initializers_original[zp_name_original])
# Create data reader with random input calibration data.
dyn_weight_data_list = [
{"dyn_weight": rng.uniform(-10.0, 10.0, weight_shape).astype(np_dtype)},
]
data_reader = TestDataFeeds(dyn_weight_data_list)
extra_options = {}
if convert_weight_qtype:
# Test converting the dynamic weight's quantization type, which results in
# dyn_weight -> Q(u16) -> DQ(f32) -> Q(u8) -> DQ(f32) -> Conv
extra_options["TensorQuantOverrides"] = {
"dyn_weight": [{"quant_type": QuantType.QUInt16, "convert": {"quant_type": QuantType.QUInt8}}],
}
quantize_static(
float_model_path,
qdq_model_path,
data_reader,
quant_format=QuantFormat.QDQ,
activation_type=QuantType.QUInt8,
weight_type=QuantType.QInt8,
op_types_to_quantize=["Conv"],
extra_options=extra_options,
)
# The final model should have everything quantized
qdq_node_counts = {"QuantizeLinear": 2, "DequantizeLinear": 4}
if convert_weight_qtype:
qdq_node_counts["QuantizeLinear"] += 1
qdq_node_counts["DequantizeLinear"] += 1
check_op_type_count(self, qdq_model_path, **qdq_node_counts)
# Check that the pre-quantized input still has the same scale/zp after quantization
qdq_model = onnx.load_model(qdq_model_path)
conv_node = next((node for node in qdq_model.graph.node if node.op_type == "Conv"), None)
self.assertNotEqual(conv_node, None)
_, producers = get_tensor_consumers_and_producers(qdq_model)
input_dq_node = producers.get(conv_node.input[0], None)
initializers = {initializer.name: initializer for initializer in qdq_model.graph.initializer}
scale_name = input_dq_node.input[1]
self.assertEqual(scale_name, scale_name_original)
scale_val = onnx.numpy_helper.to_array(initializers[scale_name])
self.assertEqual(scale_val, scale_val_original)
zp_name = input_dq_node.input[2]
self.assertEqual(zp_name, zp_name_original)
zp_val = onnx.numpy_helper.to_array(initializers[zp_name])
self.assertEqual(zp_val, zp_val_original)
if __name__ == "__main__":
unittest.main()