Improve thread pool creation failure handling. (#13313)

### Description
Detect and report thread creation failure on Windows.
Do not throw out of constructor after the thread is created,
the thread handle is lost and cannot be joined, resulting in a deadlock.

Make setting a thread priority on Linux consistent with windows.
Set thread priority in the thread itself. Log failure properly,
but do not exit the thread.

### Motivation and Context
Address issues https://github.com/microsoft/onnxruntime/issues/13291
And
https://github.com/microsoft/onnxruntime/issues/13285#issuecomment-1278063223
This commit is contained in:
Dmitri Smirnov 2022-10-15 17:57:19 -07:00 committed by GitHub
parent 1ab11a111c
commit 4a63cd0290
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 141 additions and 63 deletions

View file

@ -681,6 +681,18 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
ThreadPoolProfiler profiler_;
void SignalAllAndWait() {
done_ = true;
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life.
WakeAllWorkersForExit();
// Join threads explicitly (by destroying) to avoid destruction order within
// this class.
for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
}
public:
void StartProfiling() override {
profiler_.Start();
@ -750,22 +762,24 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
ComputeCoprimes(i, &all_coprimes_.back());
}
worker_data_.resize(num_threads_);
for (auto i = 0u; i < num_threads_; i++) {
worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
// Eigen::MaxSizeVector has neither essential exception safety features
// such as swap, nor it is movable. So we have to join threads right here
// on exception
ORT_TRY {
worker_data_.resize(num_threads_);
for (auto i = 0u; i < num_threads_; i++) {
worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
}
} ORT_CATCH(...) {
ORT_HANDLE_EXCEPTION([&]() {
SignalAllAndWait();
throw;
});
}
}
~ThreadPoolTempl() override {
done_ = true;
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life.
WakeAllWorkersForExit();
// Join threads explicitly (by destroying) to avoid destruction order within
// this class.
for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
SignalAllAndWait();
}
// Run fn(). Ordinarily, the function will be added to the thread pool and executed

View file

@ -26,12 +26,15 @@ limitations under the License.
#include <fcntl.h>
#include <dlfcn.h>
#include <ftw.h>
#include <optional>
#include <string.h>
#include <thread>
#include <utility> // for std::forward
#include <vector>
#include <assert.h>
#include <gsl/gsl>
#include "core/common/common.h"
#include "core/common/logging/logging.h"
#include "core/platform/scoped_resource.h"
@ -54,8 +57,7 @@ class UnmapFileParam {
*
* @return a pair of {errno, error message}
*/
static std::pair<int, std::string> GetSystemError() {
auto e = errno;
static std::pair<int, std::string> GetSystemError(int e) {
char buf[1024];
const char* msg = "";
if (e > 0) {
@ -73,6 +75,11 @@ static std::pair<int, std::string> GetSystemError() {
return std::make_pair(e, msg);
}
static std::pair<int, std::string> GetSystemError() {
auto e = errno;
return GetSystemError(e);
}
static void UnmapFile(void* param) noexcept {
std::unique_ptr<UnmapFileParam> p(reinterpret_cast<UnmapFileParam*>(param));
int ret = munmap(p->addr, p->len);
@ -128,6 +135,7 @@ struct Freer {
using MallocdStringPtr = std::unique_ptr<char, Freer<char> >;
class PosixThread : public EnvThread {
private:
struct Param {
@ -135,22 +143,38 @@ class PosixThread : public EnvThread {
int index;
unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param);
Eigen::ThreadPoolInterface* param;
const ThreadOptions& thread_options;
std::optional<size_t> affinity_mask;
Param(const ORTCHAR_T* name_prefix1,
int index1,
unsigned (*start_address1)(int id, Eigen::ThreadPoolInterface* param),
Eigen::ThreadPoolInterface* param1)
: name_prefix(name_prefix1),
index(index1),
start_address(start_address1),
param(param1) {}
};
public:
PosixThread(const ORTCHAR_T* name_prefix, int index,
unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param), Eigen::ThreadPoolInterface* param,
const ThreadOptions& thread_options) {
ORT_ENFORCE(index >= 0, "Negative thread index is not allowed");
custom_create_thread_fn = thread_options.custom_create_thread_fn;
custom_thread_creation_options = thread_options.custom_thread_creation_options;
custom_join_thread_fn = thread_options.custom_join_thread_fn;
auto param_ptr = std::make_unique<Param>(name_prefix, index, start_address, param);
if (gsl::narrow<size_t>(index) < thread_options.affinity.size()) {
param_ptr->affinity_mask = thread_options.affinity[index];
}
if (custom_create_thread_fn) {
custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, CustomThreadMain, new Param{name_prefix, index, start_address, param, thread_options});
custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, CustomThreadMain, param_ptr.get());
if (!custom_thread_handle) {
ORT_THROW("custom_create_thread_fn returned invalid handle.");
}
param_ptr.release();
} else {
pthread_attr_t attr;
int s = pthread_attr_init(&attr);
@ -165,24 +189,14 @@ class PosixThread : public EnvThread {
ORT_THROW("pthread_attr_setstacksize failed, error code: ", err_no, " error msg: ", err_msg);
}
}
s = pthread_create(&hThread, &attr, ThreadMain,
new Param{name_prefix, index, start_address, param, thread_options});
s = pthread_create(&hThread, &attr, ThreadMain, param_ptr.get());
if (s != 0) {
auto [err_no, err_msg] = GetSystemError();
ORT_THROW("pthread_create failed, error code: ", err_no, " error msg: ", err_msg);
}
#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(__wasm__) && !defined(_AIX)
if (!thread_options.affinity.empty()) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(thread_options.affinity[index], &cpuset);
s = pthread_setaffinity_np(hThread, sizeof(cpu_set_t), &cpuset);
if (s != 0) {
auto [err_no, err_msg] = GetSystemError();
ORT_THROW("pthread_setaffinity_np failed, error code: ", err_no, " error msg: ", err_msg);
}
}
#endif
param_ptr.release();
// Do not throw beyond this point so we do not lose thread handle and then not being able to join it.
}
}
@ -203,13 +217,29 @@ class PosixThread : public EnvThread {
private:
static void* ThreadMain(void* param) {
std::unique_ptr<Param> p((Param*)param);
std::unique_ptr<Param> p(static_cast<Param*>(param));
ORT_TRY {
#if !defined(__APPLE__) && !defined(__ANDROID__) && !defined(__wasm__) && !defined(_AIX)
if (p->affinity_mask.has_value()) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(*p->affinity_mask, &cpuset);
// pthread_setaffinity_np() does not set errno, it returns it.
auto ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (ret != 0) {
auto [err_no, err_msg] = GetSystemError(ret);
LOGS_DEFAULT(ERROR) << "pthread_setaffinity_np failed for thread: " << pthread_self()
<< ", mask: " << *p->affinity_mask
<< ", error code: " << err_no << " error msg: " << err_msg
<< ". Specify the number of threads explicitly so the affinity is not set.";
}
}
#endif
// Ignore the returned value for now
p->start_address(p->index, p->param);
}
ORT_CATCH(const std::exception&) {
//ignore any exceptions
ORT_CATCH(...) {
// Ignore exceptions
}
return nullptr;
}
@ -440,7 +470,7 @@ class PosixEnv : public Env {
common::Status GetCanonicalPath(
const PathString& path,
PathString& canonical_path) const override {
MallocdStringPtr canonical_path_cstr{realpath(path.c_str(), nullptr)};
MallocdStringPtr canonical_path_cstr{realpath(path.c_str(), nullptr), Freer<char>()};
if (!canonical_path_cstr) {
return ReportSystemError("realpath", path);
}

View file

@ -19,12 +19,14 @@ limitations under the License.
#include <Windows.h>
#include <fstream>
#include <optional>
#include <string>
#include <thread>
#include <process.h>
#include <fcntl.h>
#include <io.h>
#include <gsl/gsl>
#include "core/common/logging/logging.h"
#include "core/platform/env.h"
#include "core/platform/scoped_resource.h"
@ -68,31 +70,53 @@ class WindowsThread : public EnvThread {
int index;
unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param);
Eigen::ThreadPoolInterface* param;
const ThreadOptions& thread_options;
std::optional<size_t> affinity_mask;
Param(const ORTCHAR_T* name_prefix1,
int index1,
unsigned (*start_address1)(int id, Eigen::ThreadPoolInterface* param),
Eigen::ThreadPoolInterface* param1,
const ThreadOptions& thread_options1) : name_prefix(name_prefix1), index(index1), start_address(start_address1), param(param1), thread_options(thread_options1) {}
Eigen::ThreadPoolInterface* param1)
: name_prefix(name_prefix1),
index(index1),
start_address(start_address1),
param(param1) {}
};
public:
WindowsThread(const ORTCHAR_T* name_prefix, int index,
unsigned (*start_address)(int id, Eigen::ThreadPoolInterface* param), Eigen::ThreadPoolInterface* param,
const ThreadOptions& thread_options) {
ORT_ENFORCE(index >= 0, "Negative thread index is not allowed");
custom_create_thread_fn = thread_options.custom_create_thread_fn;
custom_thread_creation_options = thread_options.custom_thread_creation_options;
custom_join_thread_fn = thread_options.custom_join_thread_fn;
std::unique_ptr<Param> local_param = std::make_unique<Param>(name_prefix, index, start_address, param, thread_options);
std::unique_ptr<Param> local_param = std::make_unique<Param>(name_prefix, index, start_address, param);
if (gsl::narrow<size_t>(index) < thread_options.affinity.size()) {
local_param->affinity_mask = thread_options.affinity[index];
}
if (custom_create_thread_fn) {
custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, (OrtThreadWorkerFn)CustomThreadMain, local_param.release());
custom_thread_handle = custom_create_thread_fn(custom_thread_creation_options, (OrtThreadWorkerFn)CustomThreadMain, local_param.get());
if (!custom_thread_handle) {
ORT_THROW("custom_create_thread_fn returned invalid handle.");
}
local_param.release();
} else {
hThread.reset(reinterpret_cast<HANDLE>(_beginthreadex(nullptr, thread_options.stack_size, ThreadMain,
local_param.release(), 0,
&threadID)));
_set_errno(0);
_set_doserrno(0);
auto th_handle = _beginthreadex(nullptr, thread_options.stack_size, ThreadMain,
local_param.get(), 0,
&threadID);
if (th_handle == 0) {
auto err = errno;
auto dos_error = _doserrno;
char message_buf[256];
strerror_s(message_buf, sizeof(message_buf), err);
ORT_THROW("WindowThread:_beginthreadex failed with message: ", message_buf, " doserrno: ", dos_error);
}
local_param.release();
hThread.reset(reinterpret_cast<HANDLE>(th_handle));
// Do not throw beyond this point so we do not lose thread handle and then not being able to join it.
}
}
@ -112,10 +136,7 @@ class WindowsThread : public EnvThread {
#pragma warning(push)
#pragma warning(disable : 6387)
static unsigned __stdcall ThreadMain(void* param) {
std::unique_ptr<Param> p((Param*)param);
// TODO: should I try to use SetThreadSelectedCpuSets?
if (!p->thread_options.affinity.empty())
SetThreadAffinityMask(GetCurrentThread(), p->thread_options.affinity[p->index]);
std::unique_ptr<Param> p(static_cast<Param*>(param));
#if WINVER >= _WIN32_WINNT_WIN10
constexpr SetThreadDescriptionFunc pSetThrDesc = SetThreadDescription;
#elif WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)
@ -137,9 +158,22 @@ class WindowsThread : public EnvThread {
}
unsigned ret = 0;
ORT_TRY {
// TODO: should I try to use SetThreadSelectedCpuSets?
if (p->affinity_mask.has_value()) {
auto rc = SetThreadAffinityMask(GetCurrentThread(), *p->affinity_mask);
if (!rc) {
const auto error_code = GetLastError();
LOGS_DEFAULT(ERROR) << "SetThreadAffinityMask failed for thread: " << GetCurrentThreadId()
<< ", mask: " << *p->affinity_mask
<< ", error code: " << error_code
<< ", error msg: " << std::system_category().message(error_code)
<< ". Specify the number of threads explicitly so the affinity is not set.";
}
}
ret = p->start_address(p->index, p->param);
}
ORT_CATCH(const std::exception&) {
ORT_CATCH(...) {
p->param->Cancel();
ret = 1;
}
@ -148,11 +182,11 @@ class WindowsThread : public EnvThread {
#pragma warning(pop)
static void __stdcall CustomThreadMain(void* param) {
std::unique_ptr<Param> p((Param*)param);
std::unique_ptr<Param> p(static_cast<Param*>(param));
ORT_TRY {
p->start_address(p->index, p->param);
}
ORT_CATCH(const std::exception&) {
ORT_CATCH(...) {
p->param->Cancel();
}
}
@ -222,7 +256,7 @@ class WindowsEnv : public Env {
ret.push_back(buffer[i].ProcessorMask);
}
}
if (ret.empty()){
if (ret.empty()) {
return generate_vector_of_n(std::thread::hardware_concurrency());
}
return ret;
@ -363,9 +397,9 @@ class WindowsEnv : public Env {
if (file_handle.get() == INVALID_HANDLE_VALUE) {
const auto error_code = GetLastError();
return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL,
"open file ", ToUTF8String(Basename(file_path)),
" fail, errcode = ", error_code,
" - ", std::system_category().message(error_code));
"open file ", ToUTF8String(Basename(file_path)),
" fail, errcode = ", error_code,
" - ", std::system_category().message(error_code));
}
#if NTDDI_VERSION >= NTDDI_WIN10_RS5 && WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP | WINAPI_PARTITION_SYSTEM)
@ -391,9 +425,9 @@ class WindowsEnv : public Env {
if (file_mapping_handle.get() == INVALID_HANDLE_VALUE) {
const auto error_code = GetLastError();
return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL,
"open file mapping ", ToUTF8String(Basename(file_path)),
" fail, errcode = ", error_code,
" - ", std::system_category().message(error_code));
"open file mapping ", ToUTF8String(Basename(file_path)),
" fail, errcode = ", error_code,
" - ", std::system_category().message(error_code));
}
SYSTEM_INFO sysinfo;
@ -407,11 +441,11 @@ class WindowsEnv : public Env {
if (mapped_offset % allocation_granularity != 0) {
const auto error_code = GetLastError();
return ORT_MAKE_STATUS(ONNXRUNTIME, FAIL,
"mapped offset must be a multiple of the allocation granularity",
" , mapped_offset = ", mapped_offset,
" , allocation_granularity = ", allocation_granularity,
" , errcode = ", error_code,
" - ", std::system_category().message(error_code));
"mapped offset must be a multiple of the allocation granularity",
" , mapped_offset = ", mapped_offset,
" , allocation_granularity = ", allocation_granularity,
" , errcode = ", error_code,
" - ", std::system_category().message(error_code));
}
void* const mapped_base = MapViewOfFile(file_mapping_handle.get(),
@ -650,7 +684,7 @@ class WindowsEnv : public Env {
static constexpr DWORD bufferLength = 64 * 1024;
std::wstring s(bufferLength, '\0');
FormatMessageW(
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
error_code,
@ -682,7 +716,7 @@ class WindowsEnv : public Env {
static constexpr DWORD bufferLength = 64 * 1024;
std::wstring s(bufferLength, '\0');
FormatMessageW(
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
error_code,