onnxruntime/tools/python/find_optimizer_opset_version_updates_required.py
Scott McKay 4d8510611b
Update find_optimizer_opset_version_updates_required.py to use the ONNX headers to determine the latest opset. (#12484)
**Description**: 
Use the onnx headers to find the latest opset for each operator. This
allows the script to detect optimizers with
`graph_utils::IsSupportedOptypeVersionAndDomain` calls that need
updating when run during the update of the onnx commit id. Without this
change issues are not detected until a new kernel is registered.

**Motivation and Context**
Detect optimizers that need updates as part of the ONNX update process.
2022-09-29 16:55:22 +10:00

219 lines
8.3 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
import glob
import logging
import os
import re
import typing
# Configure the root logger: emit every message from DEBUG upwards, prefixed with its level name.
logging.basicConfig(format="[%(levelname)s] - %(message)s", level=logging.DEBUG)
# getLogger() with no name returns the root logger; functions in this script report their findings through it.
log = logging.getLogger()
def parse_args():
    """
    Parse the command line and validate the ONNX Runtime root directory.

    Returns the parsed argparse namespace, whose `ort_root` attribute is an existing directory.
    Raises argparse.ArgumentError if the supplied path is not a directory.
    """
    arg_parser = argparse.ArgumentParser(
        description="Find optimizers that involve operators which may need an update to the supported opset versions."
    )

    # keep the action returned by add_argument so a validation failure can be attributed to this option
    ort_root_action = arg_parser.add_argument(
        "--ort-root",
        "-o",
        required=True,
        type=str,
        help="The root directory of the ONNX Runtime repository to search.",
    )

    parsed = arg_parser.parse_args()
    if os.path.isdir(parsed.ort_root):
        return parsed

    raise argparse.ArgumentError(ort_root_action, "{} is not a valid directory".format(parsed.ort_root))
def _find_matching_paren(text: str, open_idx: int) -> int:
    """Return the index of the ')' that closes the '(' at open_idx, or -1 if it is not closed within text."""
    depth = 0
    for idx in range(open_idx, len(text)):
        char = text[idx]
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == 0:
                return idx

    return -1


def get_call_args_from_file(filename: str, function_or_declaration: str) -> typing.List[str]:
    """
    Search a file for all function calls or declarations that match the provided name.
    Requires both the opening '(' and closing ')' to be on the same line.
    Handles multiple calls being on the same line.

    :param filename: Path of the file to search.
    :param function_or_declaration: Name to look for. It is used as a regular expression pattern.
    :return: The text between the opening '(' and its matching ')' for each match, in file order.
             Nested parentheses inside the arguments are kept intact. Parentheses inside string
             literals are not treated specially.

    Matches whose brackets are not balanced on the line are reported via the logger and skipped.
    """
    results = []
    with open(filename) as f:
        # 1-based so the reported line number matches what an editor shows
        for line_num, line in enumerate(f, start=1):
            for match in re.finditer(function_or_declaration, line):
                # check we have both the opening and closing brackets for the function call/declaration.
                # if we do we have all the arguments
                start = line.find("(", match.end())
                end = _find_matching_paren(line, start) if start != -1 else -1

                if end != -1:
                    results.append(line[start + 1 : end])
                else:
                    # TODO: handle automatically by merging lines
                    log.error(
                        "Call/Declaration is split over multiple lines. Please check manually. "
                        "File:{} Line:{}".format(filename, line_num)
                    )

    return results
def get_multiline_call_args_from_file(filename: str, function_or_declaration: str) -> typing.List[str]:
    """
    Search a file for all function calls or declarations that match the provided name.
    Allows the opening '(' and closing ')' to be split across multiple lines.
    Supports a single call per line.

    :param filename: Path of the file to search.
    :param function_or_declaration: Name to look for. It is matched as a plain substring.
    :return: The text between the first '(' after the name and the first following ')' for each match,
             in file order. When a call spans several lines, each line is stripped before being joined.
    """
    results = []
    with open(filename) as f:
        # text from the matched name onwards, accumulated until the closing ')' is seen
        function_and_args = None

        for line in f:
            if not function_and_args:
                # look for new match
                match_idx = line.find(function_or_declaration)
                if match_idx == -1:
                    continue

                function_and_args = line[match_idx:].strip()
                # the captured text starts at the name, so the search for ')' must start at its beginning
                # and not at the column the name had in the original line
                search_from = 0
            else:
                # append to existing line and look for closing ')' in the newly added text only
                search_from = len(function_and_args)
                function_and_args += line.strip()

            end = function_and_args.find(")", search_from)
            if end != -1:
                start_args = function_and_args.find("(")
                results.append(function_and_args[start_args + 1 : end])
                function_and_args = None

    return results
def _add_if_newer(domain: str, op: str, opset: int, op_to_opset: typing.Dict[str, int]):
key = domain + "." + op
if key not in op_to_opset or op_to_opset[key] < opset:
op_to_opset[key] = opset
def get_latest_ort_op_versions(root_dir):
    """
    Build a map of '<domain>.<op>' to the highest opset found in the ORT contrib kernel registrations.

    ONNX operators are not covered here (the CPU execution provider registrations are intentionally not read);
    get_latest_onnx_op_versions is used for those.
    """
    # for internal kernels we use the current registrations
    registration_files = [
        os.path.join(root_dir, "onnxruntime/contrib_ops/cpu/cpu_contrib_kernels.cc"),
        os.path.join(root_dir, "onnxruntime/contrib_ops/cuda/cuda_contrib_kernels.cc"),
    ]

    # (macro name, position of the operator name in the macro arguments).
    # The domain is always argument 1 and the opset argument 2.
    # e.g. class ONNX_OPERATOR_KERNEL_CLASS_NAME(kCpuExecutionProvider, kOnnxDomain, 11, Clip);
    #      class ONNX_OPERATOR_TYPED_KERNEL_CLASS_NAME(kCpuExecutionProvider, kOnnxDomain, 11, float, ArgMax);
    kernel_macros = (
        ("ONNX_OPERATOR_KERNEL_CLASS_NAME", 3),
        ("ONNX_OPERATOR_TYPED_KERNEL_CLASS_NAME", 4),
    )

    latest = {}
    for path in registration_files:
        for macro, op_idx in kernel_macros:
            for call_args in get_multiline_call_args_from_file(path, macro):
                fields = call_args.split(",")
                domain = fields[1].strip()
                opset = fields[2].strip()
                op = fields[op_idx].strip()
                _add_if_newer(domain, op, int(opset), latest)

    return latest
def get_latest_onnx_op_versions(root_dir):
    """
    Build a map of '<domain>.<op>' to the highest opset declared in the ONNX operator set headers.

    The ONNX domain names used in the headers are translated to the ORT constant names:
    'OnnxML' becomes 'kMLDomain' and anything else becomes 'kOnnxDomain'.
    """
    relative_headers = (
        # operators with domain of 'Onnx'
        "cmake/external/onnx/onnx/defs/operator_sets.h",
        # ML operators with domain of 'OnnxML'
        "cmake/external/onnx/onnx/defs/operator_sets_ml.h",
    )
    header_paths = [os.path.join(root_dir, relative) for relative in relative_headers]

    latest = {}
    for header in header_paths:
        # e.g. fn(GetOpSchema<ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME(Onnx, 17, LayerNormalization)>());
        #      fn(GetOpSchema<ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME(OnnxML, 3, TreeEnsembleClassifier)>());
        for call_args in get_multiline_call_args_from_file(header, "ONNX_OPERATOR_SET_SCHEMA_CLASS_NAME"):
            fields = call_args.split(",")
            onnx_domain = fields[0].strip()

            # convert domain to the ORT constants
            if onnx_domain == "OnnxML":
                ort_domain = "kMLDomain"
            else:
                ort_domain = "kOnnxDomain"

            opset = fields[1].strip()
            op = fields[2].strip()
            _add_if_newer(ort_domain, op, int(opset), latest)

    return latest
def find_potential_issues(root_dir, op_to_opset):
    """
    Scan the optimizer sources for graph_utils::IsSupportedOptypeVersionAndDomain calls and log every call
    whose last listed opset differs from the latest known opset for that operator.

    :param root_dir: Root directory of the ONNX Runtime repository.
    :param op_to_opset: Map of '<domain>.<op>' to the latest opset for the operator.

    Results are reported through the logger only; nothing is returned.
    """
    optimizer_dir = os.path.join(root_dir, "onnxruntime/core/optimizer")
    source_files = [
        path
        for pattern in ("/**/*.cc", "/**/*.h")
        for path in glob.glob(optimizer_dir + pattern, recursive=True)
    ]

    for path in source_files:
        for call_args in get_call_args_from_file(path, "graph_utils::IsSupportedOptypeVersionAndDomain"):
            # Need to handle multiple comma separated version numbers, and the optional domain argument.
            # e.g. IsSupportedOptypeVersionAndDomain(node, "MaxPool", {1, 8, 10})
            #      IsSupportedOptypeVersionAndDomain(node, "FusedConv", {1}, kMSDomain)
            # The first 2 args are simple, so split those off and process the remainder by hand.
            pieces = call_args.split(",", 2)
            op_literal = pieces[1].strip()
            remainder = pieces[2]

            # the supported versions are the comma separated values between the braces
            open_brace = remainder.find("{")
            close_brace = remainder.find("}")
            supported = remainder[open_brace + 1 : close_brace].split(",")
            newest_supported = supported[-1].strip()

            # anything after a comma that follows the closing brace is the domain
            domain_comma = remainder.find(",", close_brace)
            domain = "kOnnxDomain" if domain_comma == -1 else remainder[domain_comma + 1 :].strip()

            is_string_literal = op_literal.startswith('"') and op_literal.endswith('"')
            if not is_string_literal:
                log.error(
                    "Symbolic name of '{}' found for op. Please check manually. File:{}".format(op_literal, path)
                )
                continue

            key = domain + "." + op_literal[1:-1]
            if key not in op_to_opset:
                log.error("Failed to find version information for {}. File:{}".format(key, path))
                continue

            latest = op_to_opset[key]
            if int(latest) != int(newest_supported):
                log.warning(
                    "Newer opset found for {}. Latest:{} Optimizer support ends at {}. File:{}".format(
                        key, latest, newest_supported, path
                    )
                )
if __name__ == "__main__":
    cli_args = parse_args()
    ort_root = cli_args.ort_root

    # Start from the versions found in the ORT kernel registrations, then overlay the versions from the
    # ONNX headers so that the ONNX value wins if a key is present in both maps.
    op_to_opset_map = dict(get_latest_ort_op_versions(ort_root))
    op_to_opset_map.update(get_latest_onnx_op_versions(ort_root))

    find_potential_issues(ort_root, op_to_opset_map)