[c10d] only PG0 should dump when monitoring thread timed out (#125356)

Summary:
We found that some dumps are missing when the monitoring thread times out.
This is likely because multiple PGs could still dump the same records
at the same time, so we should allow only PG0 to actually dump.
Test Plan:
 unit test
python test/run_test.py --cpp --verbose -i cpp/ProcessGroupNCCLErrorsTest
Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/125356
Approved by: https://github.com/c-p-i-o
This commit is contained in:
Shuqiang Zhang 2024-05-03 13:38:04 -07:00 committed by PyTorch MergeBot
parent d325c55896
commit bfd5bb0c44
2 changed files with 36 additions and 9 deletions

View file

@ -389,6 +389,7 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNoHeartbeat) {
setenv("TORCH_NCCL_DEBUG_INFO_TEMP_FILE", tempFilename.c_str(), 1) == 0);
// Enable nccl flight recorder.
ASSERT_TRUE(setenv("TORCH_NCCL_TRACE_BUFFER_SIZE", "10", 1) == 0);
ASSERT_TRUE(setenv(c10d::TORCH_NCCL_DUMP_ON_TIMEOUT[0].c_str(), "1", 1) == 0);
auto options = c10d::ProcessGroupNCCL::Options::create();
// Set a long watchdog timeout, so that we have enough time to lock the
// watchdog and let the heartbeat monitor thread to kick in.

View file

@ -1250,6 +1250,11 @@ void ProcessGroupNCCL::heartbeatMonitor() {
lastTimePollStore = currentTime;
if (globalStore_->check({std::string(EXCEPTION_DUMP)})) {
int timeOutRank = -1;
if (!shouldDump_.load()) {
LOG(ERROR)
<< logPrefix()
<< "First PG on this rank detecting the dump signal through tcpstore.";
}
shouldDump_.store(true);
try {
auto vec = globalStore_->get(std::string(EXCEPTION_DUMP));
@ -1295,6 +1300,11 @@ void ProcessGroupNCCL::heartbeatMonitor() {
if (heartbeat != heartBeatCounter) {
heartBeatCounter = heartbeat;
} else {
if (!shouldDump_.load()) {
LOG(ERROR)
<< logPrefix()
<< "First PG on this rank that detected no heartbeat of its watchdog.";
}
shouldDump_.store(true);
// No heartbeat increase detected and timeout.
errorMsg = c10::str(
@ -1337,16 +1347,18 @@ void ProcessGroupNCCL::heartbeatMonitor() {
cpp_dumper.value()([](const std::string& line) { LOG(INFO) << line; });
}
// Store debug info to storage if no other thread does it. (By default to
// local disk)
std::future<bool> asyncDebugDump = std::async(
std::launch::async, [this]() { return this->dumpDebuggingInfo(); });
if (checkDumpSignal && shouldDump_.load()) {
// Store debug info to storage if no other thread does it. (By default to
// local disk)
std::future<bool> asyncDebugDump = std::async(
std::launch::async, [this]() { return this->dumpDebuggingInfo(); });
// wait for the dump until timeout
waitForFutureOrTimeout(
asyncDebugDump,
std::chrono::milliseconds(waitTimeoutDumpInMilSec_),
"Flight recorder dump in heartbeatMonitor");
// wait for the dump until timeout
waitForFutureOrTimeout(
asyncDebugDump,
std::chrono::milliseconds(waitTimeoutDumpInMilSec_),
"Flight recorder dump in heartbeatMonitor");
}
if (get_gil_checker() != nullptr) {
auto fut = launchAsyncGilCheck();
@ -1567,6 +1579,16 @@ void ProcessGroupNCCL::watchdogHandler() {
// If work hits an exception (either an error or timeout)
if (work.exception()) {
// log as soon as exception is detected
LOG(ERROR) << c10::str(
logPrefix(),
"Exception (either an error or timeout) detected by watchdog at work: ",
work.seq_,
", last enqueued NCCL work: ",
lastEnqueuedSeq_,
", last completed NCCL work: ",
lastCompletedSeq_,
".");
// try to dump flight records if exception happens.
// Flight recorder behavior should be independent of desync Debug
if (dumpOnException_) {
@ -1576,6 +1598,10 @@ void ProcessGroupNCCL::watchdogHandler() {
reinterpret_cast<uint8_t*>(&rank),
reinterpret_cast<uint8_t*>(&rank) + sizeof(rank));
globalStore_->set(std::string(EXCEPTION_DUMP), vec);
if (!shouldDump_.load()) {
LOG(ERROR) << logPrefix()
<< "First watchdog to set the dump signal.";
}
// signal the monitor thread to start dumping
shouldDump_.store(true);
// This sleep is used to give time for dumping before throwing