diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index ad9d2cf163..5be7364070 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -723,9 +723,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter worker_data_(num_threads), all_coprimes_(num_threads), blocked_(0), - done_(false), - cancelled_(false) { - + done_(false) { // Calculate coprimes of all numbers [1, num_threads]. // Coprimes are used for random walks over all threads in Steal // and NonEmptyQueueIndex. Iteration is based on the fact that if we take @@ -756,15 +754,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter // Now if all threads block without work, they will start exiting. // But note that threads can continue to work arbitrary long, // block, submit new work, unblock and otherwise live full life. - if (!cancelled_) { - WakeAllWorkersForExit(); - } else { - // Since we were cancelled, there might be entries in the queues. - // Empty them to prevent their destructor from asserting. - for (size_t i = 0; i < worker_data_.size(); i++) { - worker_data_[i].queue.Flush(); - } - } + WakeAllWorkersForExit(); // Join threads explicitly (by destroying) to avoid destruction order within // this class. for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset(); @@ -1107,22 +1097,6 @@ void RunInParallel(std::function fn, unsigned n, std::ptrdif profiler_.LogEnd(ThreadPoolProfiler::WAIT); } -void Cancel() override { - cancelled_ = true; - // If done_ is true, which means this object is being destructing. - // Therefore worker_data_[i].thread could be NULL. - if (!done_) { - done_ = true; - // Let each thread know it's been cancelled. - for (size_t i = 0; i < worker_data_.size(); i++) { - assert(worker_data_[i].thread != nullptr); - worker_data_[i].thread->OnCancel(); - } - } - - // Wake up the threads without work to let them exit on their own. - WakeAllWorkersForExit(); -} int NumThreads() const EIGEN_FINAL { return num_threads_; @@ -1293,7 +1267,6 @@ int CurrentThreadId() const EIGEN_FINAL { Eigen::MaxSizeVector> all_coprimes_; std::atomic blocked_; // Count of blocked workers, used as a termination condition std::atomic done_; - std::atomic cancelled_; // Allow control over how many bits to use in each entry in good_worker_hints_. // We reduce this below the full 64-bit word size for two reasons. First, it @@ -1305,8 +1278,7 @@ int CurrentThreadId() const EIGEN_FINAL { unsigned num_hint_words_; std::unique_ptr[]> good_worker_hints_; - // Wake any blocked workers so that they can cleanly exit WorkerLoop(). For an - // abrupt exit, cancelled_==true and threads will exit their worker loops. For + // Wake any blocked workers so that they can cleanly exit WorkerLoop(). For // a clean exit, each thread will observe (1) done_ set, indicating that the // destructor has been called, (2) all threads blocked, and (3) no // items in the work queues. @@ -1337,82 +1309,80 @@ int CurrentThreadId() const EIGEN_FINAL { SetDenormalAsZero(set_denormal_as_zero_); profiler_.LogThreadId(thread_id); - while (!cancelled_ && !should_exit) { - Task t = q.PopFront(); - if (!t) { - // Spin waiting for work. We indicate, via SetGOodWorkerHint that we are - // spinning. This will bias other threads toward pushing work to our queue. - // In addition, priodically make a best-effort attempt to steal from other - // threads which are not themselves spinning. + while (!should_exit) { + Task t = q.PopFront(); + if (!t) { + // Spin waiting for work. We indicate, via SetGOodWorkerHint that we are + // spinning. This will bias other threads toward pushing work to our queue. + // In addition, priodically make a best-effort attempt to steal from other + // threads which are not themselves spinning. - SetGoodWorkerHint(thread_id, true); - for (int i = 0; i < spin_count && !t && !cancelled_ && !done_; i++) { - t = ((i + 1) % steal_count == 0) ? TrySteal() : q.PopFront(); - onnxruntime::concurrency::SpinPause(); - } - SetGoodWorkerHint(thread_id, false); - - if (!t) { - // No work passed to us while spinning; make a further full attempt to - // steal work from other threads prior to blocking. - if (num_threads_ != 1) { - t = Steal(true /* true => check all queues */); - } - if (!t) { - td.SetBlocked( - // Pre-block test - [&]() -> bool { - bool should_block = true; - // We already did a best-effort emptiness check when stealing; now - // do a full check prior to blocking. - int victim = NonEmptyQueueIndex(); - if (victim != -1) { - should_block = false; - if (!cancelled_) { - t = worker_data_[victim].queue.PopBack(); - } - } - // Number of blocked threads is used as termination condition. - // If we are shutting down and all worker threads blocked without work, - // that's we are done. - if (should_block) { - blocked_++; - if (done_ && blocked_ == static_cast(num_threads_)) { - should_block = false; - // Almost done, but need to re-check queues. - // Consider that all queues are empty and all worker threads are preempted - // right after incrementing blocked_ above. Now a free-standing thread - // submits work and calls destructor (which sets done_). If we don't - // re-check queues, we will exit leaving the work unexecuted. - if (NonEmptyQueueIndex() != -1) { - // Note: we must not pop from queues before we decrement blocked_, - // otherwise the following scenario is possible. Consider that instead - // of checking for emptiness we popped the only element from queues. - // Now other worker threads can start exiting, which is bad if the - // work item submits other work. So we just check emptiness here, - // which ensures that all worker threads exit at the same time. - blocked_--; - } else { - should_exit = true; - } - } - } - return should_block; - }, - // Post-block update (executed only if we blocked) - [&]() { - blocked_--; - }); - } - } + SetGoodWorkerHint(thread_id, true); + for (int i = 0; i < spin_count && !t && !done_; i++) { + t = ((i + 1) % steal_count == 0) ? TrySteal() : q.PopFront(); + onnxruntime::concurrency::SpinPause(); } - if (t) { - td.SetActive(); - t(); - profiler_.LogRun(thread_id); - td.SetSpinning(); + SetGoodWorkerHint(thread_id, false); + + if (!t) { + // No work passed to us while spinning; make a further full attempt to + // steal work from other threads prior to blocking. + if (num_threads_ != 1) { + t = Steal(true /* true => check all queues */); + } + if (!t) { + td.SetBlocked( + // Pre-block test + [&]() -> bool { + bool should_block = true; + // We already did a best-effort emptiness check when stealing; now + // do a full check prior to blocking. + int victim = NonEmptyQueueIndex(); + if (victim != -1) { + should_block = false; + t = worker_data_[victim].queue.PopBack(); + } + // Number of blocked threads is used as termination condition. + // If we are shutting down and all worker threads blocked without work, + // that's we are done. + if (should_block) { + blocked_++; + if (done_ && blocked_ == static_cast(num_threads_)) { + should_block = false; + // Almost done, but need to re-check queues. + // Consider that all queues are empty and all worker threads are preempted + // right after incrementing blocked_ above. Now a free-standing thread + // submits work and calls destructor (which sets done_). If we don't + // re-check queues, we will exit leaving the work unexecuted. + if (NonEmptyQueueIndex() != -1) { + // Note: we must not pop from queues before we decrement blocked_, + // otherwise the following scenario is possible. Consider that instead + // of checking for emptiness we popped the only element from queues. + // Now other worker threads can start exiting, which is bad if the + // work item submits other work. So we just check emptiness here, + // which ensures that all worker threads exit at the same time. + blocked_--; + } else { + should_exit = true; + } + } + } + return should_block; + }, + // Post-block update (executed only if we blocked) + [&]() { + blocked_--; + }); + } } } + if (t) { + td.SetActive(); + t(); + profiler_.LogRun(thread_id); + td.SetSpinning(); + } + } // Whichever thread(s) observe the termination conditions are responsible for waking // any other threads that have remained blocked. diff --git a/onnxruntime/core/platform/env.h b/onnxruntime/core/platform/env.h index 268f729acf..8f412dd84a 100644 --- a/onnxruntime/core/platform/env.h +++ b/onnxruntime/core/platform/env.h @@ -48,7 +48,6 @@ using FileOffsetType = off_t; class EnvThread { public: - virtual void OnCancel() = 0; virtual ~EnvThread() = default; }; diff --git a/onnxruntime/core/platform/posix/env.cc b/onnxruntime/core/platform/posix/env.cc index a6eed0664a..8805c3b10e 100644 --- a/onnxruntime/core/platform/posix/env.cc +++ b/onnxruntime/core/platform/posix/env.cc @@ -154,11 +154,6 @@ class PosixThread : public EnvThread { #endif } - // This function is called when the threadpool is cancelled. - // TODO: Find a way to avoid calling TerminateThread - void OnCancel() override { - } - private: static void* ThreadMain(void* param) { std::unique_ptr p((Param*)param); @@ -167,7 +162,7 @@ class PosixThread : public EnvThread { p->start_address(p->index, p->param); } ORT_CATCH(const std::exception&) { - p->param->Cancel(); + //ignore any exceptions } return nullptr; } diff --git a/onnxruntime/core/platform/windows/debug_alloc.cc b/onnxruntime/core/platform/windows/debug_alloc.cc index d8f1d092ec..c459645b04 100644 --- a/onnxruntime/core/platform/windows/debug_alloc.cc +++ b/onnxruntime/core/platform/windows/debug_alloc.cc @@ -77,7 +77,7 @@ struct SymbolHelper { return; } - _snprintf_s(buffer, _TRUNCATE, "%s(%d): %s", line.FileName, line.LineNumber, symbol.Name); + _snprintf_s(buffer, _TRUNCATE, "%s(%d): %s", line.FileName, static_cast(line.LineNumber), symbol.Name); string.append(buffer); } @@ -233,7 +233,7 @@ Memory_LeakCheck::~Memory_LeakCheck() { std::string string; char buffer[1024]; - _snprintf_s(buffer, _TRUNCATE, "%d bytes of memory leaked in %d allocations", leaked_bytes, leak_count); + _snprintf_s(buffer, _TRUNCATE, "%d bytes of memory leaked in %d allocations", static_cast(leaked_bytes), static_cast(leak_count)); string.append(buffer); std::cout << "\n----- MEMORY LEAKS: " << string.c_str() << "\n"; diff --git a/onnxruntime/core/platform/windows/env.cc b/onnxruntime/core/platform/windows/env.cc index ef9d8920c1..f33be15444 100644 --- a/onnxruntime/core/platform/windows/env.cc +++ b/onnxruntime/core/platform/windows/env.cc @@ -64,16 +64,12 @@ class WindowsThread : public EnvThread { FAIL_FAST_LAST_ERROR_IF(waitStatus == WAIT_FAILED); } - // This function is called when the threadpool is cancelled. - // TODO: Find a way to avoid calling TerminateThread - void OnCancel() { -#if WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) - TerminateThread(hThread.get(), 1); -#endif - } private: typedef HRESULT(WINAPI* SetThreadDescriptionFunc)(HANDLE hThread, PCWSTR lpThreadDescription); + +#pragma warning(push) +#pragma warning(disable : 6387) static unsigned __stdcall ThreadMain(void* param) { std::unique_ptr p((Param*)param); // TODO: should I try to use SetThreadSelectedCpuSets? @@ -82,9 +78,11 @@ class WindowsThread : public EnvThread { #if WINVER >= _WIN32_WINNT_WIN10 constexpr SetThreadDescriptionFunc pSetThrDesc = SetThreadDescription; #elif WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) + HMODULE kernelModule = GetModuleHandle(TEXT("kernel32.dll")); // kernel32.dll is always loaded + assert(kernelModule != nullptr); auto pSetThrDesc = - (SetThreadDescriptionFunc)GetProcAddress(GetModuleHandle(TEXT("kernel32.dll")), "SetThreadDescription"); + (SetThreadDescriptionFunc)GetProcAddress(kernelModule, "SetThreadDescription"); #else constexpr SetThreadDescriptionFunc pSetThrDesc = nullptr; #endif @@ -107,6 +105,8 @@ class WindowsThread : public EnvThread { } return ret; } +#pragma warning(pop) + unsigned threadID = 0; wil::unique_handle hThread; }; diff --git a/onnxruntime/core/providers/cuda/tensor/resize_impl.cu b/onnxruntime/core/providers/cuda/tensor/resize_impl.cu index a73a3e7598..73728f5182 100644 --- a/onnxruntime/core/providers/cuda/tensor/resize_impl.cu +++ b/onnxruntime/core/providers/cuda/tensor/resize_impl.cu @@ -782,9 +782,8 @@ void ResizeImpl( reinterpret_cast(dims_mapping)); return; } - + ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize"); break; - case UpsampleMode::CUBIC: if (is_2D) { _ResizeCubicCoordinateMapping<<>>( @@ -804,6 +803,9 @@ void ResizeImpl( reinterpret_cast(dims_mapping)); return; } + ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize"); + case UpsampleMode::NN: + ORT_THROW("Only bilinear/trilinear and bicubic modes are supported in Resize"); } } diff --git a/onnxruntime/core/providers/shared_library/provider_bridge_provider.cc b/onnxruntime/core/providers/shared_library/provider_bridge_provider.cc index 78fcc2b042..1bab852728 100644 --- a/onnxruntime/core/providers/shared_library/provider_bridge_provider.cc +++ b/onnxruntime/core/providers/shared_library/provider_bridge_provider.cc @@ -8,8 +8,17 @@ #include #include "core/providers/shared/common.h" +#ifndef _Ret_notnull_ +#define _Ret_notnull_ +#endif + + +#ifndef _Post_writable_byte_size_ +#define _Post_writable_byte_size_(n) +#endif + // Override default new/delete so that we match the host's allocator -void* operator new(size_t n) { return Provider_GetHost()->HeapAllocate(n); } +_Ret_notnull_ _Post_writable_byte_size_(n) void* operator new(size_t n) { return Provider_GetHost()->HeapAllocate(n); } void operator delete(void* p) { return Provider_GetHost()->HeapFree(p); } void operator delete(void* p, size_t /*size*/) { return Provider_GetHost()->HeapFree(p); }