mirror of
https://github.com/saymrwulf/pytorch.git
synced 2026-05-15 21:00:47 +00:00
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/58753 TSAN was (rightfully!) detecting and complaining about a race due to the fact that upon init the TP agent exchanges the device maps between nodes using RPC requests (and by doing so it accesses the device maps) and then sets the reverse device maps (thus possibly modifying the set of devices). This resulted in a data race, i.e., simultaneously reading and writing the set of devices without synchronizing. One solution is to add a mutex around the devices, which works, but is "annoying". An alternative solution is to make the set of devices immutable (i.e., `const`). For that to work, we need to exchange the device maps without using RPC calls. We can do so using the process group that we need to create anyways. Since now there's a lot more logic in Python, I've moved (and restructured) all safety checks over there, and removed them from C++. ghstack-source-id: 130583775 Test Plan: Unit tests Reviewed By: mrshenli Differential Revision: D28603754 fbshipit-source-id: 88533e65d72d1eb806dc41bec8d55def5082e290
68 lines
2 KiB
C++
68 lines
2 KiB
C++
#include <gtest/gtest.h>
|
|
|
|
#include "e2e_test_base.h"
|
|
|
|
#include <c10d/ProcessGroupGloo.hpp>
|
|
#include <torch/csrc/distributed/rpc/request_callback_no_python.h>
|
|
#include <torch/csrc/distributed/rpc/tensorpipe_agent.h>
|
|
#include <torch/torch.h>
|
|
|
|
namespace torch {
|
|
namespace distributed {
|
|
namespace rpc {
|
|
|
|
#ifdef USE_TENSORPIPE
|
|
|
|
class TestE2ETensorPipe : public TestE2EBase {
|
|
protected:
|
|
void buildRpcAgent() override {
|
|
auto options = c10d::ProcessGroupGloo::Options::create();
|
|
options->devices.push_back(
|
|
::c10d::ProcessGroupGloo::createDeviceForHostname(serverAddress));
|
|
float rpcTimeout = 30;
|
|
|
|
// Initialize server rpc agent.
|
|
auto pg = c10::make_intrusive<c10d::ProcessGroupGloo>(
|
|
store, 0, numWorkers, options);
|
|
|
|
TensorPipeRpcBackendOptions opts(
|
|
/*numWorkerThreads=*/std::max(16U, std::thread::hardware_concurrency()),
|
|
/*transports=*/nullopt,
|
|
/*channels=*/nullopt,
|
|
/*rpc_timeout=*/rpcTimeout,
|
|
/*init_method=*/"unused");
|
|
|
|
rpcAgent = std::make_shared<TensorPipeAgent>(
|
|
store,
|
|
"worker",
|
|
0,
|
|
numWorkers,
|
|
pg,
|
|
opts,
|
|
std::unordered_map<std::string, DeviceMap>{},
|
|
std::vector<c10::Device>{},
|
|
std::make_unique<RequestCallbackNoPython>());
|
|
}
|
|
};
|
|
|
|
// End to end training loop test in C++ so that we can run LSAN on this test to
|
|
// catch memory leaks. Enabling LSAN with python multiprocessing has been
|
|
// challenging and we don't have a good solution yet.
|
|
TEST_F(TestE2ETensorPipe, TestTrainingLoop) {
  runTrainingLoop();

  // Join then shut down the agent so every outstanding RPC is drained before
  // we inspect the agent's internal bookkeeping.
  auto agent = std::static_pointer_cast<TensorPipeAgent>(rpcAgent);
  agent->join();
  agent->shutdown();

  // All tensorpipe-internal tracking maps must be empty once the agent is
  // fully shut down; leftovers would indicate leaked state.
  ASSERT_EQ(0, agent->numPendingResponses());
  ASSERT_EQ(0, agent->timeoutMapSize());
  ASSERT_EQ(0, agent->messageIdToTimeoutMapSize());
}
|
|
|
|
#endif
|
|
|
|
} // namespace rpc
|
|
} // namespace distributed
|
|
} // namespace torch
|