Add rendezvous timeout parameter and defaults to StoreHandler::wait()

Summary: Add default rendezvous timeout for RedisStoreHandler and FileStoreHandler.

Reviewed By: pietern

Differential Revision: D4911678

fbshipit-source-id: e69dd03d96214449944d583b20941540cc0b6643
This commit is contained in:
Andrew Dye 2017-04-21 13:49:32 -07:00 committed by Yangqing Jia
parent fa261cdafb
commit f07ec699ee
8 changed files with 47 additions and 8 deletions

View file

@ -15,8 +15,10 @@ std::vector<char> StoreHandlerWrapper::get(const std::string& key) {
return std::vector<char>(str.begin(), str.end());
}
void StoreHandlerWrapper::wait(const std::vector<std::string>& keys) {
handler_.wait(keys);
void StoreHandlerWrapper::wait(
const std::vector<std::string>& keys,
const std::chrono::milliseconds& timeout) {
handler_.wait(keys, timeout);
}
} // namespace gloo

View file

@ -18,7 +18,13 @@ class StoreHandlerWrapper : public ::gloo::rendezvous::Store {
virtual std::vector<char> get(const std::string& key) override;
virtual void wait(const std::vector<std::string>& keys) override;
virtual void wait(const std::vector<std::string>& keys) override {
wait(keys, ::gloo::rendezvous::Store::kDefaultTimeout);
}
virtual void wait(
const std::vector<std::string>& keys,
const std::chrono::milliseconds& timeout) override;
protected:
StoreHandler& handler_;

View file

@ -119,10 +119,18 @@ bool FileStoreHandler::check(const std::vector<std::string>& names) {
return true;
}
void FileStoreHandler::wait(const std::vector<std::string>& names) {
void FileStoreHandler::wait(
const std::vector<std::string>& names,
const std::chrono::milliseconds& timeout) {
// Not using inotify because it doesn't work on many
// shared filesystems (such as NFS).
const auto start = std::chrono::steady_clock::now();
while (!check(names)) {
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (timeout != kNoTimeout && elapsed > timeout) {
CAFFE_ENFORCE(false, "Wait timeout for name(s): ", Join(" ", names));
}
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

View file

@ -17,7 +17,9 @@ class FileStoreHandler : public StoreHandler {
virtual bool check(const std::vector<std::string>& names) override;
virtual void wait(const std::vector<std::string>& names) override;
virtual void wait(
const std::vector<std::string>& names,
const std::chrono::milliseconds& timeout = kDefaultTimeout) override;
protected:
std::string basePath_;

View file

@ -89,12 +89,20 @@ bool RedisStoreHandler::check(const std::vector<std::string>& names) {
return reply->integer == names.size();
}
void RedisStoreHandler::wait(const std::vector<std::string>& names) {
void RedisStoreHandler::wait(
const std::vector<std::string>& names,
const std::chrono::milliseconds& timeout) {
// Simple approach: poll...
// Complex approach: use pub/sub.
// Polling is fine for the typical rendezvous use case, as it is
// only done at initialization time and not at run time.
const auto start = std::chrono::steady_clock::now();
while (!check(names)) {
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (timeout != kNoTimeout && elapsed > timeout) {
CAFFE_ENFORCE(false, "Wait timeout for name(s): ", Join(" ", names));
}
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

View file

@ -23,7 +23,9 @@ class RedisStoreHandler : public StoreHandler {
virtual bool check(const std::vector<std::string>& names) override;
virtual void wait(const std::vector<std::string>& names) override;
virtual void wait(
const std::vector<std::string>& names,
const std::chrono::milliseconds& timeout = kDefaultTimeout) override;
private:
std::string host_;

View file

@ -6,6 +6,9 @@
namespace caffe2 {
constexpr std::chrono::milliseconds StoreHandler::kDefaultTimeout;
constexpr std::chrono::milliseconds StoreHandler::kNoTimeout;
StoreHandler::~StoreHandler() {
// NOP; definition is here to make sure library contains
// symbols for this abstract class.

View file

@ -1,5 +1,6 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>
@ -8,6 +9,11 @@ namespace caffe2 {
class StoreHandler {
public:
static constexpr std::chrono::milliseconds kDefaultTimeout =
std::chrono::seconds(30);
static constexpr std::chrono::milliseconds kNoTimeout =
std::chrono::milliseconds::zero();
virtual ~StoreHandler();
virtual void set(const std::string& name, const std::string& data) = 0;
@ -18,6 +24,8 @@ class StoreHandler {
virtual bool check(const std::vector<std::string>& names) = 0;
virtual void wait(const std::vector<std::string>& names) = 0;
virtual void wait(
const std::vector<std::string>& names,
const std::chrono::milliseconds& timeout = kDefaultTimeout) = 0;
};
}