pytorch/test/cpp/rpc/test_wire_serialization.cpp


// Improve process_group_agent() serialization speed (#29785)
// https://github.com/pytorch/pytorch/pull/29785
//
// TL;DR: this change improves process_group's serialization speed:
//   Serialize_Tensor64:    12.38us  -> 1.99us    (~-84%)
//   Deserialize_Tensor64:  33.89us  -> 5.62us    (~-84%)
//   Serialize_Tensor1M:    525.74us -> 285.43us  (~-45%)
//   Deserialize_Tensor1M:  892.61us -> 273.68us  (~-70%)
//
// After speaking with the JIT team, the consensus was that torch::save()/load()
// are fairly high-overhead for RPC serialization, being intended mostly for
// persistent on-disk data. In particular, for large tensors, 35% of the time is
// spent in CRC checking, even with the fb-side changes that substitute 40x
// faster SSE-accelerated CRC checking; for small tensors, the zip container
// overhead is considerable, as is the cost of lexing/parsing an embedded Python
// text program for each RPC.
//
// The JIT team encouraged us to use jit::Pickler with the WriteableTensorData
// way of outputting result tensors (rather than the default side-tensor table,
// or pickling the actual tensors). This pickles only tensor metadata and yields
// tensor blobs that can be blitted over the wire as-is (they are copied to CPU
// memory if needed).
//
// There is not yet a standardized container format for the pickled data
// (jit::pickle_save() is checked in, but it is experimental and no load
// function is provided yet), so for now we use something sensible and may
// revisit later; the directory headers are slightly HTTP-inspired.
//
// Serialization is only one component of the pipeline, but we also see
// reasonable (if noisier) reductions in end-to-end echo times:
//   ProcessGroupAgent_Echo(Tensor_Small):   855.25us -> 492.65us (~-42%)
//   ProcessGroupAgent_Echo(Tensor_1M):      10.82ms  -> 6.94ms   (~-35%)
//   ProcessGroupAgent_Echo(Small_NoTensor): 688.82us -> 301.72us (~-56%)
//   ProcessGroupAgent_Echo(1MB_NoTensor):   4.65ms   -> 3.71ms   (~-20%)
//
// The "wire serialization" logic lives in a separate file to make it easy to
// unit test; this file holds those tests.
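//
// The tests below exercise the wire helpers pulled in via
// torch/csrc/distributed/rpc/utils.h: as used here, wireSerialize() takes a
// byte payload plus a list of tensors and returns a flat string buffer, and
// wireDeserialize() turns that buffer back into a (payload, tensors) pair.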
#include <gtest/gtest.h>
#include <c10/util/irange.h>
#include <torch/csrc/distributed/rpc/utils.h>
#include <torch/torch.h>
#include <memory>
#include <string>
#include <vector>
using ::testing::IsSubstring;
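// Round-trip a payload and a list of tensors through wireSerialize() /
// wireDeserialize() and verify that both the payload bytes and every tensor
// come back intact.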
TEST(WireSerialize, Base) {
  auto run = [](const std::string& payload,
                const std::vector<at::Tensor>& tensors) {
    std::string serialized;
    {
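      // Hand wireSerialize() copies (moved in below) so that the original
      // payload and tensors remain available for the comparisons that follow.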
      std::vector<char> mpayload(payload.begin(), payload.end());
      std::vector<at::Tensor> mtensors = tensors;
      serialized = torch::distributed::rpc::wireSerialize(
          std::move(mpayload), std::move(mtensors));
    }
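    // wireDeserialize() returns a pair: .first holds the payload bytes and
    // .second holds the reconstituted tensors.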
    auto deser = torch::distributed::rpc::wireDeserialize(
        serialized.data(), serialized.size());
    EXPECT_EQ(payload.size(), deser.first.size());
    EXPECT_EQ(tensors.size(), deser.second.size());
    if (payload.size() > 0) {
      EXPECT_TRUE(
          memcmp(deser.first.data(), payload.data(), payload.size()) == 0);
    }
    for (const auto i : c10::irange(tensors.size())) {
      EXPECT_TRUE(torch::equal(tensors[i], deser.second[i]));
    }
  };
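  // Cover empty and non-empty payloads with zero, one, and two tensors.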
run("", {});
run("hi", {});
run("", {torch::randn({5, 5})});
run("hi", {torch::randn({5, 5})});
run("more", {torch::randn({5, 5}), torch::rand({10, 10})});
}

// Avoid sending large unneeded data over the wire in process_group_agent
// (#31357) https://github.com/pytorch/pytorch/pull/31357
//
// If a user selects a subset of a Tensor and sends it in an RPC, we used to
// send the whole original Tensor Storage over the network. In practice we
// observed view-like Tensors being sent over RPC where only 1% of the data in
// the provided Tensor's Storage was actually used. The simple fix is to force
// a clone in the serializer code when less than (an arbitrary) half of the
// bits are used and the tensor is larger than a nominal few KB.
//
// An alternative would have been to modify the Pickler, but since the Pickler
// is shared by more components, the logic would be harder to tailor at that
// layer (particularly given that the Pickler has explicit logic to share a
// single Storage* among several Tensors that point to the same Storage*). The
// thresholds may be refined further; observed RPCs show a mostly bimodal
// distribution (either 90%+ or under 10% of the Storage referenced), so the
// existing 50% threshold is a reasonable starting point.
TEST(WireSerialize, RecopySparseTensors) {
  // Take a 1K-element row of a 1M-element tensor and make sure we don't send
  // all 1M elements across the wire.
  constexpr size_t k1K = 1024;
  at::Tensor main = torch::randn({k1K, k1K});
  at::Tensor tiny = main.select(0, 2); // Select a row in the middle
  EXPECT_EQ(tiny.numel(), k1K);
  EXPECT_EQ(tiny.storage().nbytes() / tiny.dtype().itemsize(), k1K * k1K);
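  // The serialized buffer should be on the order of the 1K-element row (a few
  // KB), not the ~4 MB backing Storage.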
  auto ser = torch::distributed::rpc::wireSerialize({}, {tiny});
  auto deser = torch::distributed::rpc::wireDeserialize(ser.data(), ser.size());
  EXPECT_TRUE(torch::equal(tiny, deser.second[0]));
  EXPECT_LT(ser.size(), (tiny.element_size() * k1K) + k1K);
}
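
// cloneSparseTensors() is the helper behind the recopy behavior tested above:
// tensors that use most of their Storage pass through untouched, small views
// over a large Storage get cloned, and sparse-layout tensors are left alone.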
TEST(WireSerialize, CloneSparseTensors) {
  constexpr size_t k1K = 1024;
  at::Tensor big = torch::randn({k1K, k1K});
  auto v1 = torch::distributed::rpc::cloneSparseTensors({big});
  EXPECT_EQ(v1.get(0).storage(), big.storage()); // Not cloned
  at::Tensor tiny = big.select(0, 2); // Select a row in the middle
  auto v2 = torch::distributed::rpc::cloneSparseTensors({tiny});
  EXPECT_NE(&v2.get(0).storage(), &tiny.storage()); // Cloned.
  EXPECT_TRUE(torch::equal(v2.get(0), tiny));
  at::Tensor sparse = at::empty({2, 3}, at::dtype<float>().layout(at::kSparse));
  auto v3 = torch::distributed::rpc::cloneSparseTensors({sparse});
  // There is no storage() to compare, but at least confirm equality.
  EXPECT_TRUE(v3.get(0).is_same(sparse));
}
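
// wireDeserialize() is expected to reject malformed input by throwing an
// exception whose message identifies what failed.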
TEST(WireSerialize, Errors) {
  auto checkMessage = [](auto&& f, const char* msg) {
    try {
      f();
      FAIL();
    } catch (const std::exception& e) {
      EXPECT_PRED_FORMAT2(IsSubstring, msg, e.what());
    } catch (...) {
      FAIL();
    }
  };
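  // An empty or garbage buffer should fail header parsing.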
  checkMessage(
      []() { (void)torch::distributed::rpc::wireDeserialize("", 0); },
      "failed parse");
  checkMessage(
      []() { (void)torch::distributed::rpc::wireDeserialize(" ", 1); },
      "failed parse");
  auto serialized =
      torch::distributed::rpc::wireSerialize({}, {torch::randn({5, 5})});
  checkMessage(
      [&]() {
        (void)torch::distributed::rpc::wireDeserialize(
            serialized.data(), serialized.size() / 2);
      },
      "failed bounds");
}

// Enable this once JIT Pickler supports sparse tensors.
TEST(WireSerialize, DISABLED_Sparse) {
  at::Tensor main = at::empty({2, 3}, at::dtype<float>().layout(at::kSparse));
  auto ser = torch::distributed::rpc::wireSerialize({}, {main.to(at::kSparse)});
  auto deser = torch::distributed::rpc::wireDeserialize(ser.data(), ser.size());
  EXPECT_TRUE(torch::equal(main, deser.second[0]));
}