diff --git a/onnxruntime/core/providers/cpu/ml/scaler.cc b/onnxruntime/core/providers/cpu/ml/scaler.cc index 2c62bd6e9b..015612613d 100644 --- a/onnxruntime/core/providers/cpu/ml/scaler.cc +++ b/onnxruntime/core/providers/cpu/ml/scaler.cc @@ -60,6 +60,8 @@ ONNX_CPU_OPERATOR_TYPED_ML_KERNEL( KernelDefBuilder().TypeConstraint("T", DataTypeImpl::GetTensorType()).MayInplace(0, 0), ScalerOp); +static constexpr int kParallelizationThreshold = 10 * 1000; + template ScalerOp::ScalerOp(const OpKernelInfo& info) : OpKernel(info), scale_(info.GetAttrsOrDefault("scale")), @@ -84,29 +86,27 @@ common::Status ScalerOp::Compute(OpKernelContext* context) const { size_t x_size = x_shape.Size(); int64_t stride = x_dims.size() == 1 ? x_dims[0] : x_dims[1]; auto* ttp = context->GetOperatorThreadPool(); - auto num_threads = std::min(concurrency::ThreadPool::DegreeOfParallelism(ttp), static_cast(x_size)); + auto conditional_batch_call = [ttp, x_size](std::function f) { + if (x_size < kParallelizationThreshold) { // TODO: tune this, arbitrary threshold + for (size_t i = 0; i < x_size; ++i) { + f(i); + } + } else { + concurrency::ThreadPool::TryBatchParallelFor(ttp, x_size, f, 0); + } + }; if (static_cast(offset_.size()) == stride && static_cast(scale_.size()) == stride) { - concurrency::ThreadPool::TrySimpleParallelFor( - ttp, - num_threads, - [this, num_threads, y_data, x_data, stride, x_size](ptrdiff_t batch_num) { - auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_threads, x_size); - for (auto i = work.start; i < work.end; ++i) { - y_data[i] = static_cast((x_data[i] - offset_[i % stride]) * scale_[i % stride]); - } - }); + auto fn = [this, y_data, x_data, stride](ptrdiff_t i) { + y_data[i] = static_cast((x_data[i] - offset_[i % stride]) * scale_[i % stride]); + }; + conditional_batch_call(fn); } else if (offset_.size() == 1 && scale_.size() == 1) { - concurrency::ThreadPool::TrySimpleParallelFor( - ttp, - num_threads, - [this, num_threads, y_data, x_data, x_size](ptrdiff_t batch_num) { - auto work = concurrency::ThreadPool::PartitionWork(batch_num, num_threads, x_size); - for (auto i = work.start; i < work.end; ++i) { - y_data[i] = static_cast((x_data[i] - offset_[0]) * scale_[0]); - } - }); + auto fn = [this, y_data, x_data](ptrdiff_t i) { + y_data[i] = static_cast((x_data[i] - offset_[0]) * scale_[0]); + }; + conditional_batch_call(fn); } else { std::ostringstream err_msg; err_msg << "Either both scale and offset can be of feature size (" << stride << ") or 1"; diff --git a/onnxruntime/core/providers/cpu/tensor/gather_elements.cc b/onnxruntime/core/providers/cpu/tensor/gather_elements.cc index b5fd0bfcf8..73782d7afd 100644 --- a/onnxruntime/core/providers/cpu/tensor/gather_elements.cc +++ b/onnxruntime/core/providers/cpu/tensor/gather_elements.cc @@ -25,6 +25,8 @@ ONNX_CPU_OPERATOR_KERNEL( DataTypeImpl::GetTensorType()}), GatherElements); +static constexpr int kParallelizationThreshold = 10 * 1000; + // Some helpers needed for GatherElements op - // The following method computes the offset in the flattened array @@ -87,7 +89,8 @@ static inline void increment_over_inner_dim(std::vector& current_dims, } template -static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin index, int64_t axis, const TensorShape& input_shape) { +static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin index, int64_t axis, + const TensorShape& input_shape) { int64_t retval = -1; if (indices_data[index] < 0) { retval = static_cast(indices_data[index] + input_shape[axis]); @@ -105,7 +108,7 @@ static inline int64_t GetNegativeIndexAdjustedValue(const Tin* indices_data, Tin #endif template static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor, - Tensor* output_tensor, int64_t axis) { + Tensor* output_tensor, int64_t axis, concurrency::ThreadPool* ttp) { // get pointer to input data // optimizer will remove the redundant if/else block based on 'is_string' template parameter const T* input_data = nullptr; @@ -135,11 +138,15 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor, auto num_elements = indices_tensor->Shape().Size(); int64_t lower_index_limit = -input_shape[axis]; int64_t upper_index_limit = input_shape[axis] - 1; - for (int64_t i = 0; i < num_elements; ++i) { + + auto validation_fn = [indices_data, lower_index_limit, upper_index_limit](ptrdiff_t i) { auto indices_val = indices_data[i]; if (indices_val < lower_index_limit || indices_val > upper_index_limit) ORT_THROW("GatherElements op: Value in indices must be within bounds [", lower_index_limit, " , ", upper_index_limit, "]. Actual value is ", indices_val); + }; + for (int64_t i = 0; i < num_elements; ++i) { // TODO: parallelize this? didn't give any benefit in my tests + validation_fn(i); } int64_t num_inner_dim = calculate_num_inner_dim(indices_shape); @@ -147,11 +154,21 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor, bool processing_inner_dim = (axis == input_rank - 1) ? true : false; int64_t base_offset = 0; - Tin indices_counter = -1; - int64_t output_counter = -1; + Tin indices_counter = 0; size_t element_size = input_tensor->DataType()->Size(); std::vector process_dims(input_rank, 0); + int64_t output_counter = 0; + + auto conditional_batch_call = [ttp, inner_dim_size](std::function f) { + if (inner_dim_size < kParallelizationThreshold) { // TODO: tune this, arbitrary threshold + for (int64_t i = 0; i < inner_dim_size; ++i) { + f(i); + } + } else { + concurrency::ThreadPool::TryBatchParallelFor(ttp, inner_dim_size, f, 0); + } + }; if (!processing_inner_dim) { while (num_inner_dim-- != 0) { @@ -160,22 +177,27 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor, // process 1 chunk of 'inner dimension' length // optimizer will remove the redundant if/else block based on 'is_string' template parameter if (is_string) { - for (int64_t i = 0; i < inner_dim_size; ++i) { - output_data[++output_counter] = + auto fn = [input_data, output_data, base_offset, input_shape_pitches, + indices_data, indices_counter, axis, input_shape, output_counter](ptrdiff_t i) { + output_data[i + output_counter] = input_data[base_offset + - (GetNegativeIndexAdjustedValue(indices_data, ++indices_counter, axis, input_shape) * + (GetNegativeIndexAdjustedValue(indices_data, static_cast(i) + indices_counter, axis, input_shape) * input_shape_pitches[axis]) + i]; - } + }; + conditional_batch_call(fn); + output_counter += inner_dim_size; } else { - for (int64_t i = 0; i < inner_dim_size; ++i) { - // optimizer will remove the redundant if/else block based on 'is_string' template parameter - memcpy(output_data, - input_data + (base_offset + (GetNegativeIndexAdjustedValue(indices_data, ++indices_counter, axis, input_shape) * input_shape_pitches[axis]) + i) * element_size, + auto fn = [input_data, output_data, base_offset, input_shape_pitches, element_size, + indices_data, indices_counter, axis, input_shape](ptrdiff_t i) { + memcpy(output_data + (i * element_size), + input_data + (base_offset + (GetNegativeIndexAdjustedValue(indices_data, static_cast(i) + indices_counter, axis, input_shape) * input_shape_pitches[axis]) + i) * element_size, element_size); - output_data += element_size; - } + }; + conditional_batch_call(fn); + output_data += inner_dim_size * element_size; } + indices_counter += static_cast(inner_dim_size); increment_over_inner_dim(process_dims, indices_shape); } } @@ -185,27 +207,31 @@ static void core_impl(const Tensor* input_tensor, const Tensor* indices_tensor, base_offset = compute_base_offset(process_dims, input_shape_pitches, axis); // process 1 chunk of 'inner dimension' length - // optimizer will remove the redundant if/else block based on 'is_string' template parameter if (is_string) { - for (int64_t i = 0; i < inner_dim_size; ++i) { + auto fn = [input_data, output_data, base_offset, + indices_data, indices_counter, axis, input_shape, output_counter](ptrdiff_t i) { // for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply) - output_data[++output_counter] = + output_data[i + output_counter] = input_data[base_offset + - GetNegativeIndexAdjustedValue(indices_data, ++indices_counter, axis, input_shape)]; - } + GetNegativeIndexAdjustedValue(indices_data, static_cast(i) + indices_counter, axis, input_shape)]; + }; + conditional_batch_call(fn); + output_counter += inner_dim_size; } else { - for (int64_t i = 0; i < inner_dim_size; ++i) { - // for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply) - // optimizer will remove the redundant if/else block based on 'is_string' template parameter - memcpy(output_data, + // for innermost axis, input_shape_pitches[axis] = 1 (so no need to multiply) + auto fn = [input_data, output_data, base_offset, element_size, + indices_data, indices_counter, axis, input_shape](ptrdiff_t i) { + memcpy(output_data + (i * element_size), input_data + (base_offset + - GetNegativeIndexAdjustedValue(indices_data, ++indices_counter, axis, input_shape)) * + GetNegativeIndexAdjustedValue(indices_data, static_cast(i) + indices_counter, axis, input_shape)) * element_size, element_size); - output_data += element_size; - } + }; + conditional_batch_call(fn); + output_data += inner_dim_size * element_size; } + indices_counter += static_cast(inner_dim_size); increment_over_inner_dim(process_dims, indices_shape); } } @@ -272,16 +298,17 @@ Status GatherElements::Compute(OpKernelContext* context) const { if (indices_shape.Size() == 0) return Status::OK(); + auto* ttp = context->GetOperatorThreadPool(); if (input_tensor->IsDataTypeString()) { if (indices_tensor->IsDataType()) - core_impl(input_tensor, indices_tensor, output_tensor, axis); + core_impl(input_tensor, indices_tensor, output_tensor, axis, ttp); else - core_impl(input_tensor, indices_tensor, output_tensor, axis); + core_impl(input_tensor, indices_tensor, output_tensor, axis, ttp); } else { if (indices_tensor->IsDataType()) - core_impl(input_tensor, indices_tensor, output_tensor, axis); + core_impl(input_tensor, indices_tensor, output_tensor, axis, ttp); else - core_impl(input_tensor, indices_tensor, output_tensor, axis); + core_impl(input_tensor, indices_tensor, output_tensor, axis, ttp); } return Status::OK(); diff --git a/onnxruntime/test/providers/cpu/ml/scaler_test.cc b/onnxruntime/test/providers/cpu/ml/scaler_test.cc index 39e616069f..f32f02f17e 100644 --- a/onnxruntime/test/providers/cpu/ml/scaler_test.cc +++ b/onnxruntime/test/providers/cpu/ml/scaler_test.cc @@ -3,19 +3,29 @@ #include "gtest/gtest.h" #include "test/providers/provider_test_utils.h" + using namespace std; namespace onnxruntime { namespace test { template -void TestScalar() { +void TestScalar(bool use_big_input = false) { OpTester test("Scaler", 1, onnxruntime::kMLDomain); vector scale{3.f, -4.f, 3.0f}; vector offset{4.8f, -0.5f, 77.0f}; test.AddAttribute("scale", scale); test.AddAttribute("offset", offset); - vector input{1, -2, 3, 4, 5, -6}; - vector dims{2, 3}; + vector input; + vector dims; + + if (!use_big_input) { + input = vector{1, -2, 3, 4, 5, -6}; + dims = {2, 3}; + } else { + input.resize(15 * 1000); // must be >= kParallelizationThreshold in scaler.cc + std::iota(std::begin(input), std::end(input), static_cast(1)); + dims = {5000, 3}; + } // prepare expected output vector expected_output; @@ -33,6 +43,7 @@ TEST(MLOpTest, ScalerOp) { TestScalar(); TestScalar(); TestScalar(); + TestScalar(true); // use big input } TEST(MLOpTest, ScalerOpScaleOffsetSize1) { @@ -55,5 +66,27 @@ TEST(MLOpTest, ScalerOpScaleOffsetSize1) { test.Run(); } +// tests invocation via TryBatchParallelFor for input of size 10K +TEST(MLOpTest, ScalerOpScaleOffsetSize1BigInput) { + OpTester test("Scaler", 1, onnxruntime::kMLDomain); + vector scale{3.f}; + vector offset{4.8f}; + test.AddAttribute("scale", scale); + test.AddAttribute("offset", offset); + vector input(15 * 1000); // must be >= kParallelizationThreshold in scaler.cc + std::iota(std::begin(input), std::end(input), 1.0f); + vector dims{3, 5000}; + + // prepare expected output + vector expected_output; + for (size_t i = 0; i < input.size(); ++i) { + expected_output.push_back((input[i] - offset[0]) * scale[0]); + } + + test.AddInput("X", dims, input); + test.AddOutput("Y", dims, expected_output); + test.Run(); +} + } // namespace test } // namespace onnxruntime diff --git a/onnxruntime/test/providers/cpu/tensor/gather_elements_op_test.cc b/onnxruntime/test/providers/cpu/tensor/gather_elements_op_test.cc index 47cdeeb739..1d41162a08 100644 --- a/onnxruntime/test/providers/cpu/tensor/gather_elements_op_test.cc +++ b/onnxruntime/test/providers/cpu/tensor/gather_elements_op_test.cc @@ -315,5 +315,23 @@ TEST(GatherElementsOpTest, string) { RunTypedTest(); } +TEST(GatherElementsOpTest, BigIndices) { + // int32_t indices - axis 0 + OpTester test1("GatherElements", 11); + + test1.AddAttribute("axis", 0LL); + const int kNumIndices = 10 * 1000; // must be >= kParallelizationThreshold in gather_elements.cc + std::vector input(2 * kNumIndices); + std::iota(std::begin(input), std::end(input), 0.f); + test1.AddInput("data", {2, kNumIndices}, input); + + std::vector indices(kNumIndices, 0); + std::vector output(kNumIndices); + std::iota(std::begin(output), std::end(output), 0.f); + test1.AddInput("indices", {1, kNumIndices}, indices); + test1.AddOutput("output", {1, kNumIndices}, output); + test1.Run(); +} + } // namespace test } // namespace onnxruntime