diff --git a/onnxruntime/python/tools/quantization/onnx_quantizer.py b/onnxruntime/python/tools/quantization/onnx_quantizer.py index ab58143e9c..c1c2248bc8 100644 --- a/onnxruntime/python/tools/quantization/onnx_quantizer.py +++ b/onnxruntime/python/tools/quantization/onnx_quantizer.py @@ -111,6 +111,7 @@ class ONNXQuantizer: self.is_activation_symmetric = ( False if "ActivationSymmetric" not in self.extra_options else self.extra_options["ActivationSymmetric"] ) + self.min_real_range = self.extra_options.get("MinimumRealRange") self.activation_qType = getattr(activation_qType, "tensor_type", activation_qType) self.weight_qType = getattr(weight_qType, "tensor_type", weight_qType) @@ -998,6 +999,7 @@ class ONNXQuantizer: qType, self.is_weight_symmetric, self.reduce_range and reduce_range, + self.min_real_range, ) if qType in { @@ -1087,6 +1089,7 @@ class ONNXQuantizer: self.is_weight_symmetric or weight_qType in (onnx_proto.TensorProto.INT8, onnx_proto.TensorProto.FLOAT8E4M3FN), self.reduce_range and reduce_range, + self.min_real_range, ) rmin_list.append(rmin) rmax_list.append(rmax) @@ -1208,7 +1211,9 @@ class ONNXQuantizer: rmin, rmax = td.range_value qmin, qmax = get_qmin_qmax_for_qType(self.activation_qType, symmetric=self.is_activation_symmetric) - zero, scale = compute_scale_zp(rmin, rmax, qmin, qmax, self.is_activation_symmetric) + zero, scale = compute_scale_zp( + rmin, rmax, qmin, qmax, self.is_activation_symmetric, self.min_real_range + ) quantization_params[tensor_name] = QuantizationParams(zero_point=zero, scale=scale) return quantization_params diff --git a/onnxruntime/python/tools/quantization/quant_utils.py b/onnxruntime/python/tools/quantization/quant_utils.py index 739e399042..8825d78993 100644 --- a/onnxruntime/python/tools/quantization/quant_utils.py +++ b/onnxruntime/python/tools/quantization/quant_utils.py @@ -184,7 +184,7 @@ def quantize_nparray(qType, arr, scale, zero_point, low=None, high=None): return arr_fp32.astype(dtype) -def 
compute_scale_zp(rmin, rmax, qmin, qmax, symmetric=False): +def compute_scale_zp(rmin, rmax, qmin, qmax, symmetric=False, min_real_range=None): """Calculate the scale s and zero point z for the quantization relation r = s(q-z), where r are the original values and q are the corresponding quantized values. @@ -199,6 +199,8 @@ def compute_scale_zp(rmin, rmax, qmin, qmax, symmetric=False): :parameter rmax: maximum value of r :parameter qmin: minimum value representable by the target quantization data type :parameter qmax: maximum value representable by the target quantization data type + :parameter symmetric: True if the floating-point range should be made symmetric. Defaults to False. + :parameter min_real_range: Minimum floating-point range (i.e., rmax - rmin) to enforce. Defaults to None. :return: zero and scale [z, s] """ @@ -211,6 +213,10 @@ def compute_scale_zp(rmin, rmax, qmin, qmax, symmetric=False): rmin = min(rmin, 0) rmax = max(rmax, 0) + # Ensure a minimum floating-point range if specified. + if min_real_range is not None: + rmax = max(rmax, rmin + min_real_range) + if symmetric: absmax = max(abs(rmin), abs(rmax)) rmin = -absmax @@ -254,11 +260,13 @@ def compute_scale_zp_float8(element_type, std): return [zero, scale] -def quantize_data(data, qType, symmetric, reduce_range=False): +def quantize_data(data, qType, symmetric, reduce_range=False, min_real_range=None): """ :param data: data to quantize :param qType: data type to quantize to. Supported types UINT8 and INT8 :param symmetric: whether symmetric quantization is used or not. This is applied to INT8. + :param reduce_range: True if the quantization range should be reduced. Defaults to False. + :param min_real_range: Minimum floating-point range (i.e., rmax - rmin) to enforce. Defaults to None. 
:return: minimum, maximum, zero point, scale, and quantized weights To pack weights, we compute a linear transformation @@ -301,7 +309,7 @@ def quantize_data(data, qType, symmetric, reduce_range=False): if qType in (TensorProto.INT8, TensorProto.UINT8, TensorProto.INT16, TensorProto.UINT16): if len(data): qmin, qmax = get_qmin_qmax_for_qType(qType, reduce_range, symmetric=symmetric) - zero_point, scale = compute_scale_zp(rmin, rmax, qmin, qmax, symmetric) + zero_point, scale = compute_scale_zp(rmin, rmax, qmin, qmax, symmetric, min_real_range) quantized_data = quantize_nparray(qType, numpy.asarray(data), scale, zero_point) return rmin, rmax, zero_point, scale, quantized_data diff --git a/onnxruntime/python/tools/quantization/quantize.py b/onnxruntime/python/tools/quantization/quantize.py index 0fdd64fdd3..c9e9a92e2a 100644 --- a/onnxruntime/python/tools/quantization/quantize.py +++ b/onnxruntime/python/tools/quantization/quantize.py @@ -370,6 +370,12 @@ def quantize_static( `com.microsoft` domain, which forces use of ONNX Runtime's QuantizeLinear and DequantizeLinear contrib op implementations. The contrib op implementations may support features not standardized into the ONNX specification (e.g., 16-bit quantization types). + MinimumRealRange = float|None : + Default is None. If set to a floating-point value, the calculation of the quantization parameters + (i.e., scale and zero point) will enforce a minimum range between rmin and rmax. If (rmax - rmin) + is less than the specified minimum range, rmax will be set to rmin + MinimumRealRange. This is + necessary for EPs like QNN that require a minimum floating-point range when determining + quantization parameters. 
""" if activation_type == QuantType.QFLOAT8E4M3FN or weight_type == QuantType.QFLOAT8E4M3FN: if calibrate_method != CalibrationMethod.Distribution: diff --git a/onnxruntime/test/python/quantization/test_minimum_real_range_option.py b/onnxruntime/test/python/quantization/test_minimum_real_range_option.py new file mode 100644 index 0000000000..77f95ab903 --- /dev/null +++ b/onnxruntime/test/python/quantization/test_minimum_real_range_option.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import unittest + +import numpy as np +import onnx +from onnx import TensorProto, helper, numpy_helper + +from onnxruntime import quantization + + +class TestMinimumRealRangeOption(unittest.TestCase): + def setUp(self): + self.qdq_model_name = "model_qdq_u8.onnx" + + # Set up activations/weights with zero value ranges (i.e., rmax - rmin == 0). + self.zero_range_activations = [ + np.zeros([1, 2, 32, 32], dtype="float32"), + ] + + self.zero_range_weights = np.zeros([1, 2, 2, 2], dtype="float32") + + def perform_quantization(self, activations, weight, min_real_range): + # One-layer convolution model to be quantized with uint8 activations and uint8 weights. 
+ act = helper.make_tensor_value_info("ACT", TensorProto.FLOAT, activations[0].shape) + helper.make_tensor_value_info("WGT", TensorProto.FLOAT, weight.shape) + res = helper.make_tensor_value_info("RES", TensorProto.FLOAT, [None, None, None, None]) + wgt_init = numpy_helper.from_array(weight, "WGT") + conv_node = onnx.helper.make_node("Conv", ["ACT", "WGT"], ["RES"]) + graph = helper.make_graph([conv_node], "test", [act], [res], initializer=[wgt_init]) + model = helper.make_model(graph, opset_imports=[helper.make_opsetid("", 11)]) + onnx.save(model, "model.onnx") + + # Quantize model + class DummyDataReader(quantization.CalibrationDataReader): + def __init__(self): + self.iterator = ({"ACT": act} for act in activations) + + def get_next(self): + return next(self.iterator, None) + + quantization.quantize_static( + model_input="model.onnx", + model_output=self.qdq_model_name, + calibration_data_reader=DummyDataReader(), + quant_format=quantization.QuantFormat.QDQ, + activation_type=quantization.QuantType.QUInt8, + weight_type=quantization.QuantType.QUInt8, + op_types_to_quantize=["Conv"], + extra_options={"MinimumRealRange": min_real_range}, + ) + + # Extract quantization parameters: scales and zero points for activations and weights. + model = onnx.load(self.qdq_model_name) + act_zp = next(init for init in model.graph.initializer if init.name == "ACT_zero_point").int32_data[0] + act_sc = next(init for init in model.graph.initializer if init.name == "ACT_scale").float_data[0] + wgt_zp = next(init for init in model.graph.initializer if init.name == "WGT_zero_point").int32_data[0] + wgt_sc = next(init for init in model.graph.initializer if init.name == "WGT_scale").float_data[0] + + # Return quantization parameters + return act_zp, act_sc, wgt_zp, wgt_sc + + def test_default(self): + """ + Test default behavior without specifying the MinimumRealRange option. 
+ """ + act_zp, act_sc, wgt_zp, wgt_sc = self.perform_quantization( + self.zero_range_activations, + self.zero_range_weights, + min_real_range=None, # default behavior + ) + + # No minimum real range is set. Expect default behavior (scale = 1.0, zp = 0) + self.assertEqual(act_zp, 0) + self.assertEqual(act_sc, 1.0) + self.assertEqual(wgt_zp, 0) + self.assertEqual(wgt_sc, 1.0) + + def test_min_real_range(self): + """ + Test a MinimumRealRange value of 0.0001. + """ + min_real_range = 0.0001 + + act_zp, act_sc, wgt_zp, wgt_sc = self.perform_quantization( + self.zero_range_activations, + self.zero_range_weights, + min_real_range=min_real_range, + ) + + expected_scale = np.float32(min_real_range / 255) + + # Minimum floating-point range is set. Expect small scale values. + self.assertEqual(act_zp, 0) + self.assertEqual(act_sc, expected_scale) + self.assertEqual(wgt_zp, 0) + self.assertEqual(wgt_sc, expected_scale) + + +if __name__ == "__main__": + unittest.main() diff --git a/onnxruntime/test/python/quantization/test_quant_util.py b/onnxruntime/test/python/quantization/test_quant_util.py index 6efa279393..65cdff025b 100644 --- a/onnxruntime/test/python/quantization/test_quant_util.py +++ b/onnxruntime/test/python/quantization/test_quant_util.py @@ -33,6 +33,18 @@ class TestQuantUtil(unittest.TestCase): self.assertEqual(compute_scale_zp(-tiny_float, tiny_float, 0, 255, symmetric=True), [0, 1.0]) self.assertEqual(compute_scale_zp(-tiny_float, 0.0, 0, 255, symmetric=False), [0, 1.0]) + # Test enforcing a minimum floating-point range. 
+ self.assertEqual(compute_scale_zp(0.0, 0.0, 0, 255, symmetric=False, min_real_range=0.0001), [0, 0.0001 / 255]) + self.assertEqual( + compute_scale_zp(0.0, 0.0, -128, 127, symmetric=True, min_real_range=0.0001), [0, 0.0002 / 255] + ) + self.assertEqual( + compute_scale_zp(0.0, 0.0, 0, 65535, symmetric=False, min_real_range=0.0001), [0, 0.0001 / 65535] + ) + self.assertEqual( + compute_scale_zp(0.0, 0.0, -32768, 32767, symmetric=True, min_real_range=0.0001), [0, 0.0002 / 65535] + ) + def test_load_external_model(self): input_name = "input" output_name = "output"