2021-06-09 21:42:47 +00:00
|
|
|
#include <c10/util/irange.h>
|
2021-06-24 19:37:29 +00:00
|
|
|
#include "StoreTestCommon.hpp"
|
2018-05-17 20:38:06 +00:00
|
|
|
|
|
|
|
|
#include <cstdlib>
|
2021-04-28 20:44:35 +00:00
|
|
|
#include <future>
|
2018-05-23 18:26:35 +00:00
|
|
|
#include <iostream>
|
2023-12-20 07:26:27 +00:00
|
|
|
#include <string>
|
2021-06-14 16:51:39 +00:00
|
|
|
#include <system_error>
|
2018-05-23 18:26:35 +00:00
|
|
|
#include <thread>
|
2018-05-17 20:38:06 +00:00
|
|
|
|
2019-12-03 03:50:45 +00:00
|
|
|
#include <gtest/gtest.h>
|
|
|
|
|
|
2022-09-30 05:13:48 +00:00
|
|
|
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
|
|
|
|
|
#include <torch/csrc/distributed/c10d/TCPStore.hpp>
|
2018-07-13 00:43:27 +00:00
|
|
|
|
2020-10-09 22:39:20 +00:00
|
|
|
// Deliberately short timeout (ms) used to exercise the store's timeout /
// error path quickly (see the EXPECT_THROW on a deleted key below).
constexpr int64_t kShortStoreTimeoutMillis = 100;

