Quantization tool improvement (#4933)

Improve quantization tools:
1. Support QAT
2. Make quantization tool to register Operators.
3. Make the API clear to use

Co-authored-by: t-yguo <t-yguo@microsoft.com>
This commit is contained in:
Yufeng Li 2020-09-01 09:07:46 -07:00 committed by GitHub
parent 464bbd27a9
commit ffc2b25a3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 2393 additions and 1708 deletions

View file

@ -204,8 +204,9 @@ file(GLOB onnxruntime_python_tools_featurizers_src CONFIGURE_DEPENDS
file(GLOB onnxruntime_python_quantization_src CONFIGURE_DEPENDS
"${ONNXRUNTIME_ROOT}/python/tools/quantization/*.py"
)
list(REMOVE_ITEM onnxruntime_python_quantization_src
"${ONNXRUNTIME_ROOT}/python/tools/quantization/test_calibrate.py")
file(GLOB onnxruntime_python_quantization_operators_src CONFIGURE_DEPENDS
"${ONNXRUNTIME_ROOT}/python/tools/quantization/operators/*.py"
)
file(GLOB onnxruntime_python_datasets_srcs CONFIGURE_DEPENDS
"${ONNXRUNTIME_ROOT}/python/datasets/*.py"
)
@ -225,6 +226,7 @@ add_custom_command(
COMMAND ${CMAKE_COMMAND} -E make_directory $<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/tools
COMMAND ${CMAKE_COMMAND} -E make_directory $<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/tools/featurizer_ops
COMMAND ${CMAKE_COMMAND} -E make_directory $<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/quantization
COMMAND ${CMAKE_COMMAND} -E make_directory $<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/quantization/operators
COMMAND ${CMAKE_COMMAND} -E copy
${ONNXRUNTIME_ROOT}/__init__.py
$<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/
@ -267,6 +269,9 @@ add_custom_command(
COMMAND ${CMAKE_COMMAND} -E copy
${onnxruntime_python_quantization_src}
$<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/quantization/
COMMAND ${CMAKE_COMMAND} -E copy
${onnxruntime_python_quantization_operators_src}
$<TARGET_FILE_DIR:${test_data_target}>/onnxruntime/quantization/operators/
COMMAND ${CMAKE_COMMAND} -E copy
${REPO_ROOT}/VERSION_NUMBER
$<TARGET_FILE_DIR:${test_data_target}>

View file

@ -10,13 +10,11 @@ from PIL import Image
import onnx
import onnxruntime
from onnx import helper, TensorProto, numpy_helper
from quantize import quantize, QuantizationMode
from calibrate import calibrate
from calibrate import CalibrationDataReader
from onnxruntime.quantization import quantize_static, calibrate, CalibrationDataReader
class ResNet50DataReader(CalibrationDataReader):
def __init__(self,calibration_image_folder,augmented_model_path='augmented_model.onnx'):
def __init__(self, calibration_image_folder, augmented_model_path='augmented_model.onnx'):
self.image_folder = calibration_image_folder
self.augmented_model_path = augmented_model_path
self.preprocess_flag = True
@ -27,12 +25,12 @@ class ResNet50DataReader(CalibrationDataReader):
if self.preprocess_flag:
self.preprocess_flag = False
session = onnxruntime.InferenceSession(self.augmented_model_path, None)
(_,height,width,_) = session.get_inputs()[0].shape
nhwc_data_list = preprocess_func(self.image_folder,height,width,size_limit = 0)
(_, height, width, _) = session.get_inputs()[0].shape
nhwc_data_list = preprocess_func(self.image_folder, height, width, size_limit=0)
input_name = session.get_inputs()[0].name
self.datasize = len(nhwc_data_list)
self.enum_data_dicts = iter([{input_name:nhwc_data_list[i]} for i in range(self.datasize)])
return next(self.enum_data_dicts,None)
self.datasize = len(nhwc_data_list)
self.enum_data_dicts = iter([{input_name: nhwc_data_list[i]} for i in range(self.datasize)])
return next(self.enum_data_dicts, None)
def preprocess_func(images_folder, height, width, size_limit=0):
@ -64,18 +62,13 @@ def preprocess_func(images_folder, height, width, size_limit=0):
def main():
model_path = './resnet50_v1.onnx'
calibration_dataset_path = './calibration_data_set'
dr = ResNet50DataReader(calibration_dataset_path)
#call calibrate to generate quantization dictionary containing the zero point and scale values
quantization_params_dict = calibrate(model_path,dr)
calibrated_quantized_model = quantize(onnx.load(model_path),
quantization_mode=QuantizationMode.QLinearOps,
force_fusions=True,
quantization_params=quantization_params_dict)
input_model_path = './resnet50_v1.onnx'
output_model_path = './calibrated_quantized_model.onnx'
onnx.save(calibrated_quantized_model, output_model_path)
calibration_dataset_path = './test_images'
dr = ResNet50DataReader(calibration_dataset_path)
quantize_static(input_model_path, output_model_path, dr)
print('Calibrated and quantized model saved.')
if __name__ == '__main__':
main()
main()

View file

@ -1,9 +1,6 @@
# Quantization and Calibration Tools
# Quantization Tools
Quantization in ORT refers to 8 bit linear quantization of an onnx model. There are 2 tools which aid converting an onnx model to an onnx quantized model.
* Quantization Tool
* Calibration Tool
Quantization in ORT refers to 8 bit linear quantization of an onnx model.
## Quantization specifics
During quantization the floating point real values are mapped to an 8 bit quantization space and it is of the form :
@ -23,29 +20,27 @@ Quantization in ORT refers to 8 bit linear quantization of an onnx model. There
Zero point represents zero in quantization space. It is important that floating point zero value be exactly representable in quantization space. This is because in lot of CNNs, zero padding is used and if after quantization it is not possible to represent 0 uniquely then it will lead to accuracy errors.
## Quantizing an onnx model
There are 2 ways of quantizing a model
There are 3 ways of quantizing a model: dynamic, static and quantize-aware training quantization.
* Only use quantization : This method assumes the model owner is going to use Integer Ops for quantization or has pre calculated the quantization params as they are required inputs for using QLinear Ops
* Dynamic quantization : This method calculates the quantization parameter (scale and zero point) for activations dynamically.
ONNX Model ---> quantize.py ---> ONNX Quantized Model
* Static quantization: It leverages the calibration data to calculate the quantization parameters of activations.
* Use both calibration and quantization : This method is preferred when using QLinear Ops for quantization.
ONNX Mode --> calibrate.py --> quantize.py --> ONNX Quantized model
Today ORT does not guarantee support for E2E model quantization, meaning since not all ONNX ops have support for 8 bit data types therefore only the supported ops in the model are quantized. For rest of the ops inputs are reconverted to FP32.
* Quantize-Aware training quantization: The quantization parameters of activations are calculated while training, and the training process can constrain activations to a certain range.
### List of Supported Quantized Ops:
The following ops were chosen as phase 1 ops because in most of the CNN models these ops consume most amount of compute and power and therefore there is benefit in quantizing these ops to get perf benefits.
* Convolution
* Matmul
* Data type agnostic ops like transpose, identity etc. ( Note: special quantization is not done for these ops.)
* Conv
* MatMul
* MaxPool
* Relu
* Clip
* Add (Experimental)
* Mul (Experimental)
### Quantization and model opset versions
Quantization is fairly new in ONNX and ONNXRuntime. Quantization ops were introduced in ONNX opset version 10. Therefore it is important that the model which is being quantized be opset 10 or higher. In case the model opset version is < 10 then it is recommended that the model should be reconverted to ONNX from its original framework using the latest opset.
Quantization tool displays a warning when the model opset version is < 10 and still goes ahead and quantizes the model and at the end changes the opset version to 10. It is the responsibility of the model owner to run model checker and make sure the model is valid. If the model is not valid then use the above recommended way i.e. reconvert the model from original framework.
### Quantization and Graph Optimization
Please note quantization and graph optimizations may not always work together.
@ -57,150 +52,70 @@ Same goes the other way round. After quantizing a model some graph optimizations
It is advised that the model owner be aware of this and run perf evaluations to understand which technique gives the best performance for their model.
## Quantization tool
quantize() takes a model in ModelProto format and returns the quantized model in ModelProto format.
### Various quantization modes
Default is set to QuantizationMode.IntegerOps with dynamic input quantization.
- **QuantizationMode.IntegerOps with static input quantization**:
Quantize using integer ops. Inputs/activations are quantized using static scale and zero point values which are specified through "quantization_params" option.
```python
quantized_model = quantize(model, quantization_mode=QuantizationMode.IntegerOps,
static=True,
quantization_params={
'input_1': [np.uint8(113), np.float32(0.05)]
})
```
- **QuantizationMode.IntegerOps with dynamic input quantization**:
Quantize using integer ops. Inputs/activations are quantized using dynamic scale and zero point values which are computed while running the model. This is the default quantization mode.
```python
quantized_model = quantize(model, quantization_mode=QuantizationMode.IntegerOps, static=False)
```
- **QuantizationMode.QLinearOps with static input quantization**:
Quantize using QLinear ops. Inputs/activations are quantized using static scale and zero point values which are specified through "quantization_params" option.
```python
quantized_model = quantize(model, quantization_mode=QuantizationMode.QLinearOps,
static=True,
quantization_params={
'input_1': [np.uint8(113), np.float32(0.05)]
'output_1': [np.uint8(113), np.float32(0.05)]
})
```
- **QuantizationMode.QLinearOps with dynamic input quantization**:
Quantize using QLinear ops. Inputs/activations are quantized using dynamic scale and zero point values which are computed while running the model.
Output scale and zero point values have to be specified using "quantization_params" option.
```python
quantized_model = quantize(model, quantization_mode=QuantizationMode.QLinearOps,
static=False,
quantization_params={
'output_1': [np.uint8(113), np.float32(0.05)]
})
```
## Quantization API
Quantization has 3 main APIs: quantize_dynamic, quantize_static, and quantize_qat, which correspond to dynamic quantization, static quantization and quantize-aware training quantization respectively.
### Options
See below for a description of all the options to quantize():
See below for a description of the common options to quantize_dynamic, quantize_static and quantize_qat:
- **model**: ModelProto to quantize
- **model_input**:
-
file path of model to quantize
- **model_output**:
-
file path where the quantized model is saved
- **op_types_to_quantize**: *default: []*
-
specify the types of operators to quantize, like ['Conv'] to quantize Conv only. It quantizes all supported operators by default.
- **per_channel**: *default: False*
-
If True, weights of Conv nodes are quantized per output channel.
If False, they are quantized per tensor. Refer [QLinearConv](https://github.com/onnx/onnx/blob/master/docs/Operators.md#qlinearconv) for more information.
- **nbits**: *default: 8*
Number of bits to represent quantized data. Currently only nbits=8 is supported.
- **quantization_mode**: *default: QuantizationMode.IntegerOps*
*QuantizationMode.IntegerOps*: Quantize using integer ops. Only [ConvInteger](https://github.com/onnx/onnx/blob/master/docs/Operators.md#ConvInteger) and [MatMulInteger](https://github.com/onnx/onnx/blob/master/docs/Operators.md#MatMulInteger) ops are supported now.
*QuantizationMode.QLinearOps*: Quantize using QLinear ops. Only [QLinearConv](https://github.com/onnx/onnx/blob/master/docs/Operators.md#qlinearconv) and [QLinearMatMul](https://github.com/onnx/onnx/blob/master/docs/Operators.md#QLinearMatMul) ops are supported now.
- **static**: *default:False*
If True, the inputs/activations are quantized using static scale and zero point values specified through quantization_params.
If False, the inputs/activations are quantized using dynamic scale and zero point values computed while running the model.
- **asymmetric_input_types**: *default: False*
If True, weights are quantized into signed integers and inputs/activations into unsigned integers.
If False, weights and inputs/activations are quantized into unsigned integers.
- **force_fusions**: *default: False*
If True, nodes added for dynamic quantization are fused.
If False, no fusion is applied for nodes which are added for dynamic quantization.
This optimization is available from opset 11.
- **quantization_params**: *default: None*
Dictionary to specify the zero point and scale values for inputs to and outputs from conv and matmul nodes.
Should be specified when static is set to True.
The quantization_params should be specified in the following format:
{
"input_name": [zero_point, scale]
}.
zero_point should be of type np.uint8 and scale should be of type np.float32.
example:
{
'resnet_model/Relu_1:0': [np.uint8(0), np.float32(0.019539741799235344)],
'resnet_model/Relu_2:0': [np.uint8(0), np.float32(0.011359662748873234)]
}
- **nodes_to quantize**: *default: None*
- **activation_type**: *default: QuantType.QUInt8*
-
quantization data type of activation. It can be QuantType.QInt8 or QuantType.QUInt8
- **weight_type**: *default: QuantType.QUInt8*
-
quantization data type of weight. It can be QuantType.QInt8 or QuantType.QUInt8
- **nodes_to_quantize**: *default: []*
-
List of nodes names to quantize. When this list is not None only the nodes in this list
are quantized.
example:
[
'Conv__224',
'Conv__252'
]
are quantized.
example:
[
'Conv__224',
'Conv__252'
]
- **nodes_to_exclude**: *default: []*
-
List of nodes names to exclude. The nodes in this list will be excluded from quantization
when it is not None.
### Example - Quantize an ONNX Model
In addition, the user needs to provide an implementation of CalibrationDataReader for quantize_static. CalibrationDataReader takes in the calibration data and generates the inputs of the model.
### Example
- Dynamic quantization
```python
import onnx
from quantize import quantize, QuantizationMode
from onnxruntime.quantization import quantize_dynamic, QuantType
# Load the onnx model
model = onnx.load('path/to/the/model.onnx')
# Quantize
quantized_model = quantize(model, quantization_mode=QuantizationMode.IntegerOps)
# Save the quantized model
onnx.save(quantized_model, 'path/to/the/quantized_model.onnx')
model_fp32 = 'path/to/the/model.onnx'
model_quant = 'path/to/the/model.quant.onnx'
quantized_model = quantize_dynamic(model_fp32, model_quant, weight_type=QuantType.QUInt8)
```
## Calibration tool
Calibration can be used to improve quantization, adding reduced-precision computation for neural networks while retaining high accuracy without retraining.
- QAT quantization
```python
import onnx
from onnxruntime.quantization import quantize_qat, QuantType
Calibration uses a small data set representative of the original data set to calculate quantization thresholds. To calculate the quantization thresholds it updates the original onnx model by adding `ReduceMin` and `ReduceMax` nodes to all the nodes which are candidates for quantization (Today this is applicable for `Conv` and `MatMul` nodes). It then runs through the calibration datasets to gather these outputs and finally calculates the quantization thresholds. These are then passed as inputs to quantize.py for quantizing the model.
model_fp32 = 'path/to/the/model.onnx'
model_quant = 'path/to/the/model.quant.onnx'
quantized_model = quantize_qat(model_fp32, model_quant)
```
### Options
- Static quantization
See below for a description of all the options to calibrate():
- **model_path**: Path to the original FP32 model
- **data_reader**: User-implemented object to read in and preprocess calibration dataset based on CalibrationDataReader interface, which takes in `calibration_image_data` and can generate the next input data dictionary for ONNXinferencesession run.
- **op_types**: Operator types to be calibrated and quantized, *default = 'Conv,MatMul'*
- **black_nodes**: Operator names that should not be calibrated and quantized, *default = ''*
- **white_nodes**: Operator names that force to be calibrated and quantized, *default = ''*
- **augmented_model_path**: Path to save the augmented_model.
### End-to-end example
This is an E2E example to demonstrate calibration, quantization and accuracy testing for a ResNet50 model. As discussed above, if you want to use the quantization tool only, please follow the example above in `Quantization Tool` section.
We leverage the instructions as the following:
* Download the model : Download the [resnet50_v1](./E2E_example_model/resnet50_v1.onnx).
* Install latest versions of ONNX and ONNXRuntime.
* Download the test calibration data set:
* A `calibration_data_set_test` folder is included under `./E2E_example_model`. It is used as the test calibration data set for this E2E example.
* Run the E2E example. [e2e_example](./E2E_example_model/e2e_user_example.py).
* `ResNet50DataReader`is implemented based on `CalibrationDataReader` interface and it's used specifically for reading in the image data for ResNet50.`preprocess_func` is used by `ResNet50DataReader`to load and preprocess the image data.
- *preprocess_func*: resizes and normalizes image to NHWC format, in a [technique used by mlperf 0.5](https://github.com/mlperf/inference/blob/master/v0.5/classification_and_detection/python/dataset.py#L250) for variants of ResNet.
- Alternatively, if user wants to accept preprocessed tensors in .pb format. Refer to [this article](https://github.com/onnx/onnx/blob/master/docs/PythonAPIOverview.md#manipulating-tensorproto-and-numpy-array) to understand how to hop between numpy arrays and tensorproto and write corresponding preprocess function.
* Run the calibration tool:
```
python3 e2e_user_example.py
```
* After successfuly running the E2E example, a `calibrated_quantized_model` will be saved. (The `quantization_mode` used here is QLinear Ops.)
* Setup and run mlperf accuracy tests : Now that quantized model is ready run the accuracy tests using the mlperf accuracy benchmarks.
* Set up the [mlperf benchmark](https://github.com/mlperf/inference/tree/master/v0.5/classification_and_detection#prerequisites-and-installation)
* Run accuracy test : For example
```
./run_local.sh onnxruntime resnet50 --accuracy --count 5000
```
Please refer to ./E2E_example_model for an example of static quantization.

View file

@ -1,2 +1,5 @@
from .quantize import quantize
from .quantize import QuantizationMode
from .quantize import quantize, quantize_static, quantize_dynamic, quantize_qat
from .quantize import QuantizationMode
from .calibrate import CalibrationDataReader
from .calibrate import calibrate
from .quant_utils import QuantType

View file

@ -16,23 +16,18 @@ import abc
class CalibrationDataReader(metaclass=abc.ABCMeta):
@classmethod
def __subclasshook__(cls,subclass):
return (hasattr(subclass,'get_next') and callable(subclass.get_next) or NotImplemented)
def __subclasshook__(cls, subclass):
return (hasattr(subclass, 'get_next') and callable(subclass.get_next) or NotImplemented)
@abc.abstractmethod
def get_next(self) -> dict:
"""generate the input data dict for ONNXinferenceSession run"""
raise NotImplementedError
class ONNXCalibrater:
def __init__(self,
model_path,
data_reader:CalibrationDataReader,
calibrate_op_types,
black_nodes,
white_nodes,
augmented_model_path,
input_name_to_nodes):
def __init__(self, model_path, data_reader: CalibrationDataReader, calibrate_op_types, black_nodes, white_nodes,
augmented_model_path):
'''
:param model_path: ONNX model to calibrate
:param data_reader: user implemented object to read in and preprocess calibration dataset
@ -49,8 +44,8 @@ class ONNXCalibrater:
self.black_nodes = black_nodes
self.white_nodes = white_nodes
self.augmented_model_path = augmented_model_path
self.input_name_to_nodes = input_name_to_nodes
self.input_name_to_nodes = {}
def augment_graph(self):
'''
Adds ReduceMin and ReduceMax nodes to all quantization_candidates op type nodes in
@ -60,43 +55,45 @@ class ONNXCalibrater:
model = onnx.load(self.model_path)
model = onnx.shape_inference.infer_shapes(model)
value_infos = {vi.name: vi for vi in model.graph.value_info}
value_infos = {vi.name: vi for vi in model.graph.value_info}
added_nodes = []
added_outputs = []
tensors_to_calibrate = set()
for node in model.graph.node:
should_be_calibrate = ((node.op_type in self.calibrate_op_types) and
(node.name not in self.black_nodes)) or (node.name in self.white_nodes)
(node.name not in self.black_nodes)) or (node.name in self.white_nodes)
if should_be_calibrate:
for input_tensor_name in node.input:
if input_tensor_name in value_infos.keys():
if input_tensor_name in value_infos.keys():
vi = value_infos[input_tensor_name]
if vi.type.HasField(
'tensor_type') and vi.type.tensor_type.elem_type == TensorProto.FLOAT and (
if vi.type.HasField('tensor_type') and vi.type.tensor_type.elem_type == TensorProto.FLOAT and (
input_tensor_name not in model.graph.initializer):
tensors_to_calibrate.add(input_tensor_name)
for output_tensor_name in node.output:
if output_tensor_name in value_infos.keys():
if output_tensor_name in value_infos.keys():
vi = value_infos[output_tensor_name]
if vi.type.HasField(
'tensor_type') and vi.type.tensor_type.elem_type == TensorProto.FLOAT:
if vi.type.HasField('tensor_type') and vi.type.tensor_type.elem_type == TensorProto.FLOAT:
tensors_to_calibrate.add(output_tensor_name)
for tensor in tensors_to_calibrate:
# Adding ReduceMin nodes
reduce_min_name = tensor + '_ReduceMin'
reduce_min_node = onnx.helper.make_node('ReduceMin', [tensor], [tensor + '_ReduceMin'], reduce_min_name, keepdims=0)
reduce_min_node = onnx.helper.make_node('ReduceMin', [tensor], [tensor + '_ReduceMin'],
reduce_min_name,
keepdims=0)
added_nodes.append(reduce_min_node)
added_outputs.append(helper.make_tensor_value_info(reduce_min_node.output[0], TensorProto.FLOAT, ()))
# Adding ReduceMax nodes
reduce_max_name = tensor + '_ReduceMax'
reduce_max_node = onnx.helper.make_node('ReduceMax', [tensor], [tensor + '_ReduceMax'], reduce_max_name, keepdims=0)
reduce_max_node = onnx.helper.make_node('ReduceMax', [tensor], [tensor + '_ReduceMax'],
reduce_max_name,
keepdims=0)
added_nodes.append(reduce_max_node)
added_outputs.append(helper.make_tensor_value_info(reduce_max_node.output[0], TensorProto.FLOAT, ()))
@ -106,7 +103,7 @@ class ONNXCalibrater:
return model
#Using augmented outputs to generate inputs for quantization
def get_intermediate_outputs(self,calib_mode='naive'):
def get_intermediate_outputs(self, calib_mode='naive'):
'''
Gather intermediate model outputs after running inference
parameter calib_mode: type 'naive' gives (ReduceMin, ReduceMax) pairs
@ -127,8 +124,10 @@ class ONNXCalibrater:
break
intermediate_outputs.append(session.run(None, inputs))
node_output_names = [session.get_outputs()[i].name for i in range(len(intermediate_outputs[0]))]
output_dicts_list = [dict(zip(node_output_names, intermediate_outputs[i])) for i in range(self.data_reader.datasize)]
output_dicts_list = [
dict(zip(node_output_names, intermediate_outputs[i])) for i in range(self.data_reader.datasize)
]
#number of outputs in original model
model = onnx.load(self.model_path)
num_model_outputs = len(model.graph.output)
@ -138,7 +137,7 @@ class ONNXCalibrater:
merged_dict.setdefault(k, []).append(v)
added_node_output_names = node_output_names[num_model_outputs:]
node_names = [added_node_output_names[i].rpartition('_')[0]
for i in range(0, len(added_node_output_names), 2)] #output names
for i in range(0, len(added_node_output_names), 2)] #output names
# Characterizing distribution of a node's values across test data sets
clean_merged_dict = dict((i, merged_dict[i]) for i in merged_dict if i != list(merged_dict.keys())[0])
@ -156,7 +155,6 @@ class ONNXCalibrater:
return final_dict
def _get_input_name_to_nodes(self, model):
'''
Helper function to get input_name_to_nodes dictionary
@ -167,26 +165,8 @@ class ONNXCalibrater:
if input_name not in self.input_name_to_nodes:
self.input_name_to_nodes[input_name] = [node]
else:
self.input_name_to_nodes[input_name].append(node)
self.input_name_to_nodes[input_name].append(node)
def _get_next_nodes(self, model, curr_node):
'''
Helper function to get child nodes for a given node
'''
if not self.input_name_to_nodes:
self._get_input_name_to_nodes(model)
children = []
for output in curr_node.output:
if output in self.input_name_to_nodes:
for child_node in self.input_name_to_nodes[output]:
children.append(child_node)
return children
def calculate_scale_zeropoint(self, node, next_node, rmin, rmax):
zp_and_scale = []
@ -218,7 +198,7 @@ class ONNXCalibrater:
return zp_and_scale
def calculate_quantization_params(self,quantization_thresholds):
def calculate_quantization_params(self, quantization_thresholds):
'''
Given quantization thresholds, calculate the quantization params.
:param quantization_thresholds:
@ -239,31 +219,34 @@ class ONNXCalibrater:
}
'''
if quantization_thresholds is None:
raise ValueError('quantization thresholds is required to calculate quantization params (zero point and scale)')
raise ValueError(
'quantization thresholds is required to calculate quantization params (zero point and scale)')
quantization_params = {}
model = onnx.load(self.model_path)
self._get_input_name_to_nodes(model)
for node in model.graph.node:
next_nodes = self._get_next_nodes(model,node)
for next_node in next_nodes:
node_output_name = next_node.output[0]
if node_output_name in quantization_thresholds:
node_thresholds = quantization_thresholds[node_output_name]
node_params = self.calculate_scale_zeropoint(node, next_node, node_thresholds[0], node_thresholds[1])
quantization_params[node_output_name] = node_params
for node_output_name in node.output:
if node_output_name in self.input_name_to_nodes:
children = self.input_name_to_nodes[node_output_name]
for child in children:
if node_output_name in quantization_thresholds:
node_thresholds = quantization_thresholds[node_output_name]
node_params = self.calculate_scale_zeropoint(node, child, node_thresholds[0],
node_thresholds[1])
quantization_params[node_output_name] = node_params
return quantization_params
def calibrate(model_path,
data_reader:CalibrationDataReader,
op_types=['Conv','MatMul'],
data_reader: CalibrationDataReader,
op_types=['Conv', 'MatMul'],
black_nodes=[],
white_nodes=[],
augmented_model_path ='augmented_model.onnx'):
augmented_model_path='augmented_model.onnx'):
'''
Given an onnx model, augment and run the augmented model on calibration data set, aggregate and calculate the quantization parameters.
@ -274,18 +257,15 @@ def calibrate(model_path,
:param white_nodes: operator names that force to be quantized, default = ''
:param augmented_model_path: save augmented_model to this path
'''
input_name_to_nodes = {}
#1. initialize a calibrater
calibrater = ONNXCalibrater(model_path, data_reader, op_types, black_nodes, white_nodes, augmented_model_path, input_name_to_nodes)
calibrater = ONNXCalibrater(model_path, data_reader, op_types, black_nodes, white_nodes, augmented_model_path)
#2. augment
augmented_model = calibrater.augment_graph()
onnx.save(augmented_model, augmented_model_path)
#3. generate quantization thresholds
#3. generate quantization thresholds
dict_for_quantization = calibrater.get_intermediate_outputs()
#4. generate quantization parameters dict
quantization_params_dict = calibrater.calculate_quantization_params(dict_for_quantization)
print("Calibrated,quantized parameters calculated and returned.")
return quantization_params_dict
return quantization_params_dict

View file

@ -0,0 +1,127 @@
import onnx
from .quant_utils import _find_by_name
class ONNXModel:
    '''
    Convenience wrapper around a model object that exposes ``graph.node`` and
    ``graph.initializer`` (presumably an onnx ModelProto - confirm with callers).
    It offers helpers to add/remove nodes and initializers and to navigate
    producer/consumer relationships between nodes.
    '''
    def __init__(self, model):
        '''
        :param model: the model object to wrap; it is stored, not copied.
        '''
        self.model = model
        # Starts empty; none of the methods defined below read or update it.
        self.node_name_counter = {}

    def nodes(self):
        '''Return the node container of the wrapped graph.'''
        return self.model.graph.node

    def initializer(self):
        '''Return the initializer container of the wrapped graph.'''
        return self.model.graph.initializer

    def graph(self):
        '''Return the wrapped model's graph.'''
        return self.model.graph

    def ir_version(self):
        '''Return the wrapped model's ir_version.'''
        return self.model.ir_version

    def opset_import(self):
        '''Return the wrapped model's opset_import.'''
        return self.model.opset_import

    def remove_node(self, node):
        '''Remove node from the graph; silently does nothing if it is absent.'''
        if node in self.model.graph.node:
            self.model.graph.node.remove(node)

    def remove_nodes(self, nodes_to_remove):
        '''Remove every node in nodes_to_remove that is present in the graph.'''
        for node in nodes_to_remove:
            self.remove_node(node)

    def add_node(self, node):
        '''Append a single node to the graph.'''
        self.model.graph.node.extend([node])

    def add_nodes(self, nodes_to_add):
        '''Append all nodes in nodes_to_add to the graph.'''
        self.model.graph.node.extend(nodes_to_add)

    def add_initializer(self, tensor):
        '''Append tensor to the initializers unless _find_by_name already finds its name there.'''
        if _find_by_name(tensor.name, self.model.graph.initializer) is None:
            self.model.graph.initializer.extend([tensor])

    def get_initializer(self, name):
        '''Return the first initializer whose name equals name, or None.'''
        for tensor in self.model.graph.initializer:
            if tensor.name == name:
                return tensor
        return None

    def remove_initializer(self, tensor):
        '''Remove tensor from the initializers; silently does nothing if it is absent.'''
        if tensor in self.model.graph.initializer:
            self.model.graph.initializer.remove(tensor)

    def remove_initializers(self, init_to_remove):
        '''Remove every initializer in init_to_remove that is present in the graph.'''
        for initializer in init_to_remove:
            self.remove_initializer(initializer)

    def input_name_to_nodes(self):
        '''
        Build a dict mapping each input name to the list of nodes consuming it.
        The dict is rebuilt from the graph on every call.
        '''
        input_name_to_nodes = {}
        for node in self.model.graph.node:
            for input_name in node.input:
                input_name_to_nodes.setdefault(input_name, []).append(node)
        return input_name_to_nodes

    def output_name_to_node(self):
        '''
        Build a dict mapping each output name to the node producing it.
        If several nodes list the same output name, the last one in graph order wins.
        '''
        output_name_to_node = {}
        for node in self.model.graph.node:
            for output_name in node.output:
                output_name_to_node[output_name] = node
        return output_name_to_node

    def get_children(self, node, input_name_to_nodes=None):
        '''
        Return the nodes that consume any output of node.
        :param input_name_to_nodes: optional precomputed map (see input_name_to_nodes());
            it is rebuilt from the graph when omitted.
        A consumer appears once per matching input it has.
        '''
        if input_name_to_nodes is None:
            input_name_to_nodes = self.input_name_to_nodes()

        children = []
        for output in node.output:
            if output in input_name_to_nodes:
                # extend() instead of an inner loop that re-bound the `node` parameter.
                children.extend(input_name_to_nodes[output])
        return children

    def get_parents(self, node, output_name_to_node=None):
        '''
        Return the nodes that produce the inputs of node, in input order.
        Inputs without a producing node (absent from the map) are skipped.
        :param output_name_to_node: optional precomputed map (see output_name_to_node());
            it is rebuilt from the graph when omitted.
        '''
        if output_name_to_node is None:
            output_name_to_node = self.output_name_to_node()

        parents = []
        for input_name in node.input:
            if input_name in output_name_to_node:
                parents.append(output_name_to_node[input_name])
        return parents

    def get_parent(self, node, idx, output_name_to_node=None):
        '''
        Return the node producing input number idx of node.
        Returns None when node has no input at idx or when no node produces that input.
        :param output_name_to_node: optional precomputed map (see output_name_to_node());
            it is rebuilt from the graph when omitted.
        '''
        if output_name_to_node is None:
            output_name_to_node = self.output_name_to_node()

        if len(node.input) <= idx:
            return None

        input_name = node.input[idx]
        if input_name not in output_name_to_node:
            return None

        return output_name_to_node[input_name]

    def find_node_by_name(self, node_name, new_nodes_list, graph):
        '''
        Find out if a node exists in a graph or a node is in the
        new set of nodes created during quantization. Return the node found.
        '''
        # Shallow copy of the node list so that extending it leaves graph.node untouched.
        graph_nodes_list = list(graph.node)
        graph_nodes_list.extend(new_nodes_list)
        node = _find_by_name(node_name, graph_nodes_list)
        return node

    def find_nodes_by_initializer(self, graph, initializer):
        '''
        Find all nodes with given initializer as an input.
        A node is listed once per input that matches initializer.name.
        '''
        nodes = []
        for node in graph.node:
            for node_input in node.input:
                if node_input == initializer.name:
                    nodes.append(node)
        return nodes

View file

@ -0,0 +1,901 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import os
import onnx
import onnx.numpy_helper
import struct
from pathlib import Path
import numpy as np
from onnx import onnx_pb as onnx_proto
from onnx import shape_inference
from onnxruntime import SessionOptions, InferenceSession, GraphOptimizationLevel
from .quant_utils import QuantizationMode, QuantizedValueType, QuantizedInitializer, QuantizedValue, quantization_modes
from .quant_utils import _find_by_name, _get_elem_index, _get_mul_node, _generate_identified_filename, _attribute_to_kwarg
from .quant_utils import QuantType, onnx_domain, __producer__, __version__
from .registry import CreateOpQuantizer, CreateDefaultOpQuantizer
from .onnx_model import ONNXModel
def quantize_data(data, quantize_range, qType):
    '''
    Linearly quantize a flat sequence of real values to 8 bits.

    :parameter data: flat sequence of real values to quantize
    :parameter quantize_range: width of the target integer range
                               (255 for UINT8, 254 for INT8, see _get_qrange_for_qType)
    :parameter qType: data type to quantize to. Supported types UINT8 and INT8
    :return: minimum, maximum, zero point, scale, and quantized weights
    :raises ValueError: if qType is neither INT8 nor UINT8

    To pack weights, we compute a linear transformation
        - when data type == uint8 mode, from [rmin, rmax] -> [0, 2^b - 1] and
        - when data type == int8, from [-m , m] -> [-(2^{b-1}-1), 2^{b-1}-1] where
            m = max(abs(rmin), abs(rmax))
    and add necessary intermediate nodes to transform quantized weight to full weight using the equation
    r = S(q-z), where
        r: real original value
        q: quantized value
        S: scale
        z: zero point
    '''
    # The range always contains 0 so that the real value 0 is exactly representable.
    # default=0 keeps an empty tensor from crashing min()/max().
    rmin = min(min(data, default=0), 0)
    rmax = max(max(data, default=0), 0)

    if qType == onnx_proto.TensorProto.INT8:
        max_range = max(abs(rmin), abs(rmax))
        # An all-zero tensor gives max_range == 0; fall back to a scale of 1 (as the
        # UINT8 branch does) instead of dividing by zero below.
        scale = (float(max_range) * 2) / quantize_range if max_range > 0 else 1
        zero_point = 0
        # signed byte type
        quantized_data = (np.asarray(data) / scale).round().astype('b')
    elif qType == onnx_proto.TensorProto.UINT8:
        scale = (float(rmax) - rmin) / quantize_range if rmin != rmax else 1
        zero_point = round((0 - rmin) / scale)  # round to nearest integer
        # unsigned byte type
        quantized_data = ((np.asarray(data) / scale).round() + zero_point).astype('B')
    else:
        raise ValueError("Unexpected data type {} requested. Only INT8 and UINT8 are supported.".format(qType))

    return rmin, rmax, zero_point, scale, quantized_data
def _get_qrange_for_qType(qType):
    '''
    Return the width of the integer range used when quantizing to qType.

    parameter qType: quantization type (TensorProto.UINT8 or TensorProto.INT8).
    return: 255 for UINT8 and 254 for INT8.
    raises ValueError: for any other type.
    '''
    tensor_types = onnx_proto.TensorProto
    if qType == tensor_types.UINT8:
        # Full unsigned range: 2^b - 1.
        return 255
    if qType == tensor_types.INT8:
        # Symmetric signed range [-(2^{b-1}-1), 2^{b-1}-1], i.e. [-127, 127] for 8 bits.
        return 254
    raise ValueError('unsupported quantization data type')
class ONNXQuantizer:
def __init__(self, model, per_channel, mode, static, weight_qType, input_qType, quantization_params,
             nodes_to_quantize, nodes_to_exclude, op_types_to_quantize):
    '''
    Set up the quantizer state for a model.

    :param model: ONNX model to quantize; shape inference is run on it first.
    :param per_channel: pack weights per channel.
    :param mode: quantization mode; must be one of quantization_modes.
    :param static: use static quantization for inputs.
    :param weight_qType: data type weights are quantized to.
    :param input_qType: data type inputs are quantized to.
    :param quantization_params: mapping from value name to [zero point, scale], or None.
    :param nodes_to_quantize: names of the specific nodes to quantize.
    :param nodes_to_exclude: names of the specific nodes to exclude.
    :param op_types_to_quantize: operator types that may be quantized.
    :raises ValueError: if the opset is unsupported or mode is not a known quantization mode.
    '''
    inferred_model = shape_inference.infer_shapes(model)
    self.model = ONNXModel(inferred_model)
    # Inferred value infos, indexed by value name.
    self.value_infos = {}
    for value_info in inferred_model.graph.value_info:
        self.value_infos[value_info.name] = value_info

    self.per_channel = per_channel
    self.mode = mode
    self.static = static
    # Placeholder; check_opset_version() below sets the real value.
    self.fuse_dynamic_quant = False

    self.input_qType = input_qType
    self.weight_qType = weight_qType

    self.quantization_params = quantization_params
    self.nodes_to_quantize = nodes_to_quantize
    self.nodes_to_exclude = nodes_to_exclude
    self.op_types_to_quantize = op_types_to_quantize
    # Nodes of the quantized graph, filled while quantizing.
    self.new_nodes = []

    self.check_opset_version()

    if self.mode not in quantization_modes:
        raise ValueError('unsupported quantization mode {}'.format(self.mode))

    # Names of the helper tensors used to compute scale and zero point when static is False.
    self.fixed_qrange_uint8_name = "fixed_quantization_range_uint8"
    self.fixed_qrange_int8_name = "fixed_quantization_range_int8"
    # uint8: the zero point is derived by subtracting rmin from 0, held by this tensor.
    self.fixed_zero_name = "fixed_zero"
    # int8: the zero point is always zero, held by this tensor.
    self.fixed_zero_zp_name = "fixed_zero_zp"

    # Weights that have been quantized so far.
    self._quantized_weights = []
    # Original value name -> quantized value description.
    self.quantized_value_map = {}
def check_opset_version(self):
    '''
    Validate the model's default-domain opset and record whether dynamic
    quantization can be fused.

    Sets self.fuse_dynamic_quant to False for opset 10 and to True for any newer opset.

    :raises ValueError: if the model does not import exactly one default ("" or "ai.onnx")
        opset, or if that opset is older than 10.
    '''
    default_domain_opsets = []
    for opset in self.model.model.opset_import:
        if not opset.domain or opset.domain == "ai.onnx":
            default_domain_opsets.append(opset)

    if len(default_domain_opsets) != 1:
        raise ValueError('Failed to find proper ai.onnx domain')

    opset_version = default_domain_opsets[0].version
    if opset_version < 10:
        # NOTE(review): the message says the opset "will be set to 10", but this code
        # raises instead of upgrading the model - confirm the intended wording.
        raise ValueError(
            ("The original model opset version is {}, which does not support quantized operators.\n"
             "The opset version of quantized model will be set to 10. Use onnx model checker to verify model after quantization."
             ).format(opset_version))

    # Versions below 10 were rejected above, so anything other than 10 is newer.
    self.fuse_dynamic_quant = opset_version != 10
def replace_gemm_with_matmul(self):
    '''
    Rewrite plain Gemm nodes as MatMul (+ Add) so they can be quantized as MatMul.

    A Gemm node is rewritten only when alpha == 1, transA == 0 and transB == 0
    and, if it has a bias input, beta == 1. With a bias the node becomes
    MatMul followed by Add; without one (the third Gemm input is optional)
    it becomes a single MatMul writing straight to the Gemm output.
    '''
    nodes_to_remove = []
    nodes_to_add = []

    for node in self.model.nodes():
        if node.op_type != 'Gemm':
            continue

        # Gemm attribute defaults, overridden by whatever the node specifies.
        attrs = {'alpha': 1.0, 'beta': 1.0, 'transA': 0, 'transB': 0}
        for attr in node.attribute:
            if attr.name in attrs:
                attrs[attr.name] = onnx.helper.get_attribute_value(attr)

        # The bias input C is optional; an omitted optional input is either absent or an empty name.
        has_bias = len(node.input) > 2 and node.input[2] != ''

        if attrs['alpha'] != 1.0 or attrs['transA'] != 0 or attrs['transB'] != 0:
            continue
        # beta only scales the bias, so it is irrelevant when there is none.
        if has_bias and attrs['beta'] != 1.0:
            continue

        matmul_name = node.output[0] + '_MatMul'
        if has_bias:
            matmul_node = onnx.helper.make_node('MatMul', [node.input[0], node.input[1]], [matmul_name],
                                                name=matmul_name)
            add_node = onnx.helper.make_node('Add',
                                             inputs=[matmul_name, node.input[2]],
                                             outputs=node.output,
                                             name=node.output[0] + '_Add')
            nodes_to_add.extend([matmul_node, add_node])
        else:
            matmul_node = onnx.helper.make_node('MatMul', [node.input[0], node.input[1]],
                                                node.output,
                                                name=matmul_name)
            nodes_to_add.append(matmul_node)
        nodes_to_remove.append(node)

    self.model.add_nodes(nodes_to_add)
    self.model.remove_nodes(nodes_to_remove)
def remove_fake_quantized_nodes(self):
    '''
    Strip QuantizeLinear/DequantizeLinear pairs (the fake-quantized nodes produced by
    quantization-aware training) out of the graph.

    For every QuantizeLinear node: its DequantizeLinear child is located, the consumers of
    that child are rewired to read the QuantizeLinear input directly, and the zero point
    and scale of the QuantizeLinear node are stored in self.quantization_params under the
    name of that input. Both nodes and the scale / zero point initializers are then removed.

    :return: the updated model.
    :raises ValueError: if a QuantizeLinear node has no DequantizeLinear child, no parent
        node, no consumer behind the pair, or a consumer that cannot be rewired.
    '''
    nodes_to_remove = []
    initializers_to_remove = []

    for quantize_node in self.model.nodes():
        if quantize_node.op_type != 'QuantizeLinear':
            continue

        # The last DequantizeLinear child found is the one paired with this node.
        dequantize_node = None
        for child in self.model.get_children(quantize_node):
            if child.op_type == 'DequantizeLinear':
                dequantize_node = child
        if dequantize_node is None:
            raise ValueError(
                "Remove fake-quantized node pair Error: DequantizeLinear node is not found for {}.".format(
                    quantize_node.name))

        if self.model.get_parent(quantize_node, 0) is None:
            raise ValueError("Remove fake-quantized node pair Error: Parent node is not found for {}.".format(
                quantize_node.name))

        consumers = self.model.get_children(dequantize_node)
        if len(consumers) == 0:
            raise ValueError("Remove fake-quantized node pair Error: No successive nodes found for {}.".format(
                dequantize_node.name))

        # TODO: convert it to the specified input_type
        initializer_scale = _find_by_name(quantize_node.input[1], self.model.initializer())
        initializer_zp = _find_by_name(quantize_node.input[2], self.model.initializer())
        zp_and_scale = [
            onnx.numpy_helper.to_array(initializer_zp),
            onnx.numpy_helper.to_array(initializer_scale)
        ]

        # Bypass the pair: each consumer now reads the original float value.
        float_input_name = quantize_node.input[0]
        for consumer in consumers:
            input_position = _get_elem_index(dequantize_node.output[0], consumer.input)
            if input_position == -1:
                raise ValueError(
                    "Remove fake-quantized node pair Error: Connection failed. No matched successive node input found for {}."
                    .format(dequantize_node.name))
            consumer.input[input_position] = float_input_name

        # Remember the trained quantization parameters for that value.
        if self.quantization_params is None:
            self.quantization_params = {}
        self.quantization_params[float_input_name] = zp_and_scale

        nodes_to_remove.extend([quantize_node, dequantize_node])
        initializers_to_remove.extend([initializer_scale, initializer_zp])

    self.model.remove_nodes(nodes_to_remove)
    self.model.remove_initializers(initializers_to_remove)

    return self.model.model
def should_quantize(self, node):
    '''
    Decide whether a node is to be quantized.

    :param node: node to check.
    :return: True when the node's op type is in op_types_to_quantize, the node is named in
        nodes_to_quantize (if that list is given and non-empty) and the node is not named
        in nodes_to_exclude; False otherwise.
    '''
    if node.op_type not in self.op_types_to_quantize:
        return False

    # A missing or empty allow list means "no restriction by name".
    allow_list = self.nodes_to_quantize
    if allow_list is not None and len(allow_list) > 0 and node.name not in allow_list:
        return False

    deny_list = self.nodes_to_exclude
    return deny_list is None or node.name not in deny_list
def quantize_model(self):
    '''
    Run the whole quantization pass and return the quantized model.

    Gemm nodes are rewritten, fake-quantized node pairs are removed, every node is handed
    to an operator quantizer (the default one for nodes that should not be quantized),
    quantized graph outputs are dequantized, and finally the graph's node list is replaced
    by self.new_nodes and the float versions of quantized weights are dropped.

    :return: the quantized model.
    '''
    self.replace_gemm_with_matmul()
    self.remove_fake_quantized_nodes()

    for node in self.model.nodes():
        make_quantizer = CreateOpQuantizer if self.should_quantize(node) else CreateDefaultOpQuantizer
        make_quantizer(self, node).quantize()

    self._dequantize_outputs()

    # Repeated protobuf fields cannot be assigned directly: clear, then extend.
    # https://developers.google.com/protocol-buffers/docs/reference/python-generated?csw=1#fields
    self.model.graph().ClearField('node')
    self.model.graph().node.extend(self.new_nodes)

    # Drop the float weights that now exist in quantized form.
    self._remove_quantized_weights()

    self.model.model.producer_name = __producer__
    self.model.model.producer_version = __version__

    return self.model.model
def find_weight_data(self, initializer):
    '''
    :param initializer: TensorProto initializer object from a graph
    :return: numpy array holding the data of the given initializer
    :raises ValueError: if the initializer does not hold FLOAT data
    '''
    if initializer.data_type != onnx_proto.TensorProto.FLOAT:
        # Resolve the type name through the protobuf enum itself; this module has no
        # type_to_name table in scope, so using one here would raise NameError
        # instead of the intended ValueError.
        type_name = onnx_proto.TensorProto.DataType.Name(initializer.data_type)
        raise ValueError('Only float type quantization is supported. Weights {} is {}. '.format(
            initializer.name, type_name))
    return onnx.numpy_helper.to_array(initializer)
def _is_valid_quantize_value(self, value_name):
    '''
    Tell whether a value can be quantized, i.e. whether it is a float tensor.

    :param value_name: name of the value to check.
    :return: for a value with inferred type information, whether it is a tensor with FLOAT
        elements; otherwise the result of _is_valid_initializer_value for that name.
    '''
    if value_name not in self.value_infos:
        # No inferred type: fall back to the initializer check.
        return self._is_valid_initializer_value(value_name)

    value_type = self.value_infos[value_name].type
    return value_type.HasField('tensor_type') and value_type.tensor_type.elem_type == onnx_proto.TensorProto.FLOAT
def _is_valid_initializer_value(self, value_name):
    '''
    Tell whether value_name names an initializer of the model that holds FLOAT data.

    :param value_name: name to look up among the model initializers.
    :return: True for a FLOAT initializer, False when there is no such initializer or it has another type.
    '''
    initializer = _find_by_name(value_name, self.model.initializer())
    if initializer is None:
        return False
    return initializer.data_type == onnx_proto.TensorProto.FLOAT
def _is_valid_quantize_weight(self, weight_name):
    '''
    Tell whether weight_name names a weight that can be quantized: an initializer of the
    model holding FLOAT data.

    :param weight_name: name to look up among the model initializers.
    :return: True for a FLOAT initializer, False otherwise.
    '''
    candidate = _find_by_name(weight_name, self.model.initializer())
    is_float = candidate is not None and candidate.data_type == onnx_proto.TensorProto.FLOAT
    return is_float
def _remove_quantized_weights(self):
    '''
    Remove the float version of every quantized weight from the graph.

    Each weight recorded in self._quantized_weights is removed from the initializer list
    and, when it is also listed as a graph input, from the graph inputs. A weight that is
    not a graph input only triggers a warning for models with IR version below 4.

    This function assumes that after quantization, all nodes that previously used a weight:
        - use output from DequantizeLinear as input if they do not support quantization.
        - use quantized weight if they support quantization.
    '''
    for quantized in self._quantized_weights:
        # Drop the original float initializer.
        self.model.initializer().remove(quantized.initializer)

        # Drop the graph input of the same name, if there is one.
        graph_inputs = self.model.graph().input
        matching_input = next((value for value in graph_inputs if value.name == quantized.name), None)
        if matching_input is not None:
            graph_inputs.remove(matching_input)
        elif self.model.ir_version() < 4:
            print("Warning: invalid weight name {} found in the graph (not a graph input)".format(quantized.name))
def _update_graph(self, weight):
    '''
    Add the initializers that describe a quantized weight to the model.

    Three initializers are appended: the packed (quantized) weight, its scale and its
    zero point, named after the entry registered for the weight in
    self.quantized_value_map. The weight is also recorded in self._quantized_weights.
    Nodes of the graph are NOT touched here.

    :param weight: quantized weight description.
    '''
    quantized_value = self.quantized_value_map[weight.name]
    assert (quantized_value is not None)

    # Packed weight keeps the shape of the original initializer.
    packed_dtype = onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[weight.qType]
    packed_data = np.asarray(weight.quantized_data, dtype=packed_dtype).reshape(weight.initializer.dims)
    packed_initializer = onnx.numpy_helper.from_array(packed_data, quantized_value.q_name)

    # Per-axis quantization: one scale / zero point per entry along that axis; otherwise scalars.
    zero_scale_shape = [] if weight.axis is None else [weight.initializer.dims[weight.axis]]

    scale_initializer = onnx.helper.make_tensor(quantized_value.scale_name, onnx_proto.TensorProto.FLOAT,
                                                zero_scale_shape, weight.scales)
    zero_initializer = onnx.helper.make_tensor(quantized_value.zp_name, weight.qType, zero_scale_shape,
                                               weight.zero_points)

    self.model.initializer().extend([packed_initializer, scale_initializer, zero_initializer])
    self._quantized_weights.append(weight)
def _get_quantized_weight(self, initializer, qType):
    '''
    Quantize a whole initializer with a single scale and zero point.

    The weight is also registered in self.quantized_value_map under its original name.

    :param initializer: TensorProto initializer
    :param qType: type to quantize to
    :return: Weight class with quantization information
    '''
    float_data = self.find_weight_data(initializer)
    flat_values = float_data.flatten().tolist()
    rmin, rmax, zero_point, scale, quantized_values = quantize_data(flat_values, _get_qrange_for_qType(qType),
                                                                    qType)
    weight = QuantizedInitializer(initializer.name,
                                  initializer, [rmin], [rmax], [zero_point], [scale],
                                  float_data,
                                  quantized_values,
                                  axis=None,
                                  qType=qType)

    # Register the quantized counterpart; a weight must only be quantized once.
    assert (weight.name not in self.quantized_value_map)
    base_name = weight.name
    self.quantized_value_map[base_name] = QuantizedValue(base_name, base_name + "_quantized",
                                                         base_name + "_scale", base_name + "_zero_point",
                                                         QuantizedValueType.Initializer, None, qType)

    return weight
def _get_quantized_weight_convolution(self, initializer, qType):
    '''
    Quantize a convolution weight, per output channel when self.per_channel is set.

    The weight layout is assumed to be (M x C/group x k1 x ... x kn), where M is the number
    of output channels; any number of kernel dimensions is accepted. Without per-channel
    quantization this simply delegates to _get_quantized_weight.

    :param initializer: initializer TypeProto to quantize
    :param qType: type to quantize to
    :return: Weight class object with quantization information for a given initializer
    '''
    if not self.per_channel:
        return self._get_quantized_weight(initializer, qType)

    weights = self.find_weight_data(initializer)
    channel_index = 0  # output channels are the leading dimension
    channel_count = initializer.dims[channel_index]
    np_data = np.reshape(weights, initializer.dims)
    quantize_range = _get_qrange_for_qType(qType)

    rmin_list = []
    rmax_list = []
    zero_point_list = []
    scale_list = []
    quantized_per_channel_data_list = []
    for i in range(channel_count):
        # np_data[i] selects one whole output channel whatever the rank of the kernel.
        rmin, rmax, zero_point, scale, quantized_per_channel_data = quantize_data(
            np_data[i].flatten().tolist(), quantize_range, qType)
        rmin_list.append(rmin)
        rmax_list.append(rmax)
        zero_point_list.append(zero_point)
        scale_list.append(scale)
        quantized_per_channel_data_list.append(quantized_per_channel_data)

    # Channels are contiguous along the leading axis, so joining the flat per-channel
    # results in order yields the flattened quantized weight.
    quantized_weights = np.concatenate([np.asarray(channel).flatten() for channel in quantized_per_channel_data_list])

    weight = QuantizedInitializer(initializer.name, initializer, rmin_list, rmax_list, zero_point_list, scale_list,
                                  weights,
                                  quantized_weights.tolist(), channel_index, qType)

    # Make entry for this quantized weight
    assert (weight.name not in self.quantized_value_map)
    quantized_value = QuantizedValue(weight.name, weight.name + "_quantized", weight.name + "_scale",
                                     weight.name + "_zero_point", QuantizedValueType.Initializer, None, qType)
    self.quantized_value_map[weight.name] = quantized_value

    return weight
def _get_dynamic_input_quantization_params(self, input_name, nodes_list, qType):
    '''
    Create the nodes that compute scale and zero point of an input at run time.

    INT8 is handled by the int8 variant; every other type by the uint8 variant.

    parameter input_name: Name of the input.
    parameter nodes_list: new nodes are appended to this list.
    parameter qType: type to quantize to.
    return: scale_name, zero_point_name, scale_shape, zero_point_shape.
    '''
    if qType == onnx_proto.TensorProto.INT8:
        build_params = self._get_dynamic_input_quantization_params_int8
    else:
        build_params = self._get_dynamic_input_quantization_params_uint8
    return build_params(input_name, nodes_list)
def _get_dynamic_input_quantization_params_int8(self, input_name, nodes_list):
    '''
    Create the nodes that compute the int8 quantization parameters of an input at run time.

    The scale is max(abs(min(input)), abs(max(input))) divided by half the int8 quantization
    range; the zero point is a fixed zero initializer.

    parameter input_name: Name of the input.
    parameter nodes_list: new nodes are appended to this list.
    return: scale_name, zero_point_name, scale_shape, zero_point_shape.
    '''
    qType = onnx_proto.TensorProto.INT8

    def emit(op_type, inputs, outputs, name, **attributes):
        # Create a node and queue it right away, preserving creation order.
        created = onnx.helper.make_node(op_type, inputs, outputs, name, **attributes)
        nodes_list.append(created)
        return created

    input_scale_name = input_name + "_scale"

    # Minimum and maximum over the whole input.
    reduce_min_name = input_name + "_ReduceMin"
    reduce_min_node = emit("ReduceMin", [input_name], [reduce_min_name + ":0"], reduce_min_name, keepdims=0)
    reduce_max_name = input_name + "_ReduceMax"
    reduce_max_node = emit("ReduceMax", [input_name], [reduce_max_name + ":0"], reduce_max_name, keepdims=0)

    # abs(rmin) and abs(rmax)
    min_abs_name = reduce_min_name + "_Abs"
    min_abs_node = emit("Abs", [reduce_min_node.output[0]], [min_abs_name + ":0"], min_abs_name)
    max_abs_name = reduce_max_name + "_Abs"
    max_abs_node = emit("Abs", [reduce_max_node.output[0]], [max_abs_name + ":0"], max_abs_name)

    # max(abs(rmin), abs(rmax))
    abs_max_name = input_name + "_Abs_Max"
    abs_max_node = emit("Max", [min_abs_node.output[0], max_abs_node.output[0]], [abs_max_name + ":0"],
                        abs_max_name)

    # Dividing by quantize_range / 2.0 equals max(...) * 2.0 / quantize_range.
    half_range = onnx.helper.make_tensor(self.fixed_qrange_int8_name, onnx_proto.TensorProto.FLOAT, [],
                                         [_get_qrange_for_qType(qType) / 2.0])
    self.model.add_initializer(half_range)
    emit("Div", [abs_max_node.output[0], self.fixed_qrange_int8_name], [input_scale_name],
         input_name + "scale_Div")

    # The int8 zero point is always zero.
    zero_point = onnx.helper.make_tensor(self.fixed_zero_zp_name, qType, [], [0])
    self.model.add_initializer(zero_point)

    return input_scale_name, self.fixed_zero_zp_name, [], []
def _get_dynamic_input_quantization_params_uint8(self, input_name, nodes_list):
    '''
    Create the nodes that compute the uint8 quantization parameters of an input at run time.

    With rmin / rmax the minimum / maximum of the input:
        scale = (rmax - rmin) / quantization range
        zero point = cast(floor((0 - rmin) / scale))

    parameter input_name: Name of the input.
    parameter nodes_list: new nodes are appended to this list.
    return: scale_name, zero_point_name, scale_shape, zero_point_shape.
    '''
    qType = onnx_proto.TensorProto.UINT8

    def emit(op_type, inputs, outputs, name, **attributes):
        # Create a node and queue it right away, preserving creation order.
        created = onnx.helper.make_node(op_type, inputs, outputs, name, **attributes)
        nodes_list.append(created)
        return created

    input_scale_name = input_name + "_scale"
    input_zp_name = input_name + "_zero_point"

    # Minimum and maximum over the whole input.
    reduce_min_name = input_name + "_ReduceMin"
    reduce_min_node = emit("ReduceMin", [input_name], [reduce_min_name + ":0"], reduce_min_name, keepdims=0)
    reduce_max_name = input_name + "_ReduceMax"
    reduce_max_node = emit("ReduceMax", [input_name], [reduce_max_name + ":0"], reduce_max_name, keepdims=0)

    # Constant tensors: the quantization range and the value zero.
    qrange_tensor = onnx.helper.make_tensor(self.fixed_qrange_uint8_name, onnx_proto.TensorProto.FLOAT, [],
                                            [_get_qrange_for_qType(qType)])
    self.model.add_initializer(qrange_tensor)
    zero_tensor = onnx.helper.make_tensor(self.fixed_zero_name, onnx_proto.TensorProto.FLOAT, [], [0.0])
    self.model.add_initializer(zero_tensor)

    # scale = (rmax - rmin) / range
    scale_sub_name = input_name + "_scale_Sub"
    scale_sub_node = emit("Sub", [reduce_max_node.output[0], reduce_min_node.output[0]],
                          [scale_sub_name + ":0"], scale_sub_name)
    emit("Div", [scale_sub_node.output[0], self.fixed_qrange_uint8_name], [input_scale_name],
         input_name + "_scale_Div")

    # zero point = cast(floor((0 - rmin) / scale))
    zp_sub_name = input_name + "_zero_point_Sub"
    zp_sub_node = emit("Sub", [self.fixed_zero_name, reduce_min_node.output[0]], [zp_sub_name + ":0"],
                       zp_sub_name)
    zp_div_name = input_name + "_zero_point_Div"
    zp_div_node = emit("Div", [zp_sub_node.output[0], input_scale_name], [zp_div_name + ":0"], zp_div_name)
    zp_floor_name = input_name + "_zero_point_Floor"
    zp_floor_node = emit("Floor", zp_div_node.output, [zp_floor_name + ":0"], zp_floor_name)
    emit("Cast", zp_floor_node.output, [input_zp_name], input_name + "_zero_point_Cast", to=qType)

    return input_scale_name, input_zp_name, [], []
def _get_quantization_params(self, param_name):
    '''
    Create the zero point and scale initializers of a value from self.quantization_params.

    parameter param_name: Name of the quantization parameter.
    return: result, scale_name, zero_point_name, scale_shape, zero_point_shape.
            result is False (and the other entries are empty strings) when no parameters
            are specified for param_name.
    raises ValueError: when the specified parameters are not a pair of zero point and scale.
    '''
    known_params = self.quantization_params
    if known_params is None or param_name not in known_params:
        return False, "", "", "", ""

    params = known_params[param_name]
    if params is None or len(params) != 2:
        raise ValueError("Quantization parameters should contain zero point and scale. "
                         "Specified values for output {}: {}".format(param_name, params))

    # params is [zero point, scale]; both are stored as scalar tensors.
    zero_point_name = param_name + "_zero_point"
    zero_point_shape = []
    zero_point_type = onnx.mapping.NP_TYPE_TO_TENSOR_TYPE[params[0].dtype]
    zero_point_tensor = onnx.helper.make_tensor(zero_point_name, zero_point_type, zero_point_shape,
                                                [params[0].item()])
    self.model.add_initializer(zero_point_tensor)

    scale_name = param_name + "_scale"
    scale_shape = []
    scale_tensor = onnx.helper.make_tensor(scale_name, onnx_proto.TensorProto.FLOAT, scale_shape,
                                           [params[1].item()])
    self.model.add_initializer(scale_tensor)

    return True, scale_name, zero_point_name, scale_shape, zero_point_shape
def _get_quantize_input_nodes(self, node, input_index, qType):
    '''
    Create the nodes that quantize one (non-initializer) input of a node.

    When quantization parameters are specified for the input, a single QuantizeLinear node
    using them is created. Otherwise, in non-static mode, the parameters are computed at
    run time: with one DynamicQuantizeLinear node when fusing is enabled and the target is
    UINT8, else with explicit nodes followed by QuantizeLinear.

    parameter node: node being quantized in NodeProto format.
    parameter input_index: index of input in node.input.
    parameter qType: type to quantize to.
    return: List of newly created nodes in NodeProto format.
    raises ValueError: in static mode when no quantization parameters are specified for the input.
    '''
    input_name = node.input[input_index]
    output_name = input_name + "_quantized"
    quantize_node_name = input_name + "_QuantizeLinear"

    data_found, scale_name, zp_name, _, _ = self._get_quantization_params(input_name)

    if self.static and not data_found:
        raise ValueError(
            "Quantization parameters are not specified for param {}."
            "In static mode quantization params for inputs and outputs of nodes to be quantized are required.".
            format(input_name))

    if data_found:
        # Parameters are known up front, whatever the mode.
        return [
            onnx.helper.make_node("QuantizeLinear", [input_name, scale_name, zp_name], [output_name],
                                  quantize_node_name)
        ]

    # Scale and zero point are not available for this input: compute them at run time.
    if self.fuse_dynamic_quant and qType == onnx_proto.TensorProto.UINT8:
        return [
            onnx.helper.make_node("DynamicQuantizeLinear", [input_name],
                                  [output_name, input_name + "_scale", input_name + "_zero_point"],
                                  quantize_node_name)
        ]

    created_nodes = []
    scale_name, zp_name, _, _ = self._get_dynamic_input_quantization_params(input_name, created_nodes, qType)
    created_nodes.append(
        onnx.helper.make_node("QuantizeLinear", [input_name, scale_name, zp_name], [output_name],
                              quantize_node_name))
    return created_nodes
def _get_bias_add_nodes(self, nodes, node, last_output, quantized_bias_name):
    '''
    Add the bias to the output of a previous node: the bias is reshaped to [1, -1, 1, 1]
    by a "Reshape" node so that it broadcasts, then added by an "Add" node.

    parameter nodes: new nodes would be appended into nodes
    parameter node: current node (Conv)
    parameter last_output: output of previous node (input to bias add)
    parameter quantized_bias_name: name of the bias value to add
    return: the name of output
    '''
    # Target shape of the bias, shared through a fixed initializer name.
    shape_tensor_name = "reshape_shape"
    shape_tensor = onnx.helper.make_tensor(shape_tensor_name, onnx_proto.TensorProto.INT64, [4], [1, -1, 1, 1])
    self.model.add_initializer(shape_tensor)

    reshaped_bias_name = node.output[0] + "_reshape"
    nodes.append(
        onnx.helper.make_node("Reshape", [quantized_bias_name, shape_tensor_name], [reshaped_bias_name],
                              quantized_bias_name + "reshape"))

    biased_output_name = node.output[0] + "_bias_add"
    nodes.append(
        onnx.helper.make_node("Add", [last_output, reshaped_bias_name], [biased_output_name],
                              quantized_bias_name + "bias_add"))

    return biased_output_name
def _update_nodes_using_weight(self):
    '''
    Give the nodes that still consume a quantized weight under its original name a
    dequantized version of that weight.

    For every weight in self._quantized_weights that is an input of some node in
    self.new_nodes, a DequantizeLinear node producing "<weight>_dequantized" is created
    (unless one with that name already exists) and those nodes are rewired to read its
    output. The created nodes are appended to self.new_nodes.
    '''
    nodes_list = []

    for weight in self._quantized_weights:
        # self.new_nodes is a plain list of nodes, so it is searched directly here:
        # self.model.find_nodes_by_initializer expects a graph and reads its .node field.
        nodes_using_weight = [node for node in self.new_nodes if weight.initializer.name in node.input]

        dequantize_linear_name = weight.name + "_DequantizeLinear"
        output_name = weight.name + "_dequantized"

        # Check if DequantizeLinear node needs to be added to graph.
        if len(nodes_using_weight) != 0 and \
                self.model.find_node_by_name(dequantize_linear_name, self.new_nodes, self.model.graph()) is None:
            inputs = [weight.name + "_quantized", weight.name + "_scale", weight.name + "_zero_point"]
            dequantize_node = onnx.helper.make_node("DequantizeLinear", inputs, [output_name],
                                                    dequantize_linear_name)
            nodes_list.append(dequantize_node)

        # Update unsupported nodes to take dequantized weight as input.
        for node in nodes_using_weight:
            for i, node_input in enumerate(node.input):
                if node_input == weight.name:
                    node.input[i] = output_name

    self.new_nodes += nodes_list
def _dynamic_quantize_bias(self, input_name, weight_scale_name, bias_name, quantized_bias_name, new_node_list):
    '''
    Add the series of nodes that quantize a bias at run time:
    quantized bias = cast_to_int32(floor(bias / (input scale * weight scale))).

    parameter input_name: Input name; its scale is read from "<input_name>_scale".
    parameter weight_scale_name: Weight scale.
    parameter bias_name: Bias to quantize.
    parameter quantized_bias_name: Output name to use for quantized bias.
    parameter new_node_list: the created nodes are appended to this list.
    '''
    qType = onnx_proto.TensorProto.INT32

    # bias scale = input scale * weight scale
    scale_node = onnx.helper.make_node("Mul", [input_name + "_scale", weight_scale_name], [bias_name + "_scale"],
                                       bias_name + "_scale_node")
    new_node_list.append(scale_node)

    # bias / bias scale
    divide_node = onnx.helper.make_node("Div", [bias_name, scale_node.output[0]], [bias_name + "_tmp_quant:0"],
                                        bias_name + "_tmp_qaunt")
    new_node_list.append(divide_node)

    floor_node = onnx.helper.make_node("Floor", divide_node.output, [bias_name + "_quant_rounded:0"],
                                       bias_name + "_quant_rounded")
    new_node_list.append(floor_node)

    cast_node = onnx.helper.make_node("Cast",
                                      floor_node.output, [quantized_bias_name],
                                      quantized_bias_name + "_node",
                                      to=qType)
    new_node_list.append(cast_node)
def quantize_bias(self, node, new_node_list):
    '''
    Quantize the bias (input 2) of a node. Zero Point == 0 and Scale == Input_Scale * Weight_Scale.

    When no scale is known up front for the node's first input, the bias is quantized at
    run time by nodes appended to new_node_list. Otherwise it is quantized here to int32
    and stored as a new initializer.

    :param node: node whose bias is quantized; input 1 must already be a quantized weight.
    :param new_node_list: list receiving the nodes created for run-time quantization.
    :return: name of the quantized bias.
    '''
    # Scale of the (already quantized) weight.
    weight_scale_name = self.quantized_value_map[node.input[1]].scale_name
    weight_scale = self.find_weight_data(_find_by_name(weight_scale_name, self.model.initializer()))

    # Float bias.
    bias_name = node.input[2]
    bias_initializer = _find_by_name(bias_name, self.model.initializer())
    bias_data = self.find_weight_data(bias_initializer)
    quantized_bias_name = bias_name + "_quantized"

    input_name = node.input[0]
    params = self.quantization_params
    # The input scale counts as known only when quantization parameters exist at all and
    # the input appears either in them or in the quantized value map.
    scale_is_known = params is not None and (input_name in params or input_name in self.quantized_value_map)

    if not scale_is_known:
        # The input is quantized dynamically, so its scale is not pre-computed at this
        # point: resort to dynamic quantization for the bias too.
        self._dynamic_quantize_bias(input_name, weight_scale_name, bias_name, quantized_bias_name, new_node_list)
        return quantized_bias_name

    # Scale of the input.
    if input_name in self.quantized_value_map:
        input_scale_name = self.quantized_value_map[input_name].scale_name
    elif input_name in self.quantization_params:
        _, input_scale_name, _, _, _ = self._get_quantization_params(input_name)
    else:
        raise ValueError("Expected {} to be in quantized value map for static quantization".format(input_name))
    input_scale = self.find_weight_data(_find_by_name(input_scale_name, self.model.initializer()))

    # Quantize the bias with scale = input scale * weight scale.
    bias_scale = input_scale * weight_scale
    quantized_data = (np.asarray(bias_data) / bias_scale).round().astype(np.int32)

    # Store it as a new int32 initializer shaped like the original bias.
    shaped_bias = np.asarray(quantized_data, dtype=np.int32).reshape(bias_initializer.dims)
    self.model.initializer().extend([onnx.numpy_helper.from_array(shaped_bias, quantized_bias_name)])

    # Record the bias among the quantized weights and in the quantized value map.
    self._quantized_weights.append(
        QuantizedInitializer(bias_name,
                             bias_initializer, [0], [0], [0], [bias_scale],
                             bias_data,
                             quantized_data,
                             qType=onnx_proto.TensorProto.INT32))

    assert (bias_name not in self.quantized_value_map)
    self.quantized_value_map[bias_name] = QuantizedValue(bias_name, quantized_bias_name, "", "",
                                                         QuantizedValueType.Initializer, None,
                                                         onnx_proto.TensorProto.INT32)

    return quantized_bias_name
def _quantize_inputs(self, node, indices):
    '''
    Given a node, this function quantizes the inputs as follows:
        - If input is already quantized, reuse its quantized value
        - If input is an initializer, quantize the initializer data and add the new
          initializers to the model
        - Else, add QuantizeLinear nodes to perform quantization (reusing an existing
          "<input>_QuantizeLinear" node when there is one)

    parameter node: node being quantized in NodeProto format.
    parameter indices: input indices to quantize.
    return: (List of quantized input names,
             List of zero point names used for input quantization,
             List of scale names used for input quantization,
             List of new QuantizeLinear nodes created)
    raises ValueError: when an input was already quantized to a different type.
    '''
    quantized_input_names = []
    zero_point_names = []
    scale_names = []
    nodes = []

    for input_index in indices:
        node_input = node.input[input_index]

        # Find if this input is already quantized
        if node_input in self.quantized_value_map:
            quantized_value = self.quantized_value_map[node_input]
            qType = self.weight_qType if quantized_value.value_type == QuantizedValueType.Initializer else self.input_qType
            if quantized_value.qType != qType:
                # The name must be formatted into the message; passing it as a second
                # exception argument would leave the "{}" placeholder unfilled.
                raise ValueError(
                    "{} is being used by multiple nodes which are being quantized to different types. "
                    "This is not supported.".format(node_input))

            quantized_input_names.append(quantized_value.q_name)
            scale_names.append(quantized_value.scale_name)
            zero_point_names.append(quantized_value.zp_name)
            continue

        # Quantize the input
        initializer = _find_by_name(node_input, self.model.initializer())
        if initializer is not None:
            if node.op_type == "Conv":
                weight = self._get_quantized_weight_convolution(initializer, self.weight_qType)
            else:
                weight = self._get_quantized_weight(initializer, self.weight_qType)

            # Update graph
            self._update_graph(weight)

            quantized_input_names.append(weight.name + "_quantized")
            zero_point_names.append(weight.name + "_zero_point")
            scale_names.append(weight.name + "_scale")
        else:
            # Add QuantizeLinear node, unless this input already has one.
            qlinear_node = self.model.find_node_by_name(node_input + "_QuantizeLinear", self.new_nodes,
                                                        self.model.graph())
            if qlinear_node is None:
                quantize_input_nodes = self._get_quantize_input_nodes(node, input_index, self.input_qType)
                nodes.extend(quantize_input_nodes)
                # The quantizing node is always the last one created.
                qlinear_node = quantize_input_nodes[-1]

            if qlinear_node.op_type == "QuantizeLinear":
                # Scale and zero point are inputs of QuantizeLinear.
                quantized_input_names.extend(qlinear_node.output)
                scale_names.append(qlinear_node.input[1])
                zero_point_names.append(qlinear_node.input[2])
            else:
                # Any other quantizing node (DynamicQuantizeLinear) outputs them.
                quantized_input_names.append(qlinear_node.output[0])
                scale_names.append(qlinear_node.output[1])
                zero_point_names.append(qlinear_node.output[2])

    return (quantized_input_names, zero_point_names, scale_names, nodes)
def _dequantize_value(self, value_name):
'''
Given a value (input/output) which is quantized, add a DequantizeLinear node to dequantize
it back to float32
parameter value_name: value to dequantize
parameter new_nodes_list: List of new nodes created before processing current node
return: None if there is already a DequantizeLinear node that dequantizes it
A DequantizeLinear node otherwise
'''
if value_name in self.quantized_value_map:
quantized_value = self.quantized_value_map[value_name]
# Add DequantizeLinear Node for this input
dqlinear_name = value_name + "_DequantizeLinear"
dqlinear_node = self.model.find_node_by_name(dqlinear_name, self.new_nodes, self.model.graph())
if dqlinear_node is None:
dqlinear_inputs = [quantized_value.q_name, quantized_value.scale_name, quantized_value.zp_name]
dequantize_node = onnx.helper.make_node("DequantizeLinear", dqlinear_inputs, [value_name],
dqlinear_name)
return dequantize_node
else:
# DQ op is already present, assert it's output matches the input of current node
assert (value_name == dqlinear_node.output[0])
return None
def _dequantize_outputs(self):
'''
Dequantize output if it is quantized
parameter new_nodes_list: List of new nodes created before processing current node
return: List of new nodes created
'''
for output in self.model.graph().output:
dequantize_node = self._dequantize_value(output.name)
if dequantize_node is not None:
self.new_nodes.append(dequantize_node)

View file

@ -0,0 +1,2 @@
#from .base_operator import QuantOperatorBase
#from .matmul import MatMulInteger

View file

@ -0,0 +1,22 @@
import onnx
from .base_operator import QuantOperatorBase
from onnx import onnx_pb as onnx_proto
class QLinearActivation(QuantOperatorBase):
    '''
    Handles Relu / Clip nodes in QLinearOps mode.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Keep the activation node when its input is not quantized; otherwise drop it and make its
        output name refer to the same quantized value as its input.
        '''
        node = self.node
        assert node.op_type in ("Relu", "Clip")

        quantizer = self.quantizer
        activation_input = node.input[0]

        if activation_input in quantizer.quantized_value_map:
            # When mode is QLinearOps, the output quantization params are calculated based on outputs
            # from activation nodes, therefore these nodes can be removed from the graph if they
            # follow a quantized op: the node is not emitted and its output aliases the input's entry.
            quantizer.quantized_value_map[node.output[0]] = quantizer.quantized_value_map[activation_input]
        else:
            # Input to this node is not quantized: keep the node as-is.
            quantizer.new_nodes.append(node)

View file

@ -0,0 +1,43 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import _attribute_to_kwarg, ms_domain
from onnx import onnx_pb as onnx_proto
'''
Quantize Attention
'''
class AttentionQuant(QuantOperatorBase):
    '''
    Replaces an Attention node with a QAttention node in the com.microsoft domain.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Quantize inputs 0 and 1 of the Attention node, build the QAttention node and append all
        newly created nodes (input quantization nodes followed by QAttention) to quantizer.new_nodes.
        '''
        node = self.node
        assert (node.op_type == "Attention")

        quantized_input_names, zero_point_names, scale_names, nodes = \
            self.quantizer._quantize_inputs(node, [0, 1])

        def optional_input(position):
            # Inputs beyond the ones present on the node are passed as "" (omitted).
            return node.input[position] if len(node.input) > position else ""

        # Input layout: quantized inputs, original input 2, scales, optional input 3,
        # zero points, optional input 4.
        qattention_inputs = [
            *quantized_input_names,
            node.input[2],
            *scale_names,
            optional_input(3),
            *zero_point_names,
            optional_input(4),
        ]

        # Carry over every attribute of the original node, then force the contrib-op domain.
        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(_attribute_to_kwarg(attribute))
        kwargs["domain"] = ms_domain

        qattention_name = node.name + "_quant" if node.name != "" else ""
        nodes.append(onnx.helper.make_node("QAttention", qattention_inputs, node.output, qattention_name, **kwargs))

        self.quantizer.new_nodes.extend(nodes)

View file

@ -0,0 +1,22 @@
class QuantOperatorBase:
    '''
    Default handler for a node that is left un-quantized; also the base class of the
    operator-specific quantizers.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        '''
            parameter onnx_quantizer: quantizer object that owns new_nodes and _dequantize_value.
            parameter onnx_node: the node this handler processes.
        '''
        self.quantizer = onnx_quantizer
        self.node = onnx_node

    def quantize(self):
        '''
        Keep the node in float: for every input of the node, ask the quantizer for a
        DequantizeLinear node (returned only when that input is quantized and not yet dequantized)
        and append it to quantizer.new_nodes, then append the original node itself.

            return: None. All results are appended to self.quantizer.new_nodes.
        '''
        for node_input in self.node.input:
            dequantize_node = self.quantizer._dequantize_value(node_input)
            if dequantize_node is not None:
                self.quantizer.new_nodes.append(dequantize_node)

        # Append the original node
        self.quantizer.new_nodes.append(self.node)

View file

@ -0,0 +1,54 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import _attribute_to_kwarg, ms_domain, QuantizedValue, QuantizedValueType
from onnx import onnx_pb as onnx_proto
class QLinearBinaryOp(QuantOperatorBase):
    '''
    Replaces a two-input node with a "QLinear<op_type>" node in the com.microsoft domain.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Quantize the node only when quantization parameters exist for its output; otherwise fall
        back to the base-class behaviour (dequantize inputs and keep the original node).
        '''
        node = self.node
        quantizer = self.quantizer
        original_output = node.output[0]

        data_found, output_scale_name, output_zp_name, _, _ = \
            quantizer._get_quantization_params(original_output)
        if not data_found:
            # only try to quantize when given quantization parameters for it
            return super().quantize()

        quantized_input_names, zero_point_names, scale_names, nodes = \
            quantizer._quantize_inputs(node, [0, 1])

        qlinear_output = original_output + "_quantized"
        qlinear_name = "" if node.name == "" else node.name + "_quant"

        # Carry over every attribute of the original node, then force the contrib-op domain.
        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(_attribute_to_kwarg(attribute))
        kwargs["domain"] = ms_domain

        # (tensor, scale, zero point) for input 0, the same triple for input 1,
        # then the output scale and zero point.
        qlinear_inputs = [
            quantized_input_names[0], scale_names[0], zero_point_names[0],
            quantized_input_names[1], scale_names[1], zero_point_names[1],
            output_scale_name, output_zp_name,
        ]

        nodes.append(
            onnx.helper.make_node("QLinear" + node.op_type, qlinear_inputs, [qlinear_output], qlinear_name,
                                  **kwargs))

        # Create an entry for this quantized value
        quantizer.quantized_value_map[original_output] = QuantizedValue(
            original_output, qlinear_output, output_scale_name, output_zp_name, QuantizedValueType.Input)

        quantizer.new_nodes.extend(nodes)

View file

@ -0,0 +1,124 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import _find_by_name, _get_mul_node, QuantizedValue, QuantizedValueType, _attribute_to_kwarg
from onnx import onnx_pb as onnx_proto
class ConInteger(QuantOperatorBase):
    '''
    Replaces a Conv node with ConvInteger followed by optional bias-add nodes, a Cast to FLOAT and
    Mul nodes that apply the input scales, so the final output keeps the original Conv output name.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Build the ConvInteger sub-graph for self.node and append the created nodes to
        self.quantizer.new_nodes. Returns None.
        '''
        node = self.node
        assert (node.op_type == "Conv")

        (quantized_input_names, zero_point_names, scale_names, nodes) = \
            self.quantizer._quantize_inputs(node, [0, 1])

        # quantize bias if exist
        quantized_bias_name = ""
        bias_present = False
        if len(node.input) == 3:
            quantized_bias_name = self.quantizer.quantize_bias(node, nodes)
            bias_present = True

        conv_integer_output = node.output[0] + "_quantized"
        conv_integer_name = node.name + "_quant" if node.name != "" else ""

        # Carry over every attribute of the original Conv node.
        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(_attribute_to_kwarg(attribute))
        conv_integer_node = onnx.helper.make_node("ConvInteger", quantized_input_names + zero_point_names,
                                                  [conv_integer_output], conv_integer_name, **kwargs)
        nodes.append(conv_integer_node)

        # Add bias add nodes
        if bias_present:
            conv_integer_output = self.quantizer.get_bias_add_nodes(nodes, node, conv_integer_output,
                                                                    quantized_bias_name)

        # Add cast operation to cast convInteger output to float.
        cast_op_output = conv_integer_output + "_cast_output"
        cast_node = onnx.helper.make_node("Cast", [conv_integer_output], [cast_op_output],
                                          conv_integer_output + "_cast",
                                          to=onnx_proto.TensorProto.FLOAT)
        nodes.append(cast_node)

        # Add mul operation to multiply scales of two inputs.
        assert (len(scale_names) == 2)
        if conv_integer_name != "":
            scales_mul_op = conv_integer_name + "_scales_mul"
        else:
            scales_mul_op = scale_names[0] + "_" + scale_names[1] + "_mul"

        # Reuse an already emitted scales-Mul node if there is one. The list of emitted nodes lives
        # on the quantizer (this operator object has no `nodes` / `new_nodes` attribute of its own).
        scales_mul_node = _find_by_name(scales_mul_op, self.quantizer.new_nodes)
        if scales_mul_node is None:
            scales_mul_node = _get_mul_node(scale_names, scales_mul_op + ":0", scales_mul_op)
            nodes.append(scales_mul_node)

        scales_mul_op_output = scales_mul_node.output[0]

        # Add mul operation to multiply mul_scales_op result with output of ConvInteger
        # and make the output of this node the same as output of original conv node.
        output_scale_mul_op = conv_integer_name + "_output_scale_mul" if conv_integer_name != "" else ""
        nodes.append(_get_mul_node([cast_op_output, scales_mul_op_output], node.output[0], output_scale_mul_op))

        self.quantizer.new_nodes += nodes
class QLinearCov(QuantOperatorBase):
    '''
    Replaces a Conv node with a QLinearConv node.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Quantize inputs 0 and 1 (and the bias when the node has three inputs), build the QLinearConv
        node, register its quantized output and append the created nodes to quantizer.new_nodes.

            raises ValueError: when no quantization parameters are found for the node's output.
        '''
        node = self.node
        assert (node.op_type == "Conv")

        quantizer = self.quantizer
        conv_output = node.output[0]

        quantized_input_names, zero_point_names, scale_names, nodes = \
            quantizer._quantize_inputs(node, [0, 1])

        # The bias is quantized before the output parameters are looked up.
        bias_present = len(node.input) == 3
        quantized_bias_name = quantizer.quantize_bias(node, nodes) if bias_present else ""

        data_found, output_scale_name, output_zp_name, _, _ = \
            quantizer._get_quantization_params(conv_output)
        if not data_found:
            raise ValueError("Quantization parameters for output:\"{}\" of node:\"{}\" not specified".format(
                node.output[0], node.name))

        qlinear_conv_output = conv_output + "_quantized"
        qlinear_conv_name = "" if node.name == "" else node.name + "_quant"

        # Carry over every attribute of the original Conv node.
        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(_attribute_to_kwarg(attribute))

        # (tensor, scale, zero point) for input 0, the same triple for input 1,
        # then the output scale and zero point, then the optional quantized bias.
        qlinear_conv_inputs = [
            quantized_input_names[0], scale_names[0], zero_point_names[0],
            quantized_input_names[1], scale_names[1], zero_point_names[1],
            output_scale_name, output_zp_name,
        ]
        if bias_present:
            qlinear_conv_inputs.append(quantized_bias_name)

        nodes.append(
            onnx.helper.make_node("QLinearConv", qlinear_conv_inputs, [qlinear_conv_output], qlinear_conv_name,
                                  **kwargs))

        # Create an entry for this quantized value
        quantizer.quantized_value_map[conv_output] = QuantizedValue(
            conv_output, qlinear_conv_output, output_scale_name, output_zp_name, QuantizedValueType.Input)

        quantizer.new_nodes.extend(nodes)

View file

@ -0,0 +1,22 @@
import onnx
from .base_operator import QuantOperatorBase
from onnx import onnx_pb as onnx_proto
'''
Quantize EmbedLayerNormalization
'''
class EmbedLayerNormalizationQuant(QuantOperatorBase):
    '''
    Handles EmbedLayerNormalization nodes.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Run input quantization for inputs 2, 3 and 4 and emit the resulting nodes followed by the
        original, unmodified EmbedLayerNormalization node.
        '''
        node = self.node
        assert (node.op_type == "EmbedLayerNormalization")

        # NOTE(review): only the created nodes are used here; the quantized names, zero points and
        # scales returned for inputs 2-4 are not wired into the node — confirm this is intended.
        _, _, _, created_nodes = self.quantizer._quantize_inputs(node, [2, 3, 4])

        created_nodes.append(node)
        self.quantizer.new_nodes.extend(created_nodes)

View file

@ -0,0 +1,36 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import QuantizedValue, QuantizedValueType
from onnx import onnx_pb as onnx_proto
'''
Quantize Gather
'''
class GatherQuant(QuantOperatorBase):
    '''
    Makes a Gather node read from, and produce, quantized data.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        When input 0 is a valid weight to quantize, rewire the Gather node in place to consume the
        quantized input and to write "<output>_quantized", and register that output as quantized
        with the input's scale and zero point. Otherwise keep the node unchanged.
        '''
        node = self.node
        assert (node.op_type == "Gather")

        quantizer = self.quantizer
        if not quantizer._is_valid_quantize_weight(node.input[0]):
            quantizer.new_nodes.append(node)
            return

        quantized_input_names, zero_point_names, scale_names, nodes = quantizer._quantize_inputs(node, [0])

        float_output_name = node.output[0]
        quantized_output_name = float_output_name + "_quantized"

        # Create an entry for this quantized value; it reuses the scale / zero point of input 0.
        quantizer.quantized_value_map[float_output_name] = QuantizedValue(
            float_output_name, quantized_output_name, scale_names[0], zero_point_names[0],
            QuantizedValueType.Input)

        # The node itself is mutated rather than replaced.
        node.output[0] = quantized_output_name
        node.input[0] = quantized_input_names[0]
        nodes.append(node)

        quantizer.new_nodes.extend(nodes)

View file

@ -0,0 +1,103 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import _find_by_name, _get_mul_node, QuantizedValue, QuantizedValueType
from onnx import onnx_pb as onnx_proto
'''
Used when quantize mode is QuantizationMode.IntegerOps.
'''
class MatMulInteger(QuantOperatorBase):
    '''
    Replaces a MatMul node with MatMulInteger -> Cast(FLOAT) -> Mul, so the final output keeps the
    original MatMul output name.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Build the MatMulInteger sub-graph for self.node and append the created nodes to
        self.quantizer.new_nodes.
        '''
        node = self.node
        assert (node.op_type == "MatMul")

        quantized_input_names, zero_point_names, scale_names, nodes = \
            self.quantizer._quantize_inputs(node, [0, 1])

        has_name = node.name != ""
        matmul_integer_output = node.output[0] + "_quantized"
        matmul_integer_name = node.name + "_quant" if has_name else ""
        nodes.append(
            onnx.helper.make_node("MatMulInteger", quantized_input_names + zero_point_names,
                                  [matmul_integer_output], matmul_integer_name))

        # Add cast operation to cast matmulInteger output to float.
        cast_op_output = matmul_integer_output + "_cast_output"
        nodes.append(
            onnx.helper.make_node("Cast", [matmul_integer_output], [cast_op_output],
                                  matmul_integer_output + "_cast",
                                  to=onnx_proto.TensorProto.FLOAT))

        # Add mul operation to multiply scales of two inputs.
        assert (len(scale_names) == 2)
        if matmul_integer_name != "":
            scales_mul_op = matmul_integer_name + "_scales_mul"
        else:
            scales_mul_op = scale_names[0] + "_" + scale_names[1] + "_mul"

        # Reuse a scales-Mul node that was already emitted under the same name, if any.
        scales_mul_node = _find_by_name(scales_mul_op, self.quantizer.new_nodes)
        if scales_mul_node is None:
            scales_mul_node = _get_mul_node(scale_names, scales_mul_op + ":0", scales_mul_op)
            nodes.append(scales_mul_node)
        scales_mul_op_output = scales_mul_node.output[0]

        # Add mul operation to multiply mul_scales_op result with output of MatMulInteger
        # and make the output of this node the same as output of original matmul node.
        output_scale_mul_op = matmul_integer_name + "_output_scale_mul" if matmul_integer_name != "" else ""
        nodes.append(_get_mul_node([cast_op_output, scales_mul_op_output], node.output[0], output_scale_mul_op))

        self.quantizer.new_nodes.extend(nodes)
'''
Used when quantize mode is QuantizationMode.QLinearOps
'''
class QLinearMatMul(QuantOperatorBase):
    '''
    Replaces a MatMul node with a QLinearMatMul node.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        Quantize inputs 0 and 1, build the QLinearMatMul node, register its quantized output and
        append the created nodes to quantizer.new_nodes.

            raises ValueError: when no quantization parameters are found for the node's output.
        '''
        node = self.node
        assert (node.op_type == "MatMul")

        quantizer = self.quantizer
        matmul_output = node.output[0]

        quantized_input_names, zero_point_names, scale_names, nodes = \
            quantizer._quantize_inputs(node, [0, 1])

        data_found, output_scale_name, output_zp_name, _, _ = \
            quantizer._get_quantization_params(matmul_output)
        if not data_found:
            raise ValueError("Quantization parameters for output:\"{}\" of node:\"{}\" not specified".format(
                node.output[0], node.name))

        qlinear_matmul_output = matmul_output + "_quantized"
        qlinear_matmul_name = "" if node.name == "" else node.name + "_quant"

        # (tensor, scale, zero point) for input 0, the same triple for input 1,
        # then the output quantization parameters.
        qlinear_matmul_inputs = [
            quantized_input_names[0], scale_names[0], zero_point_names[0],
            quantized_input_names[1], scale_names[1], zero_point_names[1],
            output_scale_name, output_zp_name,
        ]

        nodes.append(
            onnx.helper.make_node("QLinearMatMul", qlinear_matmul_inputs, [qlinear_matmul_output],
                                  qlinear_matmul_name))

        # Create an entry for this quantized value
        quantizer.quantized_value_map[matmul_output] = QuantizedValue(
            matmul_output, qlinear_matmul_output, output_scale_name, output_zp_name, QuantizedValueType.Input)

        quantizer.new_nodes.extend(nodes)

View file

@ -0,0 +1,31 @@
import onnx
from .base_operator import QuantOperatorBase
from ..quant_utils import QuantizedValue, QuantizedValueType
from onnx import onnx_pb as onnx_proto
class QMaxPool(QuantOperatorBase):
    '''
    Lets a MaxPool node operate directly on quantized data when its input is already quantized.
    '''
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        '''
        If input 0 is quantized, rewire the MaxPool node in place to read the quantized input and
        write "<output>_quantized", registering that output with the input's scale and zero point.
        The node is emitted in either case.
        '''
        node = self.node
        assert (node.op_type == "MaxPool")

        quantizer = self.quantizer
        value_map = quantizer.quantized_value_map

        if node.input[0] in value_map:
            # Create an entry for output quantized value; it reuses the input's scale / zero point.
            source_value = value_map[node.input[0]]
            pooled_value = QuantizedValue(node.output[0], node.output[0] + "_quantized",
                                          source_value.scale_name, source_value.zp_name,
                                          QuantizedValueType.Input)
            value_map[node.output[0]] = pooled_value

            # The node itself is mutated rather than replaced.
            node.input[0] = source_value.q_name
            node.output[0] = pooled_value.q_name

        # If the input is not quantized the node is kept unchanged.
        quantizer.new_nodes.append(node)

View file

@ -0,0 +1,181 @@
import onnx
from onnx import onnx_pb as onnx_proto
from enum import Enum
from pathlib import Path
# Producer name and version strings for this tool.
# NOTE(review): presumably written into the metadata of quantized models — confirm in quantize.py.
__producer__ = "onnx.quantize"
__version__ = "0.1.0"
# Operator domain names. ms_domain is the domain assigned to the QAttention and
# QLinear<binary op> nodes created by the operator quantizers in this package.
onnx_domain = "ai.onnx"
ms_domain = "com.microsoft"
# Maps integer tensor element-type codes to their names.
# NOTE(review): the codes appear to mirror onnx TensorProto.DataType — confirm against onnx.proto.
type_to_name = {
1: "FLOAT",
2: "UINT8",
3: "INT8",
4: "UINT16",
5: "INT16",
6: "INT32",
7: "INT64",
8: "STRING",
9: "BOOL",
10: "FLOAT16",
11: "DOUBLE",
12: "UINT32",
13: "UINT64",
14: "COMPLEX64",
15: "COMPLEX128",
}
# Quantization mode
# IntegerOps: Use IntegerOps in quantized model. Only ConvInteger and MatMulInteger ops are supported now.
# QLinearOps: Use QLinearOps in quantized model. Only QLinearConv and QLinearMatMul ops are supported now.
class QuantizationMode:
    '''
    Integer constants selecting which family of quantized operators is generated.
    '''
    IntegerOps = 0
    QLinearOps = 1
# Every mode value defined on QuantizationMode: the values of all class attributes that are
# neither callable nor dunder names, in the (name-sorted) order that dir() returns them.
quantization_modes = [
getattr(QuantizationMode, attr) for attr in dir(QuantizationMode)
if not callable(getattr(QuantizationMode, attr)) and not attr.startswith("__")
]
class QuantizedValueType:
    '''
    Integer constants tagging a quantized value as either an Input or an Initializer.
    '''
    Input = 0
    Initializer = 1
class QuantType(Enum):
    '''
    Enumeration of the quantized integer types: QInt8 and QUInt8.
    '''
    QInt8 = 1
    QUInt8 = 2
class QuantizedInitializer:
    '''
    Represents a linearly quantized weight input from ONNX operators
    '''
    def __init__(self,
                 name,
                 initializer,
                 rmins,
                 rmaxs,
                 zero_points,
                 scales,
                 data=None,
                 quantized_data=None,
                 axis=None,
                 qType=onnx_proto.TensorProto.UINT8):
        '''
            parameter data: original data from the initializer; a new empty list when omitted.
            parameter quantized_data: weight-packed data; a new empty list when omitted.

        The list defaults are created per instance (None sentinel) so that instances built without
        these arguments never share, and cannot accidentally mutate, one common list object.
        '''
        self.name = name
        self.initializer = initializer  # TensorProto initializer in ONNX graph
        self.rmins = rmins  # List of minimum range for each axis
        self.rmaxs = rmaxs  # List of maximum range for each axis
        # 1D tensor of zero points computed for each axis. scalar if axis is empty
        self.zero_points = zero_points
        self.scales = scales  # 1D tensor of scales computed for each axis. scalar if axis is empty
        self.data = [] if data is None else data  # original data from initializer TensorProto
        # weight-packed data from data
        self.quantized_data = [] if quantized_data is None else quantized_data
        # Scalar to specify which dimension in the initializer to weight pack.
        # If empty, single zero point and scales computed from a single rmin and rmax
        self.axis = axis
        self.qType = qType  # type of quantized data.
class QuantizedValue:
    '''
    Represents a linearly quantized value (input/output/initializer)
    '''
    def __init__(self,
                 name,
                 new_quantized_name,
                 scale_name,
                 zero_point_name,
                 quantized_value_type,
                 axis=None,
                 qType=onnx_proto.TensorProto.UINT8):
        '''
            parameter name: name of the original value.
            parameter new_quantized_name: name of the quantized counterpart.
            parameter scale_name: name of the value holding the scale.
            parameter zero_point_name: name of the value holding the zero point.
            parameter quantized_value_type: a QuantizedValueType constant (Input or Initializer).
            parameter axis: stored as-is; None by default.
            parameter qType: element type of the quantized data; UINT8 by default.
        '''
        self.original_name = name
        self.q_name = new_quantized_name
        self.scale_name = scale_name
        self.zp_name = zero_point_name
        self.value_type = quantized_value_type
        self.axis = axis
        self.qType = qType
def _attribute_to_kwarg(attribute):
'''
Convert attribute to kwarg format for use with onnx.helper.make_node.
:parameter attribute: attribute in AttributeProto format.
:return: attribute in {key: value} format.
'''
if (attribute.type == 0):
raise ValueError('attribute {} does not have type specified.'.format(attribute.name))
# Based on attribute type definitions from AttributeProto
# definition in https://github.com/onnx/onnx/blob/master/onnx/onnx.proto
if (attribute.type == 1):
value = attribute.f
elif (attribute.type == 2):
value = attribute.i
elif (attribute.type == 3):
value = attribute.s
elif (attribute.type == 4):
value = attribute.t
elif (attribute.type == 5):
value = attribute.g
elif (attribute.type == 6):
value = attribute.floats
elif (attribute.type == 7):
value = attribute.ints
elif (attribute.type == 8):
value = attribute.strings
elif (attribute.type == 9):
value = attribute.tensors
elif (attribute.type == 10):
value = attribute.graphs
else:
raise ValueError('attribute {} has unsupported type {}.'.format(attribute.name, attribute.type))
return {attribute.name: value}
def _find_by_name(item_name, item_list):
'''
Helper function to find item by name in a list.
parameter item_name: name of the item.
parameter item_list: list of items.
return: item if found. None otherwise.
'''
items = [item for item in item_list if item.name == item_name]
return items[0] if len(items) > 0 else None
def _get_elem_index(elem_name, elem_list):
'''
Helper function to return index of an item in a node list
'''
elem_idx = -1
for i in range(0, len(elem_list)):
if elem_list[i] == elem_name:
elem_idx = i
return elem_idx
def _get_mul_node(inputs, output, name):
    '''
    Create a Mul node with a single output.
        parameter inputs: list of input names.
        parameter output: name of the (only) output.
        parameter name: name of the node.
        return: the node created by onnx.helper.make_node.
    '''
    mul_outputs = [output]
    mul_node = onnx.helper.make_node("Mul", inputs, mul_outputs, name)
    return mul_node
def _generate_identified_filename(filename: Path, identifier: str) -> Path:
'''
Helper function to generate a identifiable filepath by concatenating the given identifier as a suffix.
'''
return filename.parent.joinpath(filename.stem + identifier).with_suffix(filename.suffix)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,41 @@
from .quant_utils import QuantizationMode
from .operators.base_operator import QuantOperatorBase
from .operators.matmul import MatMulInteger, QLinearMatMul
from .operators.attention import AttentionQuant
from .operators.embed_layernorm import EmbedLayerNormalizationQuant
from .operators.gather import GatherQuant
from .operators.conv import QLinearCov, ConInteger
from .operators.activation import QLinearActivation
from .operators.binary_op import QLinearBinaryOp
from .operators.maxpool import QMaxPool
# Registries map an operator type name to the class that quantizes nodes of that type.
# Entries shared by both quantization modes:
CommonOpsRegistry = {"Gather": GatherQuant, "EmbedLayerNormalization": EmbedLayerNormalizationQuant}
# Handlers used when the quantizer mode is QuantizationMode.IntegerOps (see CreateOpQuantizer).
IntegerOpsRegistry = {
"Conv": ConInteger,
"MatMul": MatMulInteger,
"Attention": AttentionQuant,
}
IntegerOpsRegistry.update(CommonOpsRegistry)
# Handlers used for any other mode, i.e. QLinearOps (see CreateOpQuantizer).
QLinearOpsRegistry = {
"Conv": QLinearCov,
"MatMul": QLinearMatMul,
"Add": QLinearBinaryOp,
"Mul": QLinearBinaryOp,
"Relu": QLinearActivation,
"Clip": QLinearActivation,
"MaxPool": QMaxPool,
}
QLinearOpsRegistry.update(CommonOpsRegistry)
def CreateDefaultOpQuantizer(onnx_quantizer, node):
    '''
    Return the fallback handler (QuantOperatorBase) for node, regardless of its operator type.
    '''
    default_handler = QuantOperatorBase(onnx_quantizer, node)
    return default_handler
def CreateOpQuantizer(onnx_quantizer, node):
    '''
    Return the handler object for node: the class registered for node.op_type in the registry that
    matches onnx_quantizer.mode, or QuantOperatorBase when the operator type is not registered.
    '''
    if onnx_quantizer.mode == QuantizationMode.IntegerOps:
        registry = IntegerOpsRegistry
    else:
        registry = QLinearOpsRegistry

    handler_class = registry.get(node.op_type, QuantOperatorBase)
    return handler_class(onnx_quantizer, node)

View file

@ -0,0 +1,331 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# -*- coding: UTF-8 -*-
import numpy as np
import onnx
from onnx import helper, numpy_helper, TensorProto, ValueInfoProto
from onnx import shape_inference
import onnxruntime
from pathlib import Path
import unittest
import urllib.request
from onnxruntime.quantization.quantize import optimize_model, ONNXQuantizer
from onnxruntime.quantization.onnx_model import ONNXModel
from onnxruntime.quantization.quant_utils import QuantizationMode
from onnx import onnx_pb as onnx_proto
def generate_input_initializer(tensor_shape, tensor_dtype, input_name):
    '''
    Helper function to generate an initializer filled with random values.
        parameter tensor_shape: shape passed to np.random.ranf.
        parameter tensor_dtype: dtype the random array is cast to.
        parameter input_name: name given to the resulting initializer.
        return: the initializer built by numpy_helper.from_array.
    '''
    random_values = np.random.ranf(tensor_shape).astype(tensor_dtype)
    return numpy_helper.from_array(random_values, input_name)
def generate_qat_model(model_names):
    '''
    Build and save the two QAT-style test models, each containing QuantizeLinear/DequantizeLinear
    pairs.

        parameter model_names: two file paths; model 1 is saved to model_names[0] and model 2 to
                               model_names[1].
        return: (test_models, test_initializers). test_models is [model_1, model_2];
                test_initializers holds, for each model, the list of randomly generated weight and
                bias initializers that were added to its graph.
    '''
    test_models = []
    test_initializers = []

    # TEST_MODEL_CONFIG_1
    #
    # Main graph:
    #
    #   [A]  [input_bias]
    #     \    /
    #      Add            [scale_zp_const]  [input_weight]
    #       |                       \          /
    #       |                     QuantizeLinear_1
    #  QuantizeLinear_0                  |
    #       |                    DequantizeLinear_1
    #       |                           /
    #  DequantizeLinear_0          Transpose
    #         \                      /
    #          \                    /   <--- (actual graph: this branch is folded)
    #                 Matmul
    #                   |
    #                   |
    #                  [B]
    graph = helper.make_graph(
        [
            #nodes
            helper.make_node("Add", ["A", "input_bias"], ["add_out"], "add0"),
            helper.make_node("QuantizeLinear", ["add_out", "quant0_scale_const", "quant0_zp_const"], ["quant0_out"],
                             "qlinear0"),
            helper.make_node("DequantizeLinear", ["quant0_out", "dequant0_scale_const", "dequant0_zp_const"],
                             ["dequant0_out"], "dqlinear0"),
            helper.make_node("MatMul", ["dequant0_out", "trans_out"], ["B"], "matmul"),
        ],
        "QAT_model_1",  #name
        [  #input
            helper.make_tensor_value_info('A', TensorProto.FLOAT, ['unk_1'])
        ],
        [  #output
            helper.make_tensor_value_info('B', TensorProto.FLOAT, [1024])
        ],
        [  #initializers
            helper.make_tensor('quant0_scale_const', TensorProto.FLOAT, [], [0.01961481384932995]),
            helper.make_tensor('quant0_zp_const', TensorProto.INT8, [], [0]),
            helper.make_tensor('dequant0_scale_const', TensorProto.FLOAT, [], [0.01961481384932995]),
            helper.make_tensor('dequant0_zp_const', TensorProto.INT8, [], [0]),
        ])
    input_weight_1 = generate_input_initializer([1024, 1024], np.float32, 'trans_out')
    input_bias_1 = generate_input_initializer([1024], np.float32, 'input_bias')
    graph.initializer.add().CopyFrom(input_weight_1)
    graph.initializer.add().CopyFrom(input_bias_1)

    model_1 = onnx.helper.make_model(graph)
    model_1.ir_version = onnx.IR_VERSION
    opset = model_1.opset_import.add()
    opset.version = 11
    onnx.save(model_1, model_names[0])

    test_models.extend([model_1])
    initializers_1 = [input_weight_1, input_bias_1]
    test_initializers.append(initializers_1)

    # TEST_MODEL_CONFIG_2
    #
    # Main graph:
    #
    #                         [A]
    #                          |
    #                       MaxPool
    #                      /        \
    #      QuantizeLinear_0          QuantizeLinear_1
    #             |                         |
    #     DequantizeLinear_0        DequantizeLinear_1
    #             |                         |
    #   Conv_0-[weight,bias]      Conv_1-[weight,bias]
    #              \                       /
    #               \                     /
    #                        Add
    #                         |
    #                        [B]
    graph = helper.make_graph(
        [
            #nodes
            helper.make_node("MaxPool", ["A"], ["maxpool_out"], "maxpool"),
            helper.make_node("QuantizeLinear", ["maxpool_out", "quant0_scale_const", "quant0_zp_const"], ["quant0_out"],
                             "qlinear0"),
            helper.make_node("DequantizeLinear", ["quant0_out", "dequant0_scale_const", "dequant0_zp_const"],
                             ["dequant0_out"], "dqlinear0"),
            helper.make_node("Conv", ["dequant0_out"], ["conv0_out"], "conv0"),
            helper.make_node("QuantizeLinear", ["maxpool_out", "quant1_scale_const", "quant1_zp_const"], ["quant1_out"],
                             "qlinear1"),
            helper.make_node("DequantizeLinear", ["quant1_out", "dequant1_scale_const", "dequant1_zp_const"],
                             ["dequant1_out"], "dqlinear1"),
            helper.make_node("Conv", ["dequant1_out"], ["conv1_out"], "conv1"),
            helper.make_node("Add", ["conv0_out", "conv1_out"], ["B"], "add"),
        ],
        "QAT_model_2",  #name
        [  #input
            helper.make_tensor_value_info('A', TensorProto.FLOAT, ['unk_1'])
        ],
        [  #output
            helper.make_tensor_value_info('B', TensorProto.FLOAT, [256, 64, 1, 1])
        ],
        [  #initializers
            helper.make_tensor('quant0_scale_const', TensorProto.FLOAT, [], [0.2062656134366989]),
            helper.make_tensor('quant0_zp_const', TensorProto.UINT8, [], [165]),
            helper.make_tensor('dequant0_scale_const', TensorProto.FLOAT, [], [0.2062656134366989]),
            helper.make_tensor('dequant0_zp_const', TensorProto.UINT8, [], [165]),
            helper.make_tensor('quant1_scale_const', TensorProto.FLOAT, [], [0.10088317096233368]),
            helper.make_tensor('quant1_zp_const', TensorProto.UINT8, [], [132]),
            helper.make_tensor('dequant1_scale_const', TensorProto.FLOAT, [], [0.10088317096233368]),
            helper.make_tensor('dequant1_zp_const', TensorProto.UINT8, [], [132]),
        ])
    conv_weight_0 = generate_input_initializer([256, 64, 1, 1], np.float32, 'conv_weight_0')
    conv_bias_0 = generate_input_initializer([256], np.float32, 'conv_bias_0')
    graph.initializer.add().CopyFrom(conv_weight_0)
    graph.initializer.add().CopyFrom(conv_bias_0)
    conv_weight_1 = generate_input_initializer([256, 64, 1, 1], np.float32, 'conv_weight_1')
    conv_bias_1 = generate_input_initializer([256], np.float32, 'conv_bias_1')
    graph.initializer.add().CopyFrom(conv_weight_1)
    graph.initializer.add().CopyFrom(conv_bias_1)

    model_2 = onnx.helper.make_model(graph)
    model_2.ir_version = onnx.IR_VERSION
    opset = model_2.opset_import.add()
    opset.version = 11
    onnx.save(model_2, model_names[1])

    test_models.extend([model_2])
    # Report exactly the four initializers added to the graph above. conv_bias_1 must be listed
    # (not conv_weight_1 a second time), otherwise the expected model is built without it.
    initializers_2 = [conv_weight_0, conv_bias_0, conv_weight_1, conv_bias_1]
    test_initializers.append(initializers_2)

    return test_models, test_initializers
def generate_qat_support_model(model_names, test_initializers):
    '''
    Build and save the expected models, i.e. the test models with their
    QuantizeLinear/DequantizeLinear pairs removed.

        parameter model_names: two file paths; expected model 1 is saved to model_names[0] and
                               expected model 2 to model_names[1].
        parameter test_initializers: per-model lists of initializers, as returned by
                                     generate_qat_model.
        return: [model_1, model_2], the expected models.
    '''
    test_qat_support_models = []

    # EXPECTED_TEST_RESULT_CONFIG_1
    #
    # Main graph:
    #   [A]  [input_bias]
    #     \    /
    #      Add      [Transpose_output]
    #        \            |
    #         \          /
    #          Matmul -([input_weight])
    #            |
    #            |
    #           [B]
    graph = helper.make_graph(
        [  #nodes
            helper.make_node("Add", ["A", "input_bias"], ["add_out"], "add0"),
            helper.make_node("MatMul", ["add_out", "trans_out"], ["B"], "matmul"),
        ],
        "QAT_support_model_1",  #name
        [
            #input
            helper.make_tensor_value_info('A', TensorProto.FLOAT, ['unk_1'])
        ],
        [
            #output
            helper.make_tensor_value_info('B', TensorProto.FLOAT, [1024])
        ])
    #initializers
    for init in test_initializers[0]:
        graph.initializer.add().CopyFrom(init)

    # NOTE(review): the original code first configured an empty ModelProto (ir_version, opset 11)
    # and then discarded it by rebinding the name to make_model(graph). Only the make_model result
    # was ever used, so that dead setup is dropped here — confirm no explicit opset was intended.
    model_1 = onnx.helper.make_model(graph)
    onnx.save(model_1, model_names[0])

    test_qat_support_models.extend([model_1])

    # EXPECTED_TEST_RESULT_CONFIG_2
    #
    # Main graph:
    #                        [A]
    #                         |
    #                      MaxPool
    #                     /        \
    #   Conv_0-[weight,bias]      Conv_1-[weight,bias]
    #                     \        /
    #                      \      /
    #                        Add
    #                         |
    #                        [B]
    graph = helper.make_graph(
        [  #nodes
            helper.make_node("MaxPool", ["A"], ["maxpool_out"], "maxpool"),
            helper.make_node("Conv", ["maxpool_out"], ["conv0_out"], "conv0"),
            helper.make_node("Conv", ["maxpool_out"], ["conv1_out"], "conv1"),
            helper.make_node("Add", ["conv0_out", "conv1_out"], ["B"], "add"),
        ],
        "QAT_support_model_2",  #name
        [  #input
            helper.make_tensor_value_info('A', TensorProto.FLOAT, ['unk_1'])
        ],
        [  #output
            helper.make_tensor_value_info('B', TensorProto.FLOAT, [256, 64, 1, 1])
        ])
    #initializers
    for init in test_initializers[1]:
        graph.initializer.add().CopyFrom(init)

    model_2 = onnx.helper.make_model(graph)
    # Save the second model (not model_1 again) under the second file name.
    onnx.save(model_2, model_names[1])

    test_qat_support_models.extend([model_2])

    return test_qat_support_models
def compare_two_models(model_1, model_2):
    '''
    Helper function to check that everything in the expected model also appears in the actual one.
    Only nodes and initializers of model_1 are looked up in model_2; anything extra in model_2 is
    not reported.
        :param: model_1 - expected model
        :param: model_2 - actual model
        Return True when every expected node has a same-named node with identical inputs/outputs
        and every expected initializer has a same-named initializer with equal data; otherwise
        False. A message is printed for each problem detected.
    '''
    nodes_ok = True
    initializers_ok = True

    #check nodes
    for expected_node in model_1.graph.node:
        found = False
        for actual_node in model_2.graph.node:
            if actual_node.name != expected_node.name:
                continue
            found = True
            if actual_node.input != expected_node.input or actual_node.output != expected_node.output:
                nodes_ok = False
                print("Error: Node {} in test model dismatch with the expected model.".format(actual_node.name))
                # stop scanning candidates for this expected node
                break
        if not found:
            nodes_ok = False
            print("Error:Node {} in the expected model not found in test model.".format(expected_node.name))
            # a missing node ends the node check altogether
            break

    #check initializers:
    for expected_init in model_1.graph.initializer:
        expected_array = numpy_helper.to_array(expected_init)
        found = False
        for actual_init in model_2.graph.initializer:
            if actual_init.name != expected_init.name:
                continue
            found = True
            actual_array = numpy_helper.to_array(actual_init)
            if not np.array_equal(expected_array, actual_array):
                initializers_ok = False
                print("Error: Initializer {} in test model dismatches with the expected model.".format(
                    actual_init.name))
                break
        if not found:
            initializers_ok = False
            print("Error: Initializer {} in the expected model not found in test model.".format(expected_init.name))
            break

    return nodes_ok and initializers_ok
class TestQAT(unittest.TestCase):
    '''
    Checks that ONNXQuantizer.remove_fake_quantized_nodes produces the expected graphs.
    '''
    def test_remove_fakequant_nodes(self):
        '''
        Generate the QAT test models and their expected counterparts, run
        remove_fake_quantized_nodes on each test model and compare against the expectation.
        '''
        model_names = ["qat_model_1.onnx", "qat_model_2.onnx"]
        qat_support_model_names = ["qat_support_model_1.onnx", "qat_support_model_2.onnx"]

        test_models, test_initializers = generate_qat_model(model_names)
        qat_support_models_expected = generate_qat_support_model(qat_support_model_names, test_initializers)

        for index, test_model in enumerate(test_models):
            quantizer = ONNXQuantizer(test_model, False, QuantizationMode.IntegerOps, False, True,
                                      TensorProto.INT8, TensorProto.INT8, None, None, None,
                                      ['Conv', 'MatMul', 'MaxPool'])
            # test removing the fake-quantization nodes from the graph
            qat_support_model_actual = quantizer.remove_fake_quantized_nodes()
            assert compare_two_models(qat_support_models_expected[index], qat_support_model_actual)
            print("TEST_MODEL {} finished: ".format(index) + qat_support_model_names[index])
if __name__ == '__main__':
unittest.main()

View file

@ -230,6 +230,7 @@ packages = [
'onnxruntime.datasets',
'onnxruntime.tools',
'onnxruntime.quantization',
'onnxruntime.quantization.operators',
]
# TODO: thiagofc: Temporary 'experimental' namespace for new PyTorch front-end