diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h index ed94b14578..5c34df9841 100644 --- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h +++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h @@ -681,6 +681,18 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter ThreadPoolProfiler profiler_; + void SignalAllAndWait() { + done_ = true; + + // Now if all threads block without work, they will start exiting. + // But note that threads can continue to work arbitrary long, + // block, submit new work, unblock and otherwise live full life. + WakeAllWorkersForExit(); + // Join threads explicitly (by destroying) to avoid destruction order within + // this class. + for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset(); + } + public: void StartProfiling() override { profiler_.Start(); @@ -750,22 +762,24 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter ComputeCoprimes(i, &all_coprimes_.back()); } - worker_data_.resize(num_threads_); - for (auto i = 0u; i < num_threads_; i++) { - worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options)); + // Eigen::MaxSizeVector has neither essential exception safety features + // such as swap, nor it is movable. So we have to join threads right here + // on exception + ORT_TRY { + worker_data_.resize(num_threads_); + for (auto i = 0u; i < num_threads_; i++) { + worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options)); + } + } ORT_CATCH(...) { + ORT_HANDLE_EXCEPTION([&]() { + SignalAllAndWait(); + throw; + }); } } ~ThreadPoolTempl() override { - done_ = true; - - // Now if all threads block without work, they will start exiting. - // But note that threads can continue to work arbitrary long, - // block, submit new work, unblock and otherwise live full life. - WakeAllWorkersForExit(); - // Join threads explicitly (by destroying) to avoid destruction order within - // this class. - for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset(); + SignalAllAndWait(); } // Run fn(). Ordinarily, the function will be added to the thread pool and executed diff --git a/onnxruntime/core/platform/posix/env.cc b/onnxruntime/core/platform/posix/env.cc index 06e0cd2a7d..bcdd748f40 100644 --- a/onnxruntime/core/platform/posix/env.cc +++ b/onnxruntime/core/platform/posix/env.cc @@ -26,12 +26,15 @@ limitations under the License. #include #include #include +#include #include #include #include // for std::forward #include #include +#include + #include "core/common/common.h" #include "core/common/logging/logging.h" #include "core/platform/scoped_resource.h" @@ -54,8 +57,7 @@ class UnmapFileParam { * * @return a pair of {errno, error message} */ -static std::pair GetSystemError() { - auto e = errno; +static std::pair GetSystemError(int e) { char buf[1024]; const char* msg = ""; if (e > 0) { @@ -73,6 +75,11 @@ static std::pair GetSystemError() { return std::make_pair(e, msg); } +static std::pair GetSystemError() { + auto e = errno; + return GetSystemError(e); +} + static void UnmapFile(void* param) noexcept { std::unique_ptr p(reinterpret_cast(param)); int ret = munmap(p->addr, p->len); @@ -128,6 +135,7 @@ struct Freer { using MallocdStringPtr = std::unique_ptr >; + class PosixThread : public EnvThread { private: struct Param { @@ -135,22 +143,38 @@ class PosixThread : public EnvThread { int index; unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param); Eigen::ThreadPoolInterface* param; - const ThreadOptions& thread_options; + std::optional affinity_mask; + + Param(const ORTCHAR_T* name_prefix1, + int index1, + unsigned (*start_address1)(int id, Eigen::ThreadPoolInterface* param), + Eigen::ThreadPoolInterface* param1) + : name_prefix(name_prefix1), + index(index1), + start_address(start_address1), + param(param1) {} }; public: PosixThread(const ORTCHAR_T* name_prefix, int index, unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param), Eigen::ThreadPoolInterface* param, const ThreadOptions& thread_options) { + ORT_ENFORCE(index >= 0, "Negative thread index is not allowed"); custom_create_thread_fn = thread_options.custom_create_thread_fn; custom_thread_creation_options = thread_options.custom_thread_creation_options; custom_join_thread_fn = thread_options.custom_join_thread_fn; + auto param_ptr = std::make_unique(name_prefix, index, start_address, param); + if (gsl::narrow(index) < thread_options.affinity.size()) { + param_ptr->affinity_mask = thread_options.affinity[index]; + } + if (custom_create_thread_fn) { - custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, CustomThreadMain, new Param{name_prefix, index, start_address, param, thread_options}); + custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, CustomThreadMain, param_ptr.get()); if (!custom_thread_handle) { ORT_THROW("custom_create_thread_fn returned invalid handle."); } + param_ptr.release(); } else { pthread_attr_t attr; int s = pthread_attr_init(&attr); @@ -165,24 +189,14 @@ class PosixThread : public EnvThread { ORT_THROW("pthread_attr_setstacksize failed, error code: ", err_no, " error msg: ", err_msg); } } - s = pthread_create(&hThread, &attr, ThreadMain, - new Param{name_prefix, index, start_address, param, thread_options}); + + s = pthread_create(&hThread, &attr, ThreadMain, param_ptr.get()); if (s != 0) { auto [err_no, err_msg] = GetSystemError(); ORT_THROW("pthread_create failed, error code: ", err_no, " error msg: ", err_msg); } -#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(__wasm__) && !defined(_AIX) - if (!thread_options.affinity.empty()) { - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(thread_options.affinity[index], &cpuset); - s = pthread_setaffinity_np(hThread, sizeof(cpu_set_t), &cpuset); - if (s != 0) { - auto [err_no, err_msg] = GetSystemError(); - ORT_THROW("pthread_setaffinity_np failed, error code: ", err_no, " error msg: ", err_msg); - } - } -#endif + param_ptr.release(); + // Do not throw beyond this point so we do not lose thread handle and then not being able to join it. } } @@ -203,13 +217,29 @@ class PosixThread : public EnvThread { private: static void* ThreadMain(void* param) { - std::unique_ptr p((Param*)param); + std::unique_ptr p(static_cast(param)); ORT_TRY { +#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(__wasm__) && !defined(_AIX) + if (p->affinity_mask.has_value()) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(*p->affinity_mask, &cpuset); + // pthread_setaffinity_np() does not set errno, it returns it. + auto ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if (ret != 0) { + auto [err_no, err_msg] = GetSystemError(ret); + LOGS_DEFAULT(ERROR) << "pthread_setaffinity_np failed for thread: " << pthread_self() + << ", mask: " << *p->affinity_mask + << ", error code: " << err_no << " error msg: " << err_msg + << ". Specify the number of threads explicitly so the affinity is not set."; + } + } +#endif // Ignore the returned value for now p->start_address(p->index, p->param); } - ORT_CATCH(const std::exception&) { - //ignore any exceptions + ORT_CATCH(...) { + // Ignore exceptions } return nullptr; } @@ -440,7 +470,7 @@ class PosixEnv : public Env { common::Status GetCanonicalPath( const PathString& path, PathString& canonical_path) const override { - MallocdStringPtr canonical_path_cstr{realpath(path.c_str(), nullptr)}; + MallocdStringPtr canonical_path_cstr{realpath(path.c_str(), nullptr), Freer()}; if (!canonical_path_cstr) { return ReportSystemError("realpath", path); } diff --git a/onnxruntime/core/platform/windows/env.cc b/onnxruntime/core/platform/windows/env.cc index c711f6d36a..23eb51f3b4 100644 --- a/onnxruntime/core/platform/windows/env.cc +++ b/onnxruntime/core/platform/windows/env.cc @@ -19,12 +19,14 @@ limitations under the License. #include #include +#include #include #include #include #include #include +#include #include "core/common/logging/logging.h" #include "core/platform/env.h" #include "core/platform/scoped_resource.h" @@ -68,31 +70,53 @@ class WindowsThread : public EnvThread { int index; unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param); Eigen::ThreadPoolInterface* param; - const ThreadOptions& thread_options; + std::optional affinity_mask; Param(const ORTCHAR_T* name_prefix1, int index1, unsigned (*start_address1)(int id, Eigen::ThreadPoolInterface* param), - Eigen::ThreadPoolInterface* param1, - const ThreadOptions& thread_options1) : name_prefix(name_prefix1), index(index1), start_address(start_address1), param(param1), thread_options(thread_options1) {} + Eigen::ThreadPoolInterface* param1) + : name_prefix(name_prefix1), + index(index1), + start_address(start_address1), + param(param1) {} }; public: WindowsThread(const ORTCHAR_T* name_prefix, int index, unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param), Eigen::ThreadPoolInterface* param, const ThreadOptions& thread_options) { + ORT_ENFORCE(index >= 0, "Negative thread index is not allowed"); custom_create_thread_fn = thread_options.custom_create_thread_fn; custom_thread_creation_options = thread_options.custom_thread_creation_options; custom_join_thread_fn = thread_options.custom_join_thread_fn; - std::unique_ptr local_param = std::make_unique(name_prefix, index, start_address, param, thread_options); + + std::unique_ptr local_param = std::make_unique(name_prefix, index, start_address, param); + if (gsl::narrow(index) < thread_options.affinity.size()) { + local_param->affinity_mask = thread_options.affinity[index]; + } + if (custom_create_thread_fn) { - custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, (OrtThreadWorkerFn)CustomThreadMain, local_param.release()); + custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, (OrtThreadWorkerFn)CustomThreadMain, local_param.get()); if (!custom_thread_handle) { ORT_THROW("custom_create_thread_fn returned invalid handle."); } + local_param.release(); } else { - hThread.reset(reinterpret_cast(_beginthreadex(nullptr, thread_options.stack_size, ThreadMain, - local_param.release(), 0, - &threadID))); + _set_errno(0); + _set_doserrno(0); + auto th_handle = _beginthreadex(nullptr, thread_options.stack_size, ThreadMain, + local_param.get(), 0, + &threadID); + if (th_handle == 0) { + auto err = errno; + auto dos_error = _doserrno; + char message_buf[256]; + strerror_s(message_buf, sizeof(message_buf), err); + ORT_THROW("WindowThread:_beginthreadex failed with message: ", message_buf, " doserrno: ", dos_error); + } + local_param.release(); + hThread.reset(reinterpret_cast(th_handle)); + // Do not throw beyond this point so we do not lose thread handle and then not being able to join it. } } @@ -112,10 +136,7 @@ class WindowsThread : public EnvThread { #pragma warning(push) #pragma warning(disable : 6387) static unsigned __stdcall ThreadMain(void* param) { - std::unique_ptr p((Param*)param); - // TODO: should I try to use SetThreadSelectedCpuSets? - if (!p->thread_options.affinity.empty()) - SetThreadAffinityMask(GetCurrentThread(), p->thread_options.affinity[p->index]); + std::unique_ptr p(static_cast(param)); #if WINVER >= _WIN32_WINNT_WIN10 constexpr SetThreadDescriptionFunc pSetThrDesc = SetThreadDescription; #elif WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) @@ -137,9 +158,22 @@ class WindowsThread : public EnvThread { } unsigned ret = 0; ORT_TRY { + // TODO: should I try to use SetThreadSelectedCpuSets? + if (p->affinity_mask.has_value()) { + auto rc = SetThreadAffinityMask(GetCurrentThread(), *p->affinity_mask); + if (!rc) { + const auto error_code = GetLastError(); + LOGS_DEFAULT(ERROR) << "SetThreadAffinityMask failed for thread: " << GetCurrentThreadId() + << ", mask: " << *p->affinity_mask + << ", error code: " << error_code + << ", error msg: " << std::system_category().message(error_code) + << ". Specify the number of threads explicitly so the affinity is not set."; + } + } + ret = p->start_address(p->index, p->param); } - ORT_CATCH(const std::exception&) { + ORT_CATCH(...) { p->param->Cancel(); ret = 1; } @@ -148,11 +182,11 @@ class WindowsThread : public EnvThread { #pragma warning(pop) static void __stdcall CustomThreadMain(void* param) { - std::unique_ptr p((Param*)param); + std::unique_ptr p(static_cast(param)); ORT_TRY { p->start_address(p->index, p->param); } - ORT_CATCH(const std::exception&) { + ORT_CATCH(...) { p->param->Cancel(); } } @@ -222,7 +256,7 @@ class WindowsEnv : public Env { ret.push_back(buffer[i].ProcessorMask); } } - if (ret.empty()){ + if (ret.empty()) { return generate_vector_of_n(std::thread::hardware_concurrency()); } return ret; @@ -363,9 +397,9 @@ class WindowsEnv : public Env { if (file_handle.get() == INVALID_HANDLE_VALUE) { const auto error_code = GetLastError(); return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL, - "open file ", ToUTF8String(Basename(file_path)), - " fail, errcode = ", error_code, - " - ", std::system_category().message(error_code)); + "open file ", ToUTF8String(Basename(file_path)), + " fail, errcode = ", error_code, + " - ", std::system_category().message(error_code)); } #if NTDDI_VERSION >= NTDDI_WIN10_RS5 && WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP | WINAPI_PARTITION_SYSTEM) @@ -391,9 +425,9 @@ class WindowsEnv : public Env { if (file_mapping_handle.get() == INVALID_HANDLE_VALUE) { const auto error_code = GetLastError(); return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL, - "open file mapping ", ToUTF8String(Basename(file_path)), - " fail, errcode = ", error_code, - " - ", std::system_category().message(error_code)); + "open file mapping ", ToUTF8String(Basename(file_path)), + " fail, errcode = ", error_code, + " - ", std::system_category().message(error_code)); } SYSTEM_INFO sysinfo; @@ -407,11 +441,11 @@ class WindowsEnv : public Env { if (mapped_offset % allocation_granularity != 0) { const auto error_code = GetLastError(); return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL, - "mapped offset must be a multiple of the allocation granularity", - " , mapped_offset = ", mapped_offset, - " , allocation_granularity = ", allocation_granularity, - " , errcode = ", error_code, - " - ", std::system_category().message(error_code)); + "mapped offset must be a multiple of the allocation granularity", + " , mapped_offset = ", mapped_offset, + " , allocation_granularity = ", allocation_granularity, + " , errcode = ", error_code, + " - ", std::system_category().message(error_code)); } void* const mapped_base = MapViewOfFile(file_mapping_handle.get(), @@ -650,7 +684,7 @@ class WindowsEnv : public Env { static constexpr DWORD bufferLength = 64 * 1024; std::wstring s(bufferLength, '\0'); FormatMessageW( - FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, error_code, @@ -682,7 +716,7 @@ class WindowsEnv : public Env { static constexpr DWORD bufferLength = 64 * 1024; std::wstring s(bufferLength, '\0'); FormatMessageW( - FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, error_code,