Remove thread pool's cancel method and suppress some warnings (#7411)

This commit is contained in:
Changming Sun 2021-04-26 09:33:48 -07:00 committed by GitHub
parent 368e4a324f
commit b5592856a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 98 additions and 123 deletions

View file

@ -723,9 +723,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
worker_data_(num_threads),
all_coprimes_(num_threads),
blocked_(0),
done_(false),
cancelled_(false) {
done_(false) {
// Calculate coprimes of all numbers [1, num_threads].
// Coprimes are used for random walks over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take
@ -756,15 +754,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrarily long,
// block, submit new work, unblock and otherwise live a full life.
if (!cancelled_) {
WakeAllWorkersForExit();
} else {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < worker_data_.size(); i++) {
worker_data_[i].queue.Flush();
}
}
WakeAllWorkersForExit();
// Join threads explicitly (by destroying) to avoid destruction order within
// this class.
for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
@ -1107,22 +1097,6 @@ void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdif
profiler_.LogEnd(ThreadPoolProfiler::WAIT);
}
// Abruptly cancels the pool: raises cancelled_, notifies every worker thread
// object through OnCancel(), and finally wakes the blocked workers.
void Cancel() override {
  cancelled_ = true;
  // done_ being set already means the object is being destructed (or a
  // previous Cancel() ran), in which case worker_data_[i].thread could be
  // null, so the per-thread notification is skipped.
  const bool already_done = done_;
  if (!already_done) {
    done_ = true;
    // Tell each thread that the pool has been cancelled.
    const size_t worker_count = worker_data_.size();
    for (size_t idx = 0; idx < worker_count; ++idx) {
      auto& worker_thread = worker_data_[idx].thread;
      assert(worker_thread != nullptr);
      worker_thread->OnCancel();
    }
  }
  // Threads that are waiting without work are woken so they can exit on their own.
  WakeAllWorkersForExit();
}
int NumThreads() const EIGEN_FINAL {
return num_threads_;
@ -1293,7 +1267,6 @@ int CurrentThreadId() const EIGEN_FINAL {
Eigen::MaxSizeVector<Eigen::MaxSizeVector<unsigned>> all_coprimes_;
std::atomic<unsigned> blocked_; // Count of blocked workers, used as a termination condition
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
// Allow control over how many bits to use in each entry in good_worker_hints_.
// We reduce this below the full 64-bit word size for two reasons. First, it
@ -1305,8 +1278,7 @@ int CurrentThreadId() const EIGEN_FINAL {
unsigned num_hint_words_;
std::unique_ptr<std::atomic<uint64_t>[]> good_worker_hints_;
// Wake any blocked workers so that they can cleanly exit WorkerLoop(). For an
// abrupt exit, cancelled_==true and threads will exit their worker loops. For
// Wake any blocked workers so that they can cleanly exit WorkerLoop(). For
// a clean exit, each thread will observe (1) done_ set, indicating that the
// destructor has been called, (2) all threads blocked, and (3) no
// items in the work queues.
@ -1337,82 +1309,80 @@ int CurrentThreadId() const EIGEN_FINAL {
SetDenormalAsZero(set_denormal_as_zero_);
profiler_.LogThreadId(thread_id);
while (!cancelled_ && !should_exit) {
Task t = q.PopFront();
if (!t) {
// Spin waiting for work. We indicate, via SetGoodWorkerHint, that we are
// spinning. This will bias other threads toward pushing work to our queue.
// In addition, periodically make a best-effort attempt to steal from other
// threads which are not themselves spinning.
while (!should_exit) {
Task t = q.PopFront();
if (!t) {
// Spin waiting for work. We indicate, via SetGoodWorkerHint, that we are
// spinning. This will bias other threads toward pushing work to our queue.
// In addition, periodically make a best-effort attempt to steal from other
// threads which are not themselves spinning.
SetGoodWorkerHint(thread_id, true);
for (int i = 0; i < spin_count && !t && !cancelled_ && !done_; i++) {
t = ((i + 1) % steal_count == 0) ? TrySteal() : q.PopFront();
onnxruntime::concurrency::SpinPause();
}
SetGoodWorkerHint(thread_id, false);
if (!t) {
// No work passed to us while spinning; make a further full attempt to
// steal work from other threads prior to blocking.
if (num_threads_ != 1) {
t = Steal(true /* true => check all queues */);
}
if (!t) {
td.SetBlocked(
// Pre-block test
[&]() -> bool {
bool should_block = true;
// We already did a best-effort emptiness check when stealing; now
// do a full check prior to blocking.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
should_block = false;
if (!cancelled_) {
t = worker_data_[victim].queue.PopBack();
}
}
// Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads are blocked without work,
// then we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
}
}
}
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
});
}
}
SetGoodWorkerHint(thread_id, true);
for (int i = 0; i < spin_count && !t && !done_; i++) {
t = ((i + 1) % steal_count == 0) ? TrySteal() : q.PopFront();
onnxruntime::concurrency::SpinPause();
}
if (t) {
td.SetActive();
t();
profiler_.LogRun(thread_id);
td.SetSpinning();
SetGoodWorkerHint(thread_id, false);
if (!t) {
// No work passed to us while spinning; make a further full attempt to
// steal work from other threads prior to blocking.
if (num_threads_ != 1) {
t = Steal(true /* true => check all queues */);
}
if (!t) {
td.SetBlocked(
// Pre-block test
[&]() -> bool {
bool should_block = true;
// We already did a best-effort emptiness check when stealing; now
// do a full check prior to blocking.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
should_block = false;
t = worker_data_[victim].queue.PopBack();
}
// Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads are blocked without work,
// then we are done.
if (should_block) {
blocked_++;
if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
should_block = false;
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
} else {
should_exit = true;
}
}
}
return should_block;
},
// Post-block update (executed only if we blocked)
[&]() {
blocked_--;
});
}
}
}
if (t) {
td.SetActive();
t();
profiler_.LogRun(thread_id);
td.SetSpinning();
}
}
// Whichever thread(s) observe the termination conditions are responsible for waking
// any other threads that have remained blocked.

