mirror of
https://github.com/saymrwulf/onnxruntime.git
synced 2026-05-29 23:06:41 +00:00
Remove SchedulingParams variants of ThreadPool::TryParallelFor (#5050)
This commit is contained in:
parent
fde7a2c848
commit
bbb9d92a5f
4 changed files with 21 additions and 117 deletions
|
|
@ -52,67 +52,6 @@ class LoopCounter;
|
|||
|
||||
class ThreadPool {
|
||||
public:
|
||||
// Scheduling strategies for ParallelFor. The strategy governs how the given
// units of work are distributed among the available threads in the
// threadpool.
enum class SchedulingStrategy {
  // The Adaptive scheduling strategy adaptively chooses the shard sizes based
  // on the cost of each unit of work, and the cost model of the underlying
  // threadpool device.
  //
  // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
  // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
  // creates too many shards and CPU time will be dominated by per-shard
  // overhead, such as Context creation. Underestimating may not fully make
  // use of the specified parallelism, and may also cause inefficiencies due
  // to load balancing issues and stragglers.
  kAdaptive,

  // The Fixed Block Size scheduling strategy shards the given units of work
  // into shards of fixed size. In case the total number of units is not
  // evenly divisible by 'block_size', at most one of the shards may be of
  // smaller size. The exact number of shards may be found by a call to
  // NumShardsUsedByFixedBlockSizeScheduling.
  //
  // Each shard may be executed on a different thread in parallel, depending
  // on the number of threads available in the pool. Note that when there
  // aren't enough threads in the pool to achieve full parallelism, function
  // calls will be automatically queued.
  kFixedBlockSize
};
|
||||
|
||||
// Contains additional parameters for either the Adaptive or the Fixed Block
|
||||
// Size scheduling strategy.
|
||||
class SchedulingParams {
|
||||
public:
|
||||
explicit SchedulingParams(SchedulingStrategy strategy, optional<int64_t> cost_per_unit,
|
||||
optional<std::ptrdiff_t> block_size)
|
||||
: strategy_(strategy), cost_per_unit_(cost_per_unit), block_size_(block_size) {
|
||||
}
|
||||
|
||||
SchedulingStrategy strategy() const {
|
||||
return strategy_;
|
||||
}
|
||||
optional<int64_t> cost_per_unit() const {
|
||||
return cost_per_unit_;
|
||||
}
|
||||
optional<std::ptrdiff_t> block_size() const {
|
||||
return block_size_;
|
||||
}
|
||||
|
||||
private:
|
||||
// The underlying Scheduling Strategy for which this instance contains
|
||||
// additional parameters.
|
||||
SchedulingStrategy strategy_;
|
||||
|
||||
// The estimated cost per unit of work in number of CPU cycles (or
|
||||
// nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling
|
||||
// strategy.
|
||||
optional<int64_t> cost_per_unit_;
|
||||
|
||||
// The block size of each shard. Only applicable for Fixed Block Size
|
||||
// scheduling strategy.
|
||||
optional<std::ptrdiff_t> block_size_;
|
||||
};
|
||||
#ifdef _WIN32
|
||||
using NAME_CHAR_TYPE = wchar_t;
|
||||
#else
|
||||
|
|
@ -192,34 +131,6 @@ class ThreadPool {
|
|||
#endif
|
||||
}
|
||||
|
||||
// Similar to ParallelFor above, but takes the specified scheduling strategy
|
||||
// into account.
|
||||
void ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params,
|
||||
const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn);
|
||||
|
||||
// Runs fn over `total` units of work, using the thread pool when one is
// usable.
//
// With OpenMP enabled, scheduling_params is ignored: the work is split with
// PartitionWork into at most DegreeOfParallelism(tp) partitions (never more
// than `total`), and fn is called once per partition from an OpenMP parallel
// loop.
//
// Without OpenMP, a null `tp` makes fn(0, total) run directly in the caller;
// otherwise the call is forwarded to tp->ParallelFor with the given
// scheduling parameters.
static void TryParallelFor(concurrency::ThreadPool* tp, std::ptrdiff_t total,
                           const SchedulingParams& scheduling_params,
                           const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn) {
#ifdef _OPENMP
  ORT_UNUSED_PARAMETER(scheduling_params);
  std::ptrdiff_t num_threads = concurrency::ThreadPool::DegreeOfParallelism(tp);
  // Never create more partitions than there are units of work.
  if (total < num_threads) {
    num_threads = total;
  }
#pragma omp parallel for
  for (std::ptrdiff_t i = 0; i < num_threads; i++) {
    const auto work = PartitionWork(i, num_threads, total);
    fn(work.start, work.end);
  }
#else
  if (tp == nullptr) {
    // No pool available: process the whole range in a single call.
    fn(0, total);
    return;
  }
  tp->ParallelFor(total, scheduling_params, fn);
#endif
}
|
||||
|
||||
// Return the degree of parallelism that code should assume when using the thread pool.
|
||||
// This API takes into account if OpenMP is enabled/disabled, and if the thread pool ptr is
|
||||
// nullptr. It decouples the degree of parallelism for use with the thread pool from
|
||||
|
|
|
|||
|
|
@ -242,24 +242,6 @@ int ThreadPool::NumShardsUsedByFixedBlockSizeScheduling(const std::ptrdiff_t tot
|
|||
}
|
||||
}
|
||||
|
||||
void ThreadPool::ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params,
|
||||
const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn) {
|
||||
switch (scheduling_params.strategy()) {
|
||||
case SchedulingStrategy::kAdaptive: {
|
||||
if (scheduling_params.cost_per_unit().has_value()) {
|
||||
ParallelFor(total, static_cast<double>(scheduling_params.cost_per_unit().value()), fn);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case SchedulingStrategy::kFixedBlockSize: {
|
||||
if (scheduling_params.block_size().has_value()) {
|
||||
ParallelForFixedBlockSizeScheduling(total, scheduling_params.block_size().value(), fn);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
using CostModel = Eigen::TensorCostModel<Eigen::ThreadPoolDevice>;
|
||||
|
||||
// Calculates block size based on (1) the iteration cost and (2) parallel
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@
|
|||
#include <new>
|
||||
#include <random>
|
||||
|
||||
#include "core/common/common.h"
|
||||
|
||||
// aligned memory allocate and free functions
|
||||
inline void* aligned_alloc(size_t size, size_t align) {
|
||||
void* ptr;
|
||||
|
|
|
|||
|
|
@ -292,17 +292,26 @@ static void BM_GeluBatchParallelFor3(benchmark::State& state) {
|
|||
tpo.auto_set_affinity = true;
|
||||
std::unique_ptr<concurrency::ThreadPool> tp(
|
||||
concurrency::CreateThreadPool(&onnxruntime::Env::Default(), tpo, concurrency::ThreadPoolType::INTRA_OP));
|
||||
concurrency::ThreadPool::SchedulingParams p(concurrency::ThreadPool::SchedulingStrategy::kFixedBlockSize, optional<int64_t>(), 4096);
|
||||
|
||||
// Divide work into chunks of 4096 iterations
|
||||
const int64_t length_per_task = 4096;
|
||||
const int64_t task_count = (batch_size + length_per_task - 1) / length_per_task;
|
||||
|
||||
for (auto _ : state) {
|
||||
tp->ParallelFor(batch_size, p, [data, output](ptrdiff_t first, ptrdiff_t last) {
|
||||
ptrdiff_t len = last - first;
|
||||
float* output_ptr = output + first;
|
||||
onnxruntime::ConstEigenVectorArrayMap<float> xm(data + first, len);
|
||||
onnxruntime::EigenVectorArrayMap<float> ym(output_ptr, len);
|
||||
ym = xm * static_cast<float>(M_SQRT1_2);
|
||||
MlasComputeErf(output_ptr, output_ptr, len);
|
||||
ym = xm * 0.5f * (ym + 1.0f);
|
||||
});
|
||||
concurrency::ThreadPool::TryBatchParallelFor(
|
||||
tp.get(),
|
||||
static_cast<ptrdiff_t>(task_count),
|
||||
[batch_size, data, length_per_task, output](ptrdiff_t task_idx) {
|
||||
const auto first = task_idx * length_per_task;
|
||||
const ptrdiff_t len = std::min(length_per_task, static_cast<int64_t>(batch_size - first));
|
||||
float* output_ptr = output + first;
|
||||
onnxruntime::ConstEigenVectorArrayMap<float> xm(data + first, len);
|
||||
onnxruntime::EigenVectorArrayMap<float> ym(output_ptr, len);
|
||||
ym = xm * static_cast<float>(M_SQRT1_2);
|
||||
MlasComputeErf(output_ptr, output_ptr, len);
|
||||
ym = xm * 0.5f * (ym + 1.0f);
|
||||
},
|
||||
0);
|
||||
}
|
||||
aligned_free(data);
|
||||
aligned_free(output);
|
||||
|
|
|
|||
Loading…
Reference in a new issue