From bbb9d92a5fe12a36d5fb6f9b7f53548b77eaa2e8 Mon Sep 17 00:00:00 2001 From: Tim Harris Date: Thu, 3 Sep 2020 17:04:31 +0100 Subject: [PATCH] Remove SchedulingParams variants of ThreadPool::TryParallelFor (#5050) --- .../onnxruntime/core/platform/threadpool.h | 89 ------------------- onnxruntime/core/common/threadpool.cc | 18 ---- onnxruntime/test/onnx/microbenchmark/common.h | 2 + onnxruntime/test/onnx/microbenchmark/gelu.cc | 29 +++--- 4 files changed, 21 insertions(+), 117 deletions(-) diff --git a/include/onnxruntime/core/platform/threadpool.h b/include/onnxruntime/core/platform/threadpool.h index f21c424f10..e2047a4ba8 100644 --- a/include/onnxruntime/core/platform/threadpool.h +++ b/include/onnxruntime/core/platform/threadpool.h @@ -52,67 +52,6 @@ class LoopCounter; class ThreadPool { public: - // Scheduling strategies for ParallelFor. The strategy governs how the given - // units of work are distributed among the available threads in the - // threadpool. - enum class SchedulingStrategy { - // The Adaptive scheduling strategy adaptively chooses the shard sizes based - // on the cost of each unit of work, and the cost model of the underlying - // threadpool device. - // - // The 'cost_per_unit' is an estimate of the number of CPU cycles (or - // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating - // creates too many shards and CPU time will be dominated by per-shard - // overhead, such as Context creation. Underestimating may not fully make - // use of the specified parallelism, and may also cause inefficiencies due - // to load balancing issues and stragglers. - kAdaptive, - // The Fixed Block Size scheduling strategy shards the given units of work - // into shards of fixed size. In case the total number of units is not - // evenly divisible by 'block_size', at most one of the shards may be of - // smaller size. The exact number of shards may be found by a call to - // NumShardsUsedByFixedBlockSizeScheduling. - // - // Each shard may be executed on a different thread in parallel, depending - // on the number of threads available in the pool. Note that when there - // aren't enough threads in the pool to achieve full parallelism, function - // calls will be automatically queued. - kFixedBlockSize - }; - - // Contains additional parameters for either the Adaptive or the Fixed Block - // Size scheduling strategy. - class SchedulingParams { - public: - explicit SchedulingParams(SchedulingStrategy strategy, optional cost_per_unit, - optional block_size) - : strategy_(strategy), cost_per_unit_(cost_per_unit), block_size_(block_size) { - } - - SchedulingStrategy strategy() const { - return strategy_; - } - optional cost_per_unit() const { - return cost_per_unit_; - } - optional block_size() const { - return block_size_; - } - - private: - // The underlying Scheduling Strategy for which this instance contains - // additional parameters. - SchedulingStrategy strategy_; - - // The estimated cost per unit of work in number of CPU cycles (or - // nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling - // strategy. - optional cost_per_unit_; - - // The block size of each shard. Only applicable for Fixed Block Size - // scheduling strategy. - optional block_size_; - }; #ifdef _WIN32 using NAME_CHAR_TYPE = wchar_t; #else @@ -192,34 +131,6 @@ class ThreadPool { #endif } - // Similar to ParallelFor above, but takes the specified scheduling strategy - // into account. - void ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params, - const std::function& fn); - - static void TryParallelFor(concurrency::ThreadPool* tp, std::ptrdiff_t total, - const SchedulingParams& scheduling_params, - const std::function& fn) { -#ifdef _OPENMP - ORT_UNUSED_PARAMETER(scheduling_params); - std::ptrdiff_t num_threads = concurrency::ThreadPool::DegreeOfParallelism(tp); - if (total < num_threads) { - num_threads = total; - } -#pragma omp parallel for - for (std::ptrdiff_t i = 0; i < num_threads; i++) { - auto work = PartitionWork(i, num_threads, total); - fn(work.start, work.end); - } -#else - if (tp == nullptr) { - fn(0, total); - return; - } - tp->ParallelFor(total, scheduling_params, fn); -#endif - } - // Return the degree of parallelism that code should assume when using the thread pool. // This API takes into account if OpenMP is enabled/disabled, and if the thread pool ptr is // nullptr. It decouples the degree of parallelism for use with the thread pool from diff --git a/onnxruntime/core/common/threadpool.cc b/onnxruntime/core/common/threadpool.cc index 374a882dc7..8819bd7e58 100644 --- a/onnxruntime/core/common/threadpool.cc +++ b/onnxruntime/core/common/threadpool.cc @@ -242,24 +242,6 @@ int ThreadPool::NumShardsUsedByFixedBlockSizeScheduling(const std::ptrdiff_t tot } } -void ThreadPool::ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params, - const std::function& fn) { - switch (scheduling_params.strategy()) { - case SchedulingStrategy::kAdaptive: { - if (scheduling_params.cost_per_unit().has_value()) { - ParallelFor(total, static_cast(scheduling_params.cost_per_unit().value()), fn); - } - break; - } - case SchedulingStrategy::kFixedBlockSize: { - if (scheduling_params.block_size().has_value()) { - ParallelForFixedBlockSizeScheduling(total, scheduling_params.block_size().value(), fn); - } - break; - } - } -} - using CostModel = Eigen::TensorCostModel; // Calculates block size based on (1) the iteration cost and (2) parallel diff --git a/onnxruntime/test/onnx/microbenchmark/common.h b/onnxruntime/test/onnx/microbenchmark/common.h index 9609f303a7..5850102258 100644 --- a/onnxruntime/test/onnx/microbenchmark/common.h +++ b/onnxruntime/test/onnx/microbenchmark/common.h @@ -6,6 +6,8 @@ #include #include +#include "core/common/common.h" + // aligned memory allocate and free functions inline void* aligned_alloc(size_t size, size_t align) { void* ptr; diff --git a/onnxruntime/test/onnx/microbenchmark/gelu.cc b/onnxruntime/test/onnx/microbenchmark/gelu.cc index 22187493f4..23f9af6e9a 100644 --- a/onnxruntime/test/onnx/microbenchmark/gelu.cc +++ b/onnxruntime/test/onnx/microbenchmark/gelu.cc @@ -292,17 +292,26 @@ static void BM_GeluBatchParallelFor3(benchmark::State& state) { tpo.auto_set_affinity = true; std::unique_ptr tp( concurrency::CreateThreadPool(&onnxruntime::Env::Default(), tpo, concurrency::ThreadPoolType::INTRA_OP)); - concurrency::ThreadPool::SchedulingParams p(concurrency::ThreadPool::SchedulingStrategy::kFixedBlockSize, optional(), 4096); + + // Divide work into chunks of 4096 iterations + const int64_t length_per_task = 4096; + const int64_t task_count = (batch_size + length_per_task - 1) / length_per_task; + for (auto _ : state) { - tp->ParallelFor(batch_size, p, [data, output](ptrdiff_t first, ptrdiff_t last) { - ptrdiff_t len = last - first; - float* output_ptr = output + first; - onnxruntime::ConstEigenVectorArrayMap xm(data + first, len); - onnxruntime::EigenVectorArrayMap ym(output_ptr, len); - ym = xm * static_cast(M_SQRT1_2); - MlasComputeErf(output_ptr, output_ptr, len); - ym = xm * 0.5f * (ym + 1.0f); - }); + concurrency::ThreadPool::TryBatchParallelFor( + tp.get(), + static_cast(task_count), + [batch_size, data, length_per_task, output](ptrdiff_t task_idx) { + const auto first = task_idx * length_per_task; + const ptrdiff_t len = std::min(length_per_task, static_cast(batch_size - first)); + float* output_ptr = output + first; + onnxruntime::ConstEigenVectorArrayMap xm(data + first, len); + onnxruntime::EigenVectorArrayMap ym(output_ptr, len); + ym = xm * static_cast(M_SQRT1_2); + MlasComputeErf(output_ptr, output_ptr, len); + ym = xm * 0.5f * (ym + 1.0f); + }, + 0); } aligned_free(data); aligned_free(output);