// pytorch/test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp

#include <chrono>

#include <c10/util/irange.h>
#include <torch/csrc/cuda/nccl.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

#include <gtest/gtest.h>

using namespace c10d::test;

constexpr int kNcclErrorHandlingVersion = 2400;
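
// A WorkNCCL that can pretend a NCCL failure happened: when
// simulate_error is set, checkForNCCLErrors() reports a
// std::runtime_error instead of querying the real communicators.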
class WorkNCCLSimulateErrors : public c10d::ProcessGroupNCCL::WorkNCCL {
 public:
  WorkNCCLSimulateErrors(
      const std::vector<at::Device>& devices,
      bool simulate_error,
      int rank,
      c10d::OpType opType,
      uint64_t seq)
      : WorkNCCL(devices, rank, opType, seq), simulate_error_(simulate_error) {}

  std::exception_ptr checkForNCCLErrors(
      const std::vector<std::shared_ptr<c10d::NCCLComm>>& ncclComms)
      const override {
    if (simulate_error_) {
      return std::make_exception_ptr(std::runtime_error("Error"));
    }
    return c10d::ProcessGroupNCCL::WorkNCCL::checkForNCCLErrors(ncclComms);
  }

 private:
  bool simulate_error_;
};
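
// A ProcessGroupNCCL whose error checks and work items honor the
// simulate_error_ flag above; simulate_error()/reset_error() toggle it,
// and getNCCLCommCacheSize() exposes the communicator cache for asserts.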
class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL {
 public:
  ProcessGroupNCCLSimulateErrors(
      const c10::intrusive_ptr<c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
      : ProcessGroupNCCL(store, rank, size, opts), simulate_error_(false) {}

  std::exception_ptr checkForNCCLErrors(
      const std::vector<std::shared_ptr<c10d::NCCLComm>>& ncclComms) override {
    if (simulate_error_) {
      return std::make_exception_ptr(std::runtime_error("Error"));
    }
    return c10d::ProcessGroupNCCL::checkForNCCLErrors(ncclComms);
  }

  std::chrono::duration<int64_t, std::milli> getWatchdogSleepInterval() {
    return std::chrono::milliseconds(
        ProcessGroupNCCLSimulateErrors::kWatchdogThreadSleepMillis);
  }

  c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> initWork(
      std::vector<at::Device> devices,
      int rank,
      c10d::OpType opType,
      const char* profilingTitle,
      const c10::optional<std::vector<at::Tensor>>& inputs =
          c10::nullopt) override {
    return c10::make_intrusive<WorkNCCLSimulateErrors>(
        devices, simulate_error_, rank, opType, seq_);
  }

  size_t getNCCLCommCacheSize() {
    return devNCCLCommMap_.size();
  }

  void simulate_error() {
    simulate_error_ = true;
  }

  void reset_error() {
    simulate_error_ = false;
  }

 private:
  bool simulate_error_;
};
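
// A WorkNCCL that can simulate a hung collective: while
// set_timedout_error is set, isCompleted() always returns false, so a
// blocking wait() keeps polling until the configured timeout fires.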
class WorkNCCLTimedoutErrors : public c10d::ProcessGroupNCCL::WorkNCCL {
 public:
  WorkNCCLTimedoutErrors(
      const std::vector<at::Device>& devices,
      bool set_timedout_error,
      int rank,
      c10d::OpType opType,
      uint64_t seq)
      : WorkNCCL(devices, rank, opType, seq),
        set_timedout_error_(set_timedout_error) {}

 private:
  bool isCompleted() override {
    if (set_timedout_error_) {
      return false;
    }
    return c10d::ProcessGroupNCCL::WorkNCCL::isCompleted();
  }

 private:
  bool set_timedout_error_;
};
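
// Same as ProcessGroupNCCLSimulateErrors, but initWork() hands out
// WorkNCCLTimedoutErrors items so tests exercise timeout handling
// rather than explicit NCCL errors.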
class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors {
 public:
  ProcessGroupNCCLTimedOutErrors(
      const c10::intrusive_ptr<c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
      : ProcessGroupNCCLSimulateErrors(store, rank, size, opts),
        set_timedout_error_(false) {}

  c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> initWork(
      std::vector<at::Device> devices,
      int rank,
      c10d::OpType opType,
      const char* profilingTitle,
      const c10::optional<std::vector<at::Tensor>>& inputs =
          c10::nullopt) override {
    return c10::make_intrusive<WorkNCCLTimedoutErrors>(
        devices, set_timedout_error_, rank, opType, seq_);
  }

  void set_timedout_error() {
    set_timedout_error_ = true;
  }

  void reset_timedout_error() {
    set_timedout_error_ = false;
  }

 private:
  bool set_timedout_error_;
};
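
// Shared fixture: skips when no CUDA device is visible or the NCCL
// version predates error handling, builds a single-rank FileStore, and
// allocates one 3x3 CUDA tensor per device for the collectives below.
// TearDown() restores NCCL_BLOCKING_WAIT so tests stay independent.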
class ProcessGroupNCCLErrorsTest : public ::testing::Test {
 protected:
  bool skipTest() {
    if (cudaNumDevices() == 0) {
      LOG(INFO) << "Skipping test since CUDA is not available";
      return true;
    }
#ifdef USE_C10D_NCCL
    if (torch::cuda::nccl::version() < kNcclErrorHandlingVersion) {
      LOG(INFO) << "Skipping test since NCCL version is too old";
      return true;
    }
#endif
    return false;
  }

  void SetUp() override {
    size_t numDevices = cudaNumDevices();
    TemporaryFile file;
    store_ = c10::make_intrusive<::c10d::FileStore>(file.path, 1);

    at::cuda::OptionalCUDAGuard deviceGuard;
    tensors_.resize(numDevices);
    for (const auto i : c10::irange(numDevices)) {
      deviceGuard.set_index(i);
      tensors_[i] = at::ones({3, 3}, at::kCUDA);
    }
  }

  void TearDown() override {
    ASSERT_TRUE(setenv(c10d::NCCL_BLOCKING_WAIT, "0", 1) == 0);
  }

  std::vector<at::Tensor> tensors_;
  c10::intrusive_ptr<::c10d::FileStore> store_;
};
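
// With NCCL_BLOCKING_WAIT=1 a simulated NCCL error surfaces directly
// from wait(): the first allreduce succeeds, the second throws.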
TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsBlocking) {
  if (skipTest()) {
    return;
  }

  ASSERT_TRUE(setenv(c10d::NCCL_BLOCKING_WAIT, "1", 1) == 0);
  auto options = c10d::ProcessGroupNCCL::Options::create();
  options->timeout = std::chrono::milliseconds(1000);
  ProcessGroupNCCLSimulateErrors pg(store_, 0, 1, options);

  auto work = pg.allreduce(tensors_);
  work->wait();
  EXPECT_TRUE(work->isSuccess());
  EXPECT_EQ(1, pg.getNCCLCommCacheSize());

  // Now run all reduce with errors.
  pg.simulate_error();
  work = pg.allreduce(tensors_);
  EXPECT_THROW(work->wait(), std::runtime_error);

  // Verify the work item failed.
  EXPECT_TRUE(work->isCompleted());
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(work->wait(), std::runtime_error);

  // Communicators might be aborted here, further operations would fail.
}
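
// A work item that never completes must trip the 3000 ms collective
// timeout, which blocking wait() reports as a c10::Error.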
TEST_F(ProcessGroupNCCLErrorsTest, testNCCLTimedoutErrorsBlocking) {
  if (skipTest()) {
    return;
  }

  ASSERT_TRUE(setenv(c10d::NCCL_BLOCKING_WAIT, "1", 1) == 0);
  auto options = c10d::ProcessGroupNCCL::Options::create();
  options->timeout = std::chrono::milliseconds(3000);
  ProcessGroupNCCLTimedOutErrors pg(store_, 0, 1, options);

  auto work = pg.allreduce(tensors_);
  work->wait();
  EXPECT_TRUE(work->isSuccess());
  EXPECT_EQ(1, pg.getNCCLCommCacheSize());

  // Now run all reduce with errors.
  pg.set_timedout_error();
  work = pg.allreduce(tensors_);
  EXPECT_THROW(work->wait(), c10::Error);

  // Communicators might be aborted here, further operations would fail.
}
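
// Without blocking wait, wait() returns without throwing even on a
// simulated error; the failure is observed afterwards through
// isCompleted()/isSuccess(), with barriers used to flush the queue.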
TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNonBlocking) {
  if (skipTest()) {
    return;
  }

  auto options = c10d::ProcessGroupNCCL::Options::create();
  options->timeout = std::chrono::milliseconds(3000);
  ProcessGroupNCCLSimulateErrors pg(store_, 0, 1, options);

  auto work = pg.allreduce(tensors_);
  pg.barrier()->wait();
  EXPECT_TRUE(work->isSuccess());
  EXPECT_EQ(1, pg.getNCCLCommCacheSize());

  // Now run all reduce with errors.
  pg.simulate_error();
  work = pg.allreduce(tensors_);

  // Should not throw exceptions.
  work->wait();
  pg.barrier()->wait();

  // Verify the work item failed.
  EXPECT_TRUE(work->isCompleted());
  EXPECT_FALSE(work->isSuccess());

  // Communicators might be aborted here, further operations would fail.
}