diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index 9350eda587..3cf4dfdda1 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -81,7 +81,7 @@ // // This two-layer approach lets us separate out the // super-lightweight per-iteration-batch work from the more -// costly per-loop work of managing Task objects. +// costsly per-loop work of managing Task objects. // // - Tasks for running a parallel section. This is an extension of // the approach taken for parallel loops. However, the Tasks are @@ -99,42 +99,32 @@ // - The run queues follow the usual approach of having push/pop // operations on the front/back, and optimizing the PopFront case // for single-threaded use by the thread owning the run queue. -// Two points to note here are: // -// * We should experiment with simplifying these queues. In ORT, we -// use the CAS-based scheduling layer in threadpool.cc for the -// fine-grained allocation of individual loop iterations to worker -// threads. This means we do not have the form of recursive -// sub-division of work that motivates the original design. +// However, we support an additional Revoke operation to replace an +// item in the middle of a queue with a tombstone. This operation +// is used at the end of parallel loops and parallel sections to +// remove any tasks that were created but not yet executed. Once +// revoked, a thread can rely on the fact that the task will no +// longer execute. Revocation helps manage captured state in +// parallel loops: the alternatives would be (i) waiting for all +// tasks that captured state to reach the head of their queues and +// execute, or (ii) use heap-allocated state in tasks, and use a +// technique such as reference counting to de-allocate it. // -// * We support an additional Revoke operation to replace an item in -// the middle of a queue with a tombstone. This operation is used -// at the end of parallel loops and parallel sections to remove -// any tasks that were created but not yet executed. Once -// revoked, a thread can rely on the fact that the task will no -// longer execute. Revocation helps manage captured state in -// parallel loops: the alternatives would be (i) waiting for all -// tasks that captured state to reach the head of their queues and -// execute, or (ii) use heap-allocated state in tasks, and use a -// technique such as reference counting to de-allocate it. +// To support revoation, each thread has a unique "Tag" to identify +// the items that it adds to the work queues. A thread can revoke +// an item only if it has the thread's own tag. // -// To support revocation, each thread has a unique "Tag" to -// identify the items that it adds to the work queues. A thread -// can revoke an item only if it has the thread's own tag. +// - The worker threads maintain a best-effort bitmap in +// good_worker_hints_ of which threads to push work to. A thread +// controls its status via SetGoodWorkerHint. A thread is a "good" +// worker when it is actively spinning for work, meaning both that +// it is not blocked in the OS, and that it is not busy with work +// already. // -// - When entering a parallel loop (or parallel section), a thread -// maintains a set of "preferred" worker hints, and initially -// submits tasks to these workers. -// When a task executes, it updates the submitting thread's -// preferred workers to reflect the worker that the task ran on. -// Hence, if a task is submitted to thread T1's queue, and then -// stolen by T2 for execution, then T2 will become preferred. -// -// This "stickiness" aims to retain locality between successive -// loops submitted by the same thread, to maintain the same set of -// active threads over time (when the entire pool is not needed), -// and to allow concurrent requests to submit works to their own -// respective sets of preferred workers. +// This heuristic aims to avoid waking additional sleeping threads +// where possible, and in a series of parallel loops or parallel +// sections to push the work to the same set of threads each time. namespace onnxruntime { namespace concurrency { @@ -148,17 +138,6 @@ using CHAR_TYPE = char; class ThreadPoolParallelSection; class ThreadPoolLoop; -enum class StealAttemptKind { - TRY_ONE, - TRY_ALL, -}; - -enum class PushResult { - REJECTED, - ACCEPTED_IDLE, - ACCEPTED_BUSY -}; - // Align to avoid false sharing with prior fields. If required, // alignment or padding must be added subsequently to avoid false // sharing with later fields. Note that: @@ -328,21 +307,14 @@ class ThreadPoolParallelSection { // maximum degree of parallelism that the section will support. std::vector> tasks; - // Number of tasks revoked (i.e., removed from the queues prior to - // execution). We count this at various points, and omit waiting - // for them at the end of a loop. - unsigned tasks_revoked; - - // Current degree of parallelism, including work in the main thread - // and in the dispatcher. - unsigned current_dop; - // State shared between the main thread and worker threads // ------------------------------------------------------- // Flag to signal termination of the parallel section std::atomic active{false}; + std::atomic worker_idx{0}; + // Count of the number of tasks that completed normally. Other // tasks may be running currently, or may be present in work queues, // or may have been removed from the queues by @@ -366,9 +338,10 @@ class ThreadPoolParallelSection { // Members to track asynchronous dispatching int dispatch_q_idx = -1; // index of thread that dispatch work to all other threads unsigned dispatch_w_idx = 0; // index of enqueued work - std::atomic dispatch_started{false}; std::atomic dispatch_done{false}; std::atomic work_done{false}; + std::vector good_hints; + std::vector alt_hints; }; class ThreadPoolLoop { @@ -398,6 +371,22 @@ class RunQueue { assert(Size() == 0); } + // PushFront inserts w at the beginning of the queue. + // If queue is full returns w, otherwise returns default-constructed Work. + Work PushFront(Work w) { + unsigned front = front_.load(std::memory_order_relaxed); + Elem& e = array_[front & kMask]; + ElemState s = e.state.load(std::memory_order_relaxed); + if (s != ElemState::kEmpty || + !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) + return w; + front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed); + e.w = std::move(w); + e.tag = Tag(); + e.state.store(ElemState::kReady, std::memory_order_release); + return Work(); + } + // PopFront removes and returns the first element in the queue. // If the queue was empty returns default-constructed Work. Work PopFront() { @@ -454,7 +443,9 @@ class RunQueue { // subsequent call to RevokeWithTag to remove the item from the queue in combination // with w_idx. Typically the tag will be a per-thread ID to distinguish work // submitted from different threads. - PushResult PushBackWithTag(Work w, Tag tag, unsigned &w_idx) { + // + // If the queue is full, returns w, otherwise returns default-constructed work. + Work PushBackWithTag(Work w, Tag tag, unsigned &w_idx) { std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); w_idx = (back-1) & kMask; @@ -462,14 +453,13 @@ class RunQueue { ElemState s = e.state.load(std::memory_order_relaxed); if (s != ElemState::kEmpty || !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) - return PushResult::REJECTED; /* Not enqueued */ - bool was_ready = (((back^(front_.load(std::memory_order_relaxed)))&kMask) == 0); + return w; back = ((back - 1) & kMask2) | (back & ~kMask2); back_.store(back, std::memory_order_relaxed); e.w = std::move(w); e.tag = tag; e.state.store(ElemState::kReady, std::memory_order_release); - return was_ready ? PushResult::ACCEPTED_IDLE : PushResult::ACCEPTED_BUSY; /* Enqueued */ + return Work(); } // PopBack removes and returns the last elements in the queue. @@ -566,6 +556,13 @@ class RunQueue { return SizeOrNotEmpty() == 0; } + // Delete all the elements from the queue. + void Flush() { + while (!Empty()) { + PopFront(); + } + } + private: static const unsigned kMask = kSize - 1; static const unsigned kMask2 = (kSize << 1) - 1; @@ -733,7 +730,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // indices as (t + coprime) % num_threads, we will cover all threads without // repetitions (effectively getting a presudo-random permutation of thread // indices). - for (auto i = 1u; i <= num_threads_; ++i) { + for (int i = 1; i <= num_threads_; ++i) { all_coprimes_.emplace_back(i); ComputeCoprimes(i, &all_coprimes_.back()); } @@ -745,7 +742,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter good_worker_hints_ = std::make_unique[]>(num_hint_words_); worker_data_.resize(num_threads_); - for (auto i = 0u; i < num_threads_; i++) { + for (int i = 0; i < num_threads_; i++) { worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options)); } } @@ -769,19 +766,88 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter void Schedule(std::function fn) override { PerThread* pt = GetPerThread(); - int q_idx = Rand(&pt->rand) % num_threads_; - WorkerData &td = worker_data_[q_idx]; - Queue& q = td.queue; - fn = q.PushBack(std::move(fn)); - if (!fn) { - // The queue accepted the work; ensure that the thread will pick it up - td.EnsureAwake(); + if (pt->pool == this) { + // Worker thread of this pool, push onto the thread's queue. + Queue& q = worker_data_[pt->thread_id].queue; + fn = q.PushFront(std::move(fn)); } else { - // Run the work directly if the queue rejected the work - fn(); + // A free-standing thread (or worker of another pool), push onto a random + // queue. + int q_idx = Rand(&pt->rand) % num_threads_; + WorkerData &td = worker_data_[q_idx]; + Queue& q = td.queue; + fn = q.PushBack(std::move(fn)); + if (!fn) { + // The queue accepted the work; ensure that the thread will pick it up + td.EnsureAwake(); + } } + + // Run the work directly if the queue rejected the work + if (fn) fn(); } +// The thread pool maintains a set of hints for which threads will be good to distribute +// work to. A thread is considered "good" if it is actively spinning, meaning both that +// it is not busy with existing work, and that it should respond quickly to the addition +// of new work. + +void SetGoodWorkerHint(int idx, bool is_good) { + assert(idx >= 0 && idx < num_threads_); + std::atomic& u64 = good_worker_hints_[idx / bits_per_hint_word_]; + uint64_t bit = 1ull << (idx % bits_per_hint_word_); + uint64_t saw, want; + do { + saw = u64.load(); + want = is_good ? (saw|bit) : (saw&~bit); + } while (!u64.compare_exchange_weak(saw, want)); +} + +// Retrieve hints for up to n threads to distribute work to. Threads in good_hints +// pass a best-effort check to identify spinning threads via the good_worker_hints_ +// bitmap. Threads in alt_hint do not pass that test, but are distinct from those in +// good_hints, letting the caller avoid distributing more than one work item to +// any individual thread. + +void GetGoodWorkerHints(unsigned n, std::vector& good_hints, std::vector& alt_hints) { + PerThread* pt = GetPerThread(); + unsigned need_alt = n; + good_hints.clear(); + alt_hints.clear(); + + // Iterate through the words of hints, starting from a pseudo-randomly chosen + // base. This aims to distribute work across large machines in cases we + // have multiple threads scheduling work concurrently. + + unsigned base = Rand(&pt->rand) % num_hint_words_; + for (unsigned i = 0u; n && (i < num_hint_words_); i++) { + int u64_idx = (base + i) % num_hint_words_; + std::atomic* u64 = &good_worker_hints_[u64_idx]; + uint64_t saw = u64->load(); + uint64_t want = saw; + + // Pick up to n bits that are set in the current word + for (unsigned j = 0u; n && (j < bits_per_hint_word_); j++) { + uint64_t bit = 1ull << j; + int thread = u64_idx * bits_per_hint_word_ + j; + if (saw & bit) { + good_hints.push_back(thread); + want &= ~bit; + n--; + } else if (need_alt && thread < num_threads_) { + alt_hints.push_back(thread); + need_alt--; + } + } + + // Best-effort attempt to remove the hints. We should measure the impact of + // contention here, but the intuition is that if we conflict on the CAS then the + // machine is likely to be busy in any case, and we will have queuing on the + // work items. + u64->compare_exchange_strong(saw, want); + } +} + //...................................................................... // // Parallel sections @@ -815,12 +881,6 @@ void StartParallelSectionInternal(PerThread &pt, if (!pt.tag.Get()) { pt.tag = Tag::GetNext(); } - ps.dispatch_q_idx = -1; - ps.dispatch_started = false; - ps.dispatch_done = false; - ps.work_done = false; - ps.tasks_revoked = 0; - ps.current_dop = 1; ps.active = true; } @@ -841,73 +901,48 @@ void EndParallelSectionInternal(PerThread &pt, // Notify workers to exit from the section ps.active = false; - // First, attempt to revoke the dispatch task. If we succeed then - // we know we revoked _something_ pushed for the current loop. That - // may be the dispatch task itself, or it may be a task pushed by - // the dispatch task. Those cases are distinguished by whether or - // not the dispatch task itself has started -- if it has not started - // then it cannot have pushed tasks. - if (ps.dispatch_q_idx != -1) { + if (ps.dispatch_q_idx > -1) { Queue& q = worker_data_[ps.dispatch_q_idx].queue; if (q.RevokeWithTag(pt.tag, ps.dispatch_w_idx)) { - if (!ps.dispatch_started.load(std::memory_order_acquire)) { - // We successfully revoked a task, and saw the dispatch task - // not started. Hence we know we revoked the dispatch task. - // This should be the common case. - ps.dispatch_q_idx = -1; - } else { - // We successfully revoked a task, but saw the dispatch task - // had started. Hence we know we revoked one of the _new_ - // tasks created by the dispatcher (not the dispatcher - // itself). This should be the rare case, but can occur if - // one of the tasks created by the dispatcher occupies the - // exact same slot in a work queue that the dispatcher used. - ps.tasks_revoked ++; + ps.dispatch_q_idx = -1; // cancel dispatch if not started yet + } else { + // if dispatch task started, wait for its dispatch completion + while (!ps.dispatch_done.load(std::memory_order_acquire)) { + onnxruntime::concurrency::SpinPause(); } } } - // Second, if we failed to revoke the dispatch task, wait for it to - // finish dispatch work. This avoids new tasks being started - // concurrently with us attempting to end the parallel section. - if (ps.dispatch_q_idx != -1) { - while (!ps.dispatch_done.load(std::memory_order_acquire)) { - onnxruntime::concurrency::SpinPause(); - } - } - - // Now we know that dispatch is finshed, we synchronize with the - // tasks that were created (if any) for the parallel section. We - // revoke tasks still in queues, and then wait for any that are - // still running. profiler_.LogStart(); + // Attempt to revoke any tasks that were sent to workers but not + // started. unsigned tasks_started = static_cast(ps.tasks.size()); + unsigned tasks_revoked = 0; while (!ps.tasks.empty()) { const auto& item = ps.tasks.back(); Queue& q = worker_data_[item.first].queue; if (q.RevokeWithTag(pt.tag, item.second)) { - ps.tasks_revoked++; + tasks_revoked++; } ps.tasks.pop_back(); } profiler_.LogEnd(ThreadPoolProfiler::WAIT_REVOKE); - // Wait for the dispatch task's own work... + // Wait for workers to exit ParLoopWorker + auto tasks_to_wait_for = tasks_started - tasks_revoked; + while (ps.tasks_finished < tasks_to_wait_for) { + onnxruntime::concurrency::SpinPause(); + } + // Clear status to allow the ThreadPoolParallelSection to be + // re-used. + ps.tasks_finished = 0; + if (ps.dispatch_q_idx > -1) { + // if dispatch task started, wait for its work completion while (!ps.work_done.load(std::memory_order_acquire)) { onnxruntime::concurrency::SpinPause(); } } - - // ...and wait for any other tasks not revoked to finish their work - auto tasks_to_wait_for = tasks_started - ps.tasks_revoked; - while (ps.tasks_finished < tasks_to_wait_for) { - onnxruntime::concurrency::SpinPause(); - } - - // Clear status to allow the ThreadPoolParallelSection to be - // re-used. - ps.tasks_finished = 0; } void EndParallelSection(ThreadPoolParallelSection &ps) override { @@ -915,279 +950,75 @@ void EndParallelSection(ThreadPoolParallelSection &ps) override { EndParallelSectionInternal(*pt, ps); } -//---------------------------------------------------------------------- -// -// Preferred workers -// ----------------- -// -// Initialize the set of hints for preferred worker threads we will -// use. We do this once, covering the maximum num_threads_ items, -// in order to avoid resizing preferred_workers concurrent with -// access from worker threads. -// -// For simplicity we initialize with hints round-robin among the -// workers. For simple workloads with 1 main thread this means we -// will distribute work across the pool of workers. For workers -// with multiple main threads it attempts to balance the load. -// -// These hints are just used as a starting point, and are updated by -// the worker thread that actually claims an item (e.g., if an item -// initially assigned to thread T1 is stolen and executed by T2, -// then T2 is assigned at the new preferred worker). -// -// Note that the hints are held in the _main_ thread that submits -// work to the pool. We assume that a thread is primarily -// submitting work to just one pool, but allow for the pool to -// change over time. Hence we allow the hints vector to grow over -// time. -// -// A note on terminology used in the variable names here: -// -// dop - degree of parallelism, as seen by the user. For instance -// dop=4 means 4 threads in total: 1 main thread that enters the -// loop, plus 1 dispatcher thread, plus 2 additional worker -// threads. -// -// par_idx - a thread's index within the loop, in the range [0,dop). -// -// num_threads_ - the number of worker threads in the thread pool. A -// loop with dop=4 will be common on a pool with 3 threads -// (given that the main thread will also participate). -// -// q_idx - a worker queue index, in the range [0,num_threads_). -// -// preferred_workers - this maps from par_idx values to q_idx. Hence, -// with dop=4 the vector will have length 4, and will identify -// which of the workers (0,1,2) should run tasks for the loop. -// Note that mapping from par_idx values means that only slots -// [1,dop) are actually used in preferred_workers. -// -// Here are three examples, all assuming a machine with 4 h/w threads, -// and ORT configured to use dop=4. -// -// * First, suppose that a single job is running a series of loops. -// Its main thread enters a parallel loop. Initially, let's assume -// its preferred worker array is [_,0,1,2], writing "_" for the -// unusued element for the par_idx=0 work that the main thread will -// run. -// -// The main thread schedules the dispatcher task onto worker 0. -// -// The dispatcher task schedules worker tasks onto workers 1 and 2. -// -// The tasks all execute, without any work stealing, on the threads -// they were scheduled on. The preferred worker array remains -// [_,0,1,2]. -// -// * Next, assume we have the same job, and for whatever reason the -// preferred workers were initially [_,0,0,0]. -// -// The main thread schedules the dispatcher onto worker 0. -// -// This dispatcher task runs on worker 0, and pushes the worker -// tasks back onto worker 0's queue. -// -// Workers 1 and 2 are idle, and steal tasks from worker 0. As the -// tasks run, they update the preferred_workers array to record the -// workers that execute them. -// -// After the loop, the preferred worker array may now be [_,0,2,1] -// or [_,0,1,2], reflecting the fact that the work has got -// re-distributed. The next loop will start out by distributing the -// work to those same workers. -// -// * Finally, let's assume we have two jobs running on two main -// threads, and we are now using DoP=2 in the loops, and have 2 -// workers in the thread pool (so the machine is not -// over-subscribed). -// -// Each main thread has its own preferred_workers, and -// let's say initially these are both [_,0]. -// -// Here, with DoP=2, each main thread will just dispatch a single -// task immediately (there is no need for asynchrony with only one -// task to generate). -// -// Initially both main threads will submit these tasks to worker 0. -// -// Once worker 1 steals one of these tasks, the task will update its -// preferred worker to be 1. -// -// From that point onwards, the two main threads will dispatch tasks -// to separate workers, avoiding the need for further work stealing. - -void InitializePreferredWorkers(std::vector &preferred_workers) { - static std::atomic next_worker; - // preferred_workers maps from a par_idx to a q_idx, hence we - // initialize slots in the range [0,num_threads_] - while (preferred_workers.size() <= num_threads_) { - preferred_workers.push_back(next_worker++ % num_threads_); - } -} - -// Update the preferred worker for par_idx to be the calling thread - -void UpdatePreferredWorker(std::vector &preferred_workers, - unsigned par_idx) { - unsigned ran_on_idx = GetPerThread()->thread_id; - assert(ran_on_idx >= 0 && ran_on_idx < num_threads_); - assert(par_idx < preferred_workers.size()); - preferred_workers[par_idx] = ran_on_idx; -} - -// Schedule [par_idx_start,par_idx_end) across the preferred workers - -void ScheduleOnPreferredWorkers(PerThread& pt, - ThreadPoolParallelSection& ps, - std::vector &preferred_workers, - unsigned par_idx_start, - unsigned par_idx_end, - std::function worker_fn) { - for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) { - // Look up hint for par_idx. Note that the hints may have been - // recorded from a prior thread pool with a different number of - // threads, hence we must cap at num_threads_. - assert(par_idx < preferred_workers.size()); - unsigned q_idx = preferred_workers[par_idx] % num_threads_; - assert(q_idx < num_threads_); - WorkerData& td = worker_data_[q_idx]; - Queue& q = td.queue; - unsigned w_idx; - - // Attempt to enqueue the task - auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() { - // Record the worker thread that actually runs this task. - // This will form the preferred worker for the next loop. - UpdatePreferredWorker(preferred_workers, par_idx); - worker_fn(par_idx); - ps.tasks_finished++; - }, - pt.tag, - w_idx); - - // Queue accepted the task; wake the thread that owns the queue. - // In addition, if the queue was non-empty, attempt to wake - // another thread (which may then steal the task). - if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) { - ps.tasks.push_back({q_idx, w_idx}); - td.EnsureAwake(); - if (push_status == PushResult::ACCEPTED_BUSY) { - worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake(); - } - } - } -} - -//...................................................................... -// -// Parallel loops -// -------------- -// -// Ensure that the ThreadPoolParallelSection has sufficient workers to -// execute a loop with degree of parallelism n. We track the number -// of workers already avaiable to the parallel section, prior to -// submitting tasks to the work queues to make up the total. -// -// Each worker will call in to worker_fn(idx) with a per-worker thread -// ID. Note there are different levels of indirection here: -// -// - In a single-loop parallel section, worker_fn will directly -// execute the threadpool.cc code that implements the parallel loop. -// -// - In a multi-loop parallel section, worker_fn is an intermediate -// function that is long-lived (i.e., that lasts until the end of -// the parallel section, as opposed to just a single loop's -// duration). -// -// For ordinary parallel sections, RunInParallelInternal dispatch -// tasks to a number of workers asynchronously. A worker thread will -// be selected as the dispatcher that distributes tasks. This removes -// the O(n) work off the critical path of starting the first loop -// iteration, helping maintain good performance on very short loops. -// -// See the note on terminology above for the use of variable names -// here. - +// RunInParallelInternal dispatch tasks to a number of workers asynchronously. +// A worker thread will be selected as the dispatcher that distributes tasks. void RunInParallelInternal(PerThread& pt, ThreadPoolParallelSection& ps, - unsigned new_dop, - bool dispatch_async, + unsigned n, std::function worker_fn) { - - // Ensure that the vector of preferred workers is sufficient for the - // size of the loop we are entering. We do this before dispatching - // tasks for the loop in order to avoid any races between changes to - // the size of the vector and recording the locations that tasks run - // in as they complete. - assert(new_dop <= (unsigned)(num_threads_+1)); - std::vector &preferred_workers = pt.preferred_workers; - InitializePreferredWorkers(preferred_workers); - - // current_dop is the degree of parallelism via any workers already - // participating in the current parallel section. Usually, for - // single-loop parallel sections, current_dop=1. - unsigned current_dop = ps.current_dop; - - if (current_dop < new_dop) { - unsigned extra_needed = new_dop - current_dop; - - // Attempt to summon additional workers asynchronously if we - // need more than one. Otherwise, we fall back to simple - // synchronous scheduling. - if (dispatch_async && extra_needed > 1) { - assert(current_dop == 1); - - // Task for dispatching work asynchronously. - Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() { - // Record that dispatch work has started. This must occur - // prior to scheduling tasks, in order to synchronize with - // EndParallelSectionInternal. [ If EndParallelSection - // revoked a task, and then sees distpatch_started=false, then - // it knows that it revoked the dispatcher. Conversely, if it - // revokes a task, and then sees dispatch_started=true, then - // it knows it revoked a worker task. ] - ps.dispatch_started.store(true, std::memory_order_seq_cst); - - // Schedule tasks par_idx=[current_dop+1,new_dop) - ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop+1, new_dop, worker_fn); - ps.dispatch_done.store(true, std::memory_order_release); - - // Record the worker thread that actually runs this task. - // This will form the preferred worker for the next loop. - UpdatePreferredWorker(preferred_workers, current_dop); - - // Run dispatcher task's own work, par_idx=current_dop - worker_fn(current_dop); - - // Dispatcher's work complete - ps.work_done.store(true, std::memory_order_release); - }; - - profiler_.LogStart(); - ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_; - WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx]; - Queue& dispatch_que = dispatch_td.queue; - - // assign dispatch task to selected dispatcher - auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx); - // Queue accepted the task; wake the thread that owns the queue. - // In addition, if the queue was non-empty, attempt to wake - // another thread (which may then steal the task). - if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) { - dispatch_td.EnsureAwake(); - if (push_status == PushResult::ACCEPTED_BUSY) { - worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake(); - } - } else { - ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task - } - profiler_.LogEnd(ThreadPoolProfiler::DISTRIBUTION_ENQUEUE); - } else { - // Synchronous dispatch - ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop, new_dop, std::move(worker_fn)); - } - ps.current_dop = new_dop; + // init a few env variables + ps.dispatch_q_idx = -1; + ps.dispatch_done = false; + ps.work_done = false; + // calculate how many workers to summon + unsigned current_dop = static_cast(ps.tasks.size()) + 1; + unsigned extra_needed = n > current_dop ? n - current_dop : 0; + if (0 == extra_needed) { + return; // do nothing if no more workers required } + // get workers that are available + GetGoodWorkerHints(extra_needed, ps.good_hints, ps.alt_hints); + // define task for each worker + Task call_worker_fn = [worker_fn, &ps]() { + worker_fn(++ps.worker_idx); + ps.tasks_finished++; + }; + // define dispatch task for dispatcher + Task dispatch_task = [extra_needed, call_worker_fn, worker_fn, &ps, &pt, this]() { + unsigned good_hints_size = static_cast(ps.good_hints.size()); + for (auto i = 1u; i < extra_needed; ++i) { // dispatcher do dispatch + int q_idx; + if (i < ps.good_hints.size()) { + q_idx = ps.good_hints[i]; + } else { + auto alt_i = i - good_hints_size; + if (alt_i < ps.alt_hints.size()) { + q_idx = ps.alt_hints[alt_i]; + } else { + q_idx = Rand(&pt.rand) % num_threads_; + } + } + WorkerData& td = worker_data_[q_idx]; + Queue& q = td.queue; + unsigned w_idx; + Task t = q.PushBackWithTag(call_worker_fn, pt.tag, w_idx); + if (!t) { + td.EnsureAwake(); + ps.tasks.push_back({q_idx, w_idx}); + } + } + ps.dispatch_done.store(true, std::memory_order_release); // dispatch complete + worker_fn(++ps.worker_idx); // dispatcher also needs to do its part + ps.work_done.store(true, std::memory_order_release); // work complete + }; // dispatch_task + profiler_.LogStart(); + if (!ps.good_hints.empty()) { // get dispatcher + ps.dispatch_q_idx = ps.good_hints[0]; + } else if (!ps.alt_hints.empty()) { + ps.dispatch_q_idx = ps.alt_hints[0]; + } else { + ps.dispatch_q_idx = Rand(&pt.rand) % num_threads_; + } + WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx]; + Queue& dispatch_que = dispatch_td.queue; + // assign dispatch task to selected dispatcher + Task t = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx); + if (t) { + ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task + } else { + dispatch_td.EnsureAwake(); // make sure that dipatch worker is awake + } + profiler_.LogEnd(ThreadPoolProfiler::DISTRIBUTION_ENQUEUE); } // Run a single parallel loop in an existing parallel section. This @@ -1196,8 +1027,7 @@ void RunInParallelInternal(PerThread& pt, // dispatching the loop to those workers. void RunInParallelSection(ThreadPoolParallelSection &ps, std::function fn, - unsigned n, - std::ptrdiff_t block_size) override { + unsigned n, std::ptrdiff_t block_size) override { profiler_.LogStartAndCoreAndBlock(block_size); PerThread* pt = GetPerThread(); assert(pt->leading_par_section && "RunInParallel, but not in parallel section"); @@ -1212,28 +1042,37 @@ void RunInParallelSection(ThreadPoolParallelSection &ps, // Increase the worker count if needed. Each worker will pick up // loops to execute from the current parallel section. - std::function worker_fn = [&ps](unsigned par_idx) { + std::function worker_fn = [&ps](unsigned my_idx) { while (ps.active) { if (!ps.current_loop) { onnxruntime::concurrency::SpinPause(); } else { ps.workers_in_loop++; ThreadPoolLoop *work_item = ps.current_loop; - if (work_item && par_idx < work_item->threads_needed) { - work_item->fn(par_idx); + if (work_item && my_idx < work_item->threads_needed) { + work_item->fn(my_idx); } ps.workers_in_loop--; } } }; - RunInParallelInternal(*pt, ps, n, false, std::move(worker_fn)); - assert(ps.dispatch_q_idx == -1); + RunInParallelInternal(*pt, ps, n, std::move(worker_fn)); profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); // Run work in the main thread loop.fn(0); profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); - + if (ps.dispatch_q_idx > -1) { + Queue& q = worker_data_[ps.dispatch_q_idx].queue; + if (q.RevokeWithTag(pt->tag, ps.dispatch_w_idx)) { + ps.dispatch_q_idx = -1; // cancel dispatch if not started yet + } else { + // if dispatch task started, wait for its dispatch completion + while (!ps.dispatch_done.load(std::memory_order_acquire)) { + onnxruntime::concurrency::SpinPause(); + } + } + } // Wait for workers to exit the loop ps.current_loop = 0; while (ps.workers_in_loop) { @@ -1259,7 +1098,7 @@ void RunInParallel(std::function fn, unsigned n, std::ptrdif PerThread* pt = GetPerThread(); ThreadPoolParallelSection ps; StartParallelSectionInternal(*pt, ps); - RunInParallelInternal(*pt, ps, n, true, fn); // select dispatcher and do job distribution; + RunInParallelInternal(*pt, ps, n, fn); // select dispatcher and do job distribution; profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION); fn(0); // run fn(0) profiler_.LogEndAndStart(ThreadPoolProfiler::RUN); @@ -1281,6 +1120,18 @@ int CurrentThreadId() const EIGEN_FINAL { } private: + +#ifdef NDEBUG + void AssertBounds(int, int) { + } +#else + void AssertBounds(int start, int end) { + assert(start >= 0); + assert(start < end); // non-zero sized partition + assert(end <= num_threads_); + } +#endif + void ComputeCoprimes(int N, Eigen::MaxSizeVector* coprimes) { for (int i = 1; i <= N; i++) { unsigned a = i; @@ -1300,10 +1151,13 @@ int CurrentThreadId() const EIGEN_FINAL { typedef typename Environment::EnvThread Thread; struct WorkerData; - // PerThread objects are allocated in thread-local storage and - // allocated on the thread's first call to GetPerThread. PerThread - // objects are allocated for all threads that submit work to the - // thread pool, in addition to threads within the pool. + // PerThread objects are allocated in thread-local storage and allocated + // on the thread's first call to GetPerThread. The object should + // remain trivially-destructable, with other state placed in the + // WorkerData objects that are allocated and cleaned-up explicitly. + // + // PerThread objects are allocated for all threads that submit work to + // the thread pool, in addition to threads within the pool. // // In contrast, the WorkerData objects are allocated only for the // threads in the pool, and their lifetime is managed along with the @@ -1313,21 +1167,15 @@ int CurrentThreadId() const EIGEN_FINAL { constexpr PerThread() : pool(nullptr) { } ThreadPoolTempl* pool; // Parent pool, or null for normal threads. - bool initialized{false}; // Non-trivial initialization ran (e.g. for RNG) uint64_t rand{0}; // Random generator state. int thread_id{-1}; // Worker thread index in pool. Tag tag{}; // Work item tag used to identify this thread. bool leading_par_section{false}; // Leading a parallel section (used only for asserts) - - // When this thread is entering a parallel section, it will - // initially push work to this set of workers. The aim is to - // retain cache state within the workers, and to reduce the number - // of times that the work-stealing code paths are used for - // rebalancing. - std::vector preferred_workers; - PaddingToAvoidFalseSharing padding_2; }; + static_assert(std::is_trivially_destructible::value, + "Per-thread state should be trivially destructible"); + struct WorkerData { constexpr WorkerData() : thread(), queue() { } @@ -1421,7 +1269,7 @@ int CurrentThreadId() const EIGEN_FINAL { }; Environment& env_; - const unsigned num_threads_; + const int num_threads_; const bool allow_spinning_; const bool set_denormal_as_zero_; Eigen::MaxSizeVector worker_data_; @@ -1457,9 +1305,11 @@ int CurrentThreadId() const EIGEN_FINAL { Queue& q = td.queue; bool should_exit = false; pt->pool = this; + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning); + SetGoodWorkerHint(thread_id, true /* Is good */); const int log2_spin = 20; const int spin_count = allow_spinning_ ? (1ull< bool { - bool should_block = true; - // Number of blocked threads is used as termination condition. - // If we are shutting down and all worker threads blocked without work, - // that's we are done. - blocked_++; - if (done_ && blocked_ == num_threads_) { - should_block = false; - // Almost done, but need to re-check queues. - // Consider that all queues are empty and all worker threads are preempted - // right after incrementing blocked_ above. Now a free-standing thread - // submits work and calls destructor (which sets done_). If we don't - // re-check queues, we will exit leaving the work unexecuted. - if (NonEmptyQueueIndex() != -1) { - // Note: we must not pop from queues before we decrement blocked_, - // otherwise the following scenario is possible. Consider that instead - // of checking for emptiness we popped the only element from queues. - // Now other worker threads can start exiting, which is bad if the - // work item submits other work. So we just check emptiness here, - // which ensures that all worker threads exit at the same time. - blocked_--; - } else { - should_exit = true; - } - } - return should_block; - }, - // Post-block update (executed only if we blocked) - [&]() { - blocked_--; - }); - // Thread just unblocked. Aside from exit conditions, - // either work was pushed to us, or it was pushed to an - // overloaded queue - assert(!t); - t = q.PopFront(); - if (!t) t = Steal(StealAttemptKind::TRY_ALL);} + // No work passed to us while spinning; make a further full attempt to + // steal work from other threads prior to blocking. + if (num_threads_ != 1) { + t = Steal(true /* true => check all queues */); + } + if (!t) { + td.SetBlocked( + // Pre-block test + [&]() -> bool { + bool should_block = true; + // We already did a best-effort emptiness check when stealing; now + // do a full check prior to blocking. + int victim = NonEmptyQueueIndex(); + if (victim != -1) { + should_block = false; + t = worker_data_[victim].queue.PopBack(); + } + // Number of blocked threads is used as termination condition. + // If we are shutting down and all worker threads blocked without work, + // that's we are done. + if (should_block) { + blocked_++; + if (done_ && blocked_ == static_cast(num_threads_)) { + should_block = false; + // Almost done, but need to re-check queues. + // Consider that all queues are empty and all worker threads are preempted + // right after incrementing blocked_ above. Now a free-standing thread + // submits work and calls destructor (which sets done_). If we don't + // re-check queues, we will exit leaving the work unexecuted. + if (NonEmptyQueueIndex() != -1) { + // Note: we must not pop from queues before we decrement blocked_, + // otherwise the following scenario is possible. Consider that instead + // of checking for emptiness we popped the only element from queues. + // Now other worker threads can start exiting, which is bad if the + // work item submits other work. So we just check emptiness here, + // which ensures that all worker threads exit at the same time. + blocked_--; + } else { + should_exit = true; + } + } + } + return should_block; + }, + // Post-block update (executed only if we blocked) + [&]() { + blocked_--; + }); + } + } } if (t) { td.SetActive(); @@ -1529,46 +1392,61 @@ int CurrentThreadId() const EIGEN_FINAL { td.SetSpinning(); } } - - // Whichever thread(s) observe the termination conditions are responsible for waking - // any other threads that have remained blocked. - if (should_exit) { - WakeAllWorkersForExit(); + + // Whichever thread(s) observe the termination conditions are responsible for waking + // any other threads that have remained blocked. + if (should_exit) { + WakeAllWorkersForExit(); + } } - } - // Steal tries to steal work from other worker threads in a - // best-effort manner. We steal only from threads that are running - // in user code (ThreadStatus::Active). The intuition behind this - // is that the thread is busy with other work, and we will avoid - // "snatching" work from a thread which is just about to notice the - // work itself. + // Steal tries to steal work from other worker threads in the range [start, + // limit) in best-effort manner. We make two passes over the threads: + // + // - round 0 : we attempt to steal from threads that are running in + // user code (ThreadStatus::Active). The intuition behind this is that + // the thread is busy with other work, and that by preferring to + // steel from busy victims we will avoid "snatching" work from a + // thread which is just about to notice the work itself. + // + // - round 1 : we steal work from any thread, including those which claim + // to be spinning. In these cases, even though the victim thread is + // looking for work itself, it may have been pre-empted. - Task Steal(StealAttemptKind steal_kind) { + Task Steal(bool check_all) { PerThread* pt = GetPerThread(); - unsigned size = num_threads_; - unsigned num_attempts = (steal_kind == StealAttemptKind::TRY_ALL) ? size : 1; + unsigned size = static_cast(num_threads_); unsigned r = Rand(&pt->rand); unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()]; - unsigned victim = r % size; - - for (unsigned i = 0; i < num_attempts; i++) { - assert(victim < size); - if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) { - Task t = worker_data_[victim].queue.PopBack(); - if (t) { - return t; + + for (int round = 0; round < 2; round++) { + unsigned victim = r % size; + for (unsigned i = 0; i < size; i++) { + assert(victim < size); + if (round == 1 || + worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) { + Task t = worker_data_[victim].queue.PopBack(); + if (t) { + return t; + } + } + if (!check_all) { + return Task(); + } + victim += inc; + if (victim >= size) { + victim -= size; } - } - victim += inc; - if (victim >= size) { - victim -= size; } } return Task(); } + Task TrySteal() { + return Steal(false); + } + int NonEmptyQueueIndex() { PerThread* pt = GetPerThread(); const unsigned size = static_cast(worker_data_.size()); @@ -1594,10 +1472,6 @@ int CurrentThreadId() const EIGEN_FINAL { static EIGEN_STRONG_INLINE PerThread* GetPerThread() { static thread_local PerThread per_thread_; PerThread* pt = &per_thread_; - if (!pt->initialized) { - pt->rand = GlobalThreadIdHash(); - pt->initialized = true; - } return pt; } diff --git a/onnxruntime/test/platform/threadpool_test.cc b/onnxruntime/test/platform/threadpool_test.cc index efbe04ef53..1755807475 100644 --- a/onnxruntime/test/platform/threadpool_test.cc +++ b/onnxruntime/test/platform/threadpool_test.cc @@ -168,7 +168,6 @@ void TestPoolCreation(const std::string&, int iter) { ASSERT_EQ(ctr, iter * per_iter); } -// Test multi-loop parallel sections, with a series of fixed-size loops void TestMultiLoopSections(const std::string& name, int num_threads, int num_loops) { for (int rep = 0; rep < 5; rep++) { const int num_tasks = 1024; @@ -187,35 +186,6 @@ void TestMultiLoopSections(const std::string& name, int num_threads, int num_loo } } -// Test multi-loop parallel sections, with alternating larger and -// smaller loops. This helps test that we can dispatch work to -// differing numbers of threads over time. -void TestStagedMultiLoopSections(const std::string& name, int num_threads, int num_loops) { - for (int rep = 0; rep < 5; rep++) { - auto test_data1 = CreateTestData(num_threads/2); - auto test_data2 = CreateTestData(num_threads); - CreateThreadPoolAndTest(name, num_threads, [&](ThreadPool* tp) { - ThreadPool::ParallelSection ps(tp); - for (int l = 0; l < num_loops; l++) { - // Loop needing few threads - ThreadPool::TrySimpleParallelFor(tp, - num_threads / 2, - [&](std::ptrdiff_t i) { - IncrementElement(*test_data1, i); - }); - // Loop needing more threads, forcing growth of set of threads in use - ThreadPool::TrySimpleParallelFor(tp, - num_threads, - [&](std::ptrdiff_t i) { - IncrementElement(*test_data2, i); - }); - } - }); - ValidateTestData(*test_data1, num_loops); - ValidateTestData(*test_data2, num_loops); - } -} - } // namespace namespace onnxruntime { @@ -381,10 +351,6 @@ TEST(ThreadPoolTest, TestMultiLoopSections_1Thread_1Loop) { TestMultiLoopSections("TestMultiLoopSections_1Thread_1Loop", 1, 1); } -TEST(ThreadPoolTest, TestMultiLoopSections_1Thread_2Loop) { - TestMultiLoopSections("TestMultiLoopSections_1Thread_2Loop", 1, 2); -} - TEST(ThreadPoolTest, TestMultiLoopSections_2Thread_0Loop) { TestMultiLoopSections("TestMultiLoopSections_2Thread_0Loop", 2, 0); } @@ -413,17 +379,6 @@ TEST(ThreadPoolTest, TestMultiLoopSections_4Thread_100Loop) { TestMultiLoopSections("TestMultiLoopSections_4Thread_100Loop", 4, 100); } -TEST(ThreadPoolTest, TestStagedMultiLoopSections_4Thread_1Loop) { - TestStagedMultiLoopSections("TestStagedMultiLoopSections_4Thread_1Loop", 4, 1); -} - -TEST(ThreadPoolTest, TestStagedMultiLoopSections_4Thread_10Loop) { - TestStagedMultiLoopSections("TestStagedMultiLoopSections_4Thread_10Loop", 4, 10); -} - -TEST(ThreadPoolTest, TestStagedMultiLoopSections_4Thread_100Loop) { - TestStagedMultiLoopSections("TestStagedMultiLoopSections_4Thread_100Loop", 4, 100); -} #ifdef _WIN32 #pragma warning(push) #pragma warning(disable : 6387)