View file

@ -48,7 +48,6 @@ using FileOffsetType = off_t;
// Abstract base for the platform thread objects (PosixThread and
// WindowsThread derive from it).
class EnvThread {
public:
// Cancellation hook: the thread pool's Cancel() invokes this on every
// worker's thread object. Each platform supplies its own implementation.
virtual void OnCancel() = 0;
// Virtual destructor so a derived thread object can be destroyed through
// an EnvThread pointer.
virtual ~EnvThread() = default;
};

View file

@ -154,11 +154,6 @@ class PosixThread : public EnvThread {
#endif
}
// Cancellation hook, invoked when the threadpool is cancelled.
// This implementation deliberately does nothing.
// TODO: Find a way to avoid calling TerminateThread
void OnCancel() override {}
private:
static void* ThreadMain(void* param) {
std::unique_ptr<Param> p((Param*)param);
@ -167,7 +162,7 @@ class PosixThread : public EnvThread {
p->start_address(p->index, p->param);
}
ORT_CATCH(const std::exception&) {
p->param->Cancel();
//ignore any exceptions
}
return nullptr;
}

View file

@ -77,7 +77,7 @@ struct SymbolHelper {
return;
}
_snprintf_s(buffer, _TRUNCATE, "%s(%d): %s", line.FileName, line.LineNumber, symbol.Name);
_snprintf_s(buffer, _TRUNCATE, "%s(%d): %s", line.FileName, static_cast<int>(line.LineNumber), symbol.Name);
string.append(buffer);
}
@ -233,7 +233,7 @@ Memory_LeakCheck::~Memory_LeakCheck() {
std::string string;
char buffer[1024];
_snprintf_s(buffer, _TRUNCATE, "%d bytes of memory leaked in %d allocations", leaked_bytes, leak_count);
_snprintf_s(buffer, _TRUNCATE, "%d bytes of memory leaked in %d allocations", static_cast<int>(leaked_bytes), static_cast<int>(leak_count));
string.append(buffer);
std::cout << "\n----- MEMORY LEAKS: " << string.c_str() << "\n";

View file

@ -64,16 +64,12 @@ class WindowsThread : public EnvThread {
FAIL_FAST_LAST_ERROR_IF(waitStatus == WAIT_FAILED);
}
// Cancellation hook, invoked when the threadpool is cancelled.
// On desktop API partitions the thread is forcibly terminated; on every
// other partition this compiles to a no-op.
// TODO: Find a way to avoid calling TerminateThread
void OnCancel() {
#if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
  const auto thread_handle = hThread.get();
  TerminateThread(thread_handle, 1);
#endif
}
private:
typedef HRESULT(WINAPI* SetThreadDescriptionFunc)(HANDLE hThread, PCWSTR lpThreadDescription);
#pragma warning(push)
#pragma warning(disable : 6387)
static unsigned __stdcall ThreadMain(void* param) {
std::unique_ptr<Param> p((Param*)param);
// TODO: should I try to use SetThreadSelectedCpuSets?
@ -82,9 +78,11 @@ class WindowsThread : public EnvThread {
#if WINVER >= _WIN32_WINNT_WIN10
constexpr SetThreadDescriptionFunc pSetThrDesc = SetThreadDescription;
#elif WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
HMODULE kernelModule = GetModuleHandle(TEXT("kernel32.dll"));
// kernel32.dll is always loaded
assert(kernelModule != nullptr);
auto pSetThrDesc =
(SetThreadDescriptionFunc)GetProcAddress(GetModuleHandle(TEXT("kernel32.dll")), "SetThreadDescription");
(SetThreadDescriptionFunc)GetProcAddress(kernelModule, "SetThreadDescription");
#else
constexpr SetThreadDescriptionFunc pSetThrDesc = nullptr;
#endif
@ -107,6 +105,8 @@ class WindowsThread : public EnvThread {
}
return ret;
}
#pragma warning(pop)
unsigned threadID = 0;
wil::unique_handle hThread;
};

View file

@ -782,9 +782,8 @@ void ResizeImpl(
reinterpret_cast<LinearMappingInfo*>(dims_mapping));
return;
}
ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize");
break;
case UpsampleMode::CUBIC:
if (is_2D) {
_ResizeCubicCoordinateMapping<T><<<blocksPerDimsMappingGrid, 32, 0, stream>>>(
@ -804,6 +803,9 @@ void ResizeImpl(
reinterpret_cast<CubicMappingInfo*>(dims_mapping));
return;
}
ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize");
case UpsampleMode::NN:
ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize");
}
}

View file

@ -8,8 +8,17 @@
#include <mutex>
#include "core/providers/shared/common.h"
#ifndef _Ret_notnull_
#define _Ret_notnull_
#endif
#ifndef _Post_writable_byte_size_
#define _Post_writable_byte_size_(n)
#endif
// Override default new/delete so that we match the host's allocator
void* operator new(size_t n) { return Provider_GetHost()->HeapAllocate(n); }
_Ret_notnull_ _Post_writable_byte_size_(n) void* operator new(size_t n) { return Provider_GetHost()->HeapAllocate(n); }
// Unsized delete: hands the block back to the host through HeapFree so that
// deallocation matches the host's allocator.
void operator delete(void* p) {
  Provider_GetHost()->HeapFree(p);
}

// Sized delete: the size argument is ignored and the block is released the
// same way as in the unsized overload.
void operator delete(void* p, size_t /*size*/) {
  Provider_GetHost()->HeapFree(p);
}