// Default client/server connection timeout, in seconds, for all tests here.
constexpr int defaultTimeout = 20;
|
TCPStore add watchKey method and new listener thread (#54264)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54264
**Changes**
- Creates new listener thread on each client to run the callback
- Create new class which listener thread and master thread derive from, this class is used to handle shut down and clean up of the thread in windows and linux
- Add watchKey method and update any functions that changes the key value.
**Background**
This PR adds functionality to TCPStore to allow users to watch a key and execute a callback on key change.
It introduces this a new watchKey() API:
`TCPStore::watchKey(const std::string& key, std::function<void(std::string, std::string)> callback)` which has parameters `key` and `callback(old_key, new_key)` to run on key change. Since current methods are blocking, for example in`TCPStore::get()` a worker will send a "get key" request to the master -> wait for a response back -> then exit the function and return the value to user, we need a non-blocking, asynchronous way to execute the callback whenever a key changes. This is done by creating a new listener thread on each client which the master can communicate with.
Right now, the API is C++ only and only for TCPStore, the internal use case is for elastic RPC. We will have an internal key such as `_NumNodes` and all nodes in the elastic RPC group will watch this key. When a node leaves, this key will be updated and each node will execute a callback to clean up Autograd context and RRef context.
Test Plan: Imported from OSS
Reviewed By: mrshenli
Differential Revision: D27709912
Pulled By: H-Huang
fbshipit-source-id: 619aa3b2a8eb23f4be5f5736efdcca6c175aadf3
2021-04-14 20:19:42 +00:00
|
|
|
|
2021-04-28 20:44:35 +00:00
|
|
|
// Creates a TCPStore server instance bound to an OS-assigned port on
// loopback. `waitWorkers` is false so the caller can spin up client
// threads after construction; callers retrieve the actual port via
// getPort(). `useLibUV` selects the libuv-based backend implementation.
c10::intrusive_ptr<c10d::TCPStore> _createServer(
    bool useLibUV,
    int numWorkers = 1,
    int timeout = defaultTimeout) {
  return c10::make_intrusive<c10d::TCPStore>(
      "127.0.0.1",
      c10d::TCPStoreOptions{
          /* port */ 0,
          /* isServer */ true,
          numWorkers,
          /* waitWorkers */ false,
          /* timeout */ std::chrono::seconds(timeout),
          /* multiTenant */ false,
          /* masterListenFd */ std::nullopt,
          /* useLibUV*/ useLibUV});
}
|
|
|
|
|
|
|
|
|
|
// Different ports for different tests.
|
2023-08-28 16:37:46 +00:00
|
|
|
void testHelper(bool useLibUV, const std::string& prefix = "") {
|
2021-12-10 05:59:50 +00:00
|
|
|
constexpr auto numThreads = 16;
|
|
|
|
|
constexpr auto numWorkers = numThreads + 1;
|
2021-04-28 20:44:35 +00:00
|
|
|
|
2023-08-28 16:37:46 +00:00
|
|
|
auto serverTCPStore = _createServer(useLibUV, numWorkers);
|
2020-01-13 22:20:42 +00:00
|
|
|
|
|
|
|
|
auto serverStore =
|
2020-11-12 06:49:06 +00:00
|
|
|
c10::make_intrusive<c10d::PrefixStore>(prefix, serverTCPStore);
|
2018-05-17 20:38:06 +00:00
|
|
|
// server store
|
2020-01-13 22:20:42 +00:00
|
|
|
auto serverThread = std::thread([&serverStore, &serverTCPStore] {
|
|
|
|
|
// Wait for all workers to join.
|
|
|
|
|
serverTCPStore->waitForWorkers();
|
|
|
|
|
|
|
|
|
|
// Basic set/get on the server store
|
|
|
|
|
c10d::test::set(*serverStore, "key0", "value0");
|
|
|
|
|
c10d::test::set(*serverStore, "key1", "value1");
|
|
|
|
|
c10d::test::set(*serverStore, "key2", "value2");
|
|
|
|
|
c10d::test::check(*serverStore, "key0", "value0");
|
|
|
|
|
c10d::test::check(*serverStore, "key1", "value1");
|
|
|
|
|
c10d::test::check(*serverStore, "key2", "value2");
|
2020-09-28 22:23:09 +00:00
|
|
|
serverStore->add("counter", 1);
|
2020-09-26 07:45:59 +00:00
|
|
|
auto numKeys = serverStore->getNumKeys();
|
|
|
|
|
// We expect 5 keys since 3 are added above, 'counter' is added by the
|
|
|
|
|
// helper thread, and the init key to coordinate workers.
|
|
|
|
|
EXPECT_EQ(numKeys, 5);
|
2020-09-28 22:23:09 +00:00
|
|
|
|
2021-02-08 21:41:54 +00:00
|
|
|
// Check compareSet, does not check return value
|
|
|
|
|
c10d::test::compareSet(
|
2021-04-29 20:55:42 +00:00
|
|
|
*serverStore, "key0", "wrongExpectedValue", "newValue");
|
2021-02-08 21:41:54 +00:00
|
|
|
c10d::test::check(*serverStore, "key0", "value0");
|
|
|
|
|
c10d::test::compareSet(*serverStore, "key0", "value0", "newValue");
|
|
|
|
|
c10d::test::check(*serverStore, "key0", "newValue");
|
|
|
|
|
|
2020-09-28 22:23:09 +00:00
|
|
|
auto delSuccess = serverStore->deleteKey("key0");
|
|
|
|
|
// Ensure that the key was successfully deleted
|
|
|
|
|
EXPECT_TRUE(delSuccess);
|
|
|
|
|
auto delFailure = serverStore->deleteKey("badKeyName");
|
|
|
|
|
// The key was not in the store so the delete operation should have failed
|
|
|
|
|
// and returned false.
|
|
|
|
|
EXPECT_FALSE(delFailure);
|
|
|
|
|
numKeys = serverStore->getNumKeys();
|
|
|
|
|
EXPECT_EQ(numKeys, 4);
|
2020-10-09 22:39:20 +00:00
|
|
|
auto timeout = std::chrono::milliseconds(kShortStoreTimeoutMillis);
|
|
|
|
|
serverStore->setTimeout(timeout);
|
2021-06-14 16:51:39 +00:00
|
|
|
EXPECT_THROW(serverStore->get("key0"), c10::Error);
|
2020-01-13 22:20:42 +00:00
|
|
|
});
|
2018-05-17 20:38:06 +00:00
|
|
|
|
|
|
|
|
// Hammer on TCPStore
|
|
|
|
|
std::vector<std::thread> threads;
|
2021-12-10 05:59:50 +00:00
|
|
|
constexpr auto numIterations = 1000;
|
2018-05-17 20:38:06 +00:00
|
|
|
c10d::test::Semaphore sem1, sem2;
|
|
|
|
|
|
2021-06-05 14:47:29 +00:00
|
|
|
c10d::TCPStoreOptions opts{};
|
|
|
|
|
opts.port = serverTCPStore->getPort();
|
|
|
|
|
opts.numWorkers = numWorkers;
|
|
|
|
|
|
2018-05-17 20:38:06 +00:00
|
|
|
// Each thread will have a client store to send/recv data
|
2020-11-12 06:49:06 +00:00
|
|
|
std::vector<c10::intrusive_ptr<c10d::TCPStore>> clientTCPStores;
|
|
|
|
|
std::vector<c10::intrusive_ptr<c10d::PrefixStore>> clientStores;
|
2021-06-09 21:42:47 +00:00
|
|
|
for (const auto i : c10::irange(numThreads)) {
|
2020-11-12 06:49:06 +00:00
|
|
|
clientTCPStores.push_back(
|
|
|
|
|
c10::make_intrusive<c10d::TCPStore>("127.0.0.1", opts));
|
|
|
|
|
clientStores.push_back(
|
|
|
|
|
c10::make_intrusive<c10d::PrefixStore>(prefix, clientTCPStores[i]));
|
2018-05-17 20:38:06 +00:00
|
|
|
}
|
|
|
|
|
|
2021-04-28 20:44:35 +00:00
|
|
|
std::string expectedCounterRes =
|
|
|
|
|
std::to_string(numThreads * numIterations + 1);
|
2018-05-17 20:38:06 +00:00
|
|
|
|
2021-06-09 21:42:47 +00:00
|
|
|
for (const auto i : c10::irange(numThreads)) {
|
2021-12-10 05:59:50 +00:00
|
|
|
threads.emplace_back(
|
|
|
|
|
std::thread([=, &sem1, &sem2, &clientStores, &expectedCounterRes] {
|
|
|
|
|
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
|
2021-04-28 20:44:35 +00:00
|
|
|
clientStores[i]->add("counter", 1);
|
|
|
|
|
}
|
|
|
|
|
// Let each thread set and get key on its client store
|
|
|
|
|
std::string key = "thread_" + std::to_string(i);
|
2021-06-09 21:42:47 +00:00
|
|
|
for (const auto j : c10::irange(numIterations)) {
|
2021-04-28 20:44:35 +00:00
|
|
|
std::string val = "thread_val_" + std::to_string(j);
|
|
|
|
|
c10d::test::set(*clientStores[i], key, val);
|
|
|
|
|
c10d::test::check(*clientStores[i], key, val);
|
|
|
|
|
}
|
2022-06-11 17:22:58 +00:00
|
|
|
|
2021-04-28 20:44:35 +00:00
|
|
|
sem1.post();
|
|
|
|
|
sem2.wait();
|
|
|
|
|
// Check the counter results
|
|
|
|
|
c10d::test::check(*clientStores[i], "counter", expectedCounterRes);
|
|
|
|
|
// Now check other threads' written data
|
2021-06-09 21:42:47 +00:00
|
|
|
for (const auto j : c10::irange(numThreads)) {
|
2021-04-28 20:44:35 +00:00
|
|
|
if (j == i) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
std::string key = "thread_" + std::to_string(i);
|
|
|
|
|
std::string val = "thread_val_" + std::to_string(numIterations - 1);
|
|
|
|
|
c10d::test::check(*clientStores[i], key, val);
|
|
|
|
|
}
|
|
|
|
|
}));
|
2018-05-17 20:38:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sem1.wait(numThreads);
|
|
|
|
|
sem2.post(numThreads);
|
|
|
|
|
|
|
|
|
|
for (auto& thread : threads) {
|
|
|
|
|
thread.join();
|
|
|
|
|
}
|
|
|
|
|
|
2019-12-03 03:50:45 +00:00
|
|
|
serverThread.join();
|
|
|
|
|
|
2018-05-17 20:38:06 +00:00
|
|
|
// Clear the store to test that client disconnect won't shutdown the store
|
|
|
|
|
clientStores.clear();
|
2018-08-24 01:04:16 +00:00
|
|
|
clientTCPStores.clear();
|
2018-05-17 20:38:06 +00:00
|
|
|
|
|
|
|
|
// Check that the counter has the expected value
|
2019-12-03 03:50:45 +00:00
|
|
|
c10d::test::check(*serverStore, "counter", expectedCounterRes);
|
2018-05-17 20:38:06 +00:00
|
|
|
|
|
|
|
|
// Check that each threads' written data from the main thread
|
2021-06-09 21:42:47 +00:00
|
|
|
for (const auto i : c10::irange(numThreads)) {
|
2018-05-17 20:38:06 +00:00
|
|
|
std::string key = "thread_" + std::to_string(i);
|
|
|
|
|
std::string val = "thread_val_" + std::to_string(numIterations - 1);
|
2019-12-03 03:50:45 +00:00
|
|
|
c10d::test::check(*serverStore, key, val);
|
2018-05-17 20:38:06 +00:00
|
|
|
}
|
2018-08-24 01:04:16 +00:00
|
|
|
}
|
2019-12-03 03:50:45 +00:00
|
|
|
|
|
|
|
|
// Full TCPStore workout using the legacy (non-libuv) backend, no prefix.
TEST(TCPStoreTest, testHelper) {
  testHelper(false);
}
|
|
|
|
|
|
|
|
|
|
// Full TCPStore workout using the libuv backend, no prefix.
TEST(TCPStoreTest, testHelperUV) {
  testHelper(true);
}
|
|
|
|
|
|
|
|
|
|
// Same workout, layered under a PrefixStore (legacy backend).
TEST(TCPStoreTest, testHelperPrefix) {
  testHelper(false, "testPrefix");
}
|
|
|
|
|
|
|
|
|
|
// Same workout, layered under a PrefixStore (libuv backend).
TEST(TCPStoreTest, testHelperPrefixUV) {
  testHelper(true, "testPrefix");
}
|
2021-04-28 20:44:35 +00:00
|
|
|
|
|
|
|
|
// Verifies that destroying the server while a client request is pending
// surfaces as a DistNetworkError on the client rather than a hang or crash.
// NOTE(review): statement ORDER is load-bearing here — the client thread is
// intentionally started before the server is destroyed so the get() races
// with shutdown; do not reorder.
TEST(TCPStoreTest, testCleanShutdown) {
  int numWorkers = 2;

  // unique_ptr so we can destroy the server explicitly mid-test.
  auto serverTCPStore = std::make_unique<c10d::TCPStore>(
      "127.0.0.1",
      0,
      numWorkers,
      true,
      std::chrono::seconds(defaultTimeout),
      /* wait */ false);
  c10d::test::set(*serverTCPStore, "key", "val");

  auto clientTCPStore = c10::make_intrusive<c10d::TCPStore>(
      "127.0.0.1",
      c10d::TCPStoreOptions{
          /* port */ serverTCPStore->getPort(),
          /* isServer */ false,
          numWorkers,
          /* waitWorkers */ false,
          /* timeout */ std::chrono::seconds(defaultTimeout)});
  // Sanity: the connection works before we start tearing things down.
  clientTCPStore->get("key");

  // This get() blocks on a nonexistent key; it should fail with a network
  // error once the server goes away, not time out or hang.
  auto clientThread = std::thread([&clientTCPStore] {
    EXPECT_THROW(clientTCPStore->get("invalid_key"), c10::DistNetworkError);
  });

  // start server shutdown during a client request
  serverTCPStore = nullptr;

  clientThread.join();
}
|
2021-06-05 14:47:29 +00:00
|
|
|
|
2023-12-20 07:26:27 +00:00
|
|
|
// Regression test for the libuv backend's handling of partial reads: a
// set() request is deliberately split across two network writes via the
// test-only _splitSet() hook, and the server must still assemble and
// apply it correctly.
TEST(TCPStoreTest, testLibUVPartialRead) {
  int numWorkers = 2; // thread 0 creates both server and client

  // server part
  c10d::TCPStoreOptions server_opts{
      0,
      true, // is master
      numWorkers,
      false, // don't wait otherwise client thread won't spawn
      std::chrono::seconds(defaultTimeout)};
  server_opts.useLibUV = true;

  auto serverTCPStore =
      std::make_unique<c10d::TCPStore>("127.0.0.1", server_opts);

  // client part
  c10d::TCPStoreOptions client_opts{
      serverTCPStore->getPort(),
      false, // is master
      numWorkers,
      false, // wait workers
      std::chrono::seconds(defaultTimeout)};
  client_opts.useLibUV = true;
  auto clientTCPStore =
      c10::make_intrusive<c10d::TCPStore>("127.0.0.1", client_opts);
  auto clientThread = std::thread([&clientTCPStore] {
    // Long, realistic-looking key so the request spans multiple buffers.
    std::string keyPrefix(
        "/default_pg/0//b7dc24de75e482ba2ceb9f9ee20732c25c0166d8//cuda//");
    std::string value("v");
    std::vector<uint8_t> valueBuf(value.begin(), value.end());

    // split store->set(key, valueBuf) into two requests
    for (int i = 0; i < 10; ++i) {
      std::string key = keyPrefix + std::to_string(i);
      clientTCPStore->_splitSet(key, valueBuf);

      // check the result on server
      c10d::test::check(*clientTCPStore, key, "v");
    }
  });

  clientThread.join();
}
|
|
|
|
|
|
2023-08-28 16:37:46 +00:00
|
|
|
// Verifies multi-tenant mode: two TCPStore server instances constructed
// with identical multiTenant options share a single underlying server,
// and the server survives disposal of one of the instances.
// `libUV` selects the libuv backend.
void testMultiTenantStores(bool libUV) {
  c10d::TCPStoreOptions opts{};
  opts.isServer = true;
  opts.multiTenant = true;
  opts.useLibUV = libUV;

  // Construct two server stores on the same port.
  auto store1 = c10::make_intrusive<c10d::TCPStore>("localhost", opts);
  auto store2 = c10::make_intrusive<c10d::TCPStore>("localhost", opts);

  // Assert that the two stores share the same server.
  c10d::test::set(*store1, "key0", "value0");
  c10d::test::check(*store2, "key0", "value0");

  // Dispose the second instance and assert that the server is still alive.
  store2.reset();

  c10d::test::set(*store1, "key0", "value0");
  c10d::test::check(*store1, "key0", "value0");
}
|
2023-08-28 16:37:46 +00:00
|
|
|
|
|
|
|
|
// Multi-tenant sharing with the legacy (non-libuv) backend.
TEST(TCPStoreTest, testMultiTenantStores) {
  testMultiTenantStores(false);
}
|
|
|
|
|
|
|
|
|
|
// Multi-tenant sharing with the libuv backend.
TEST(TCPStoreTest, testMultiTenantStoresUV) {
  testMultiTenantStores(true);
}
|