Improve process_group_agent() serialization speed (#29785)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/29785
TLDR: This change improves process_group's serialization speed:
Serialize_Tensor64: 12.38us -> 1.99us (~-84%)
Deserialize_Tensor64: 33.89us -> 5.62us (~-84%)
Serialize_Tensor1M: 525.74us -> 285.43us (~-45%)
Deserialize_Tensor1M: 892.61us -> 273.68us (~-70%)
After speaking with the jit team, we had consensus that torch::save()/load()
are somewhat high-overhead for RPC serialization, mostly intended for
persistent disk data.
(Particularly, for large tensors, 35% of the time is spent in CRC checking, even
with the fb-side changes to substitute 40x faster SSE-accelerated crc checking;
Also, for small tensors, the zip container overhead is considerable, as is the
overhead of lexing/parsing an embedded text python program for each RPC).
The jit team encouraged us to use jit::pickler, with the WriteableTensorData
way of outputting result tensors (not the default side-tensor table, or
with pickling the actual tensors). This ends up just pickling some tensor
metadata, and giving us some tensor blobs that we can mindlessly
blit over the wire (they copy to cpu memory if needed).
There is yet no standardized container format for the pickled data
(there is jit::pickle_save() checked in, but it's experimental,
no load function is yet provided), but they encouraged us to just use
something sensible for this, and possibly revisit later. For now, I made
the directory headers slightly http-inspired.
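To make "http-inspired directory headers" concrete, here is a minimal sketch of one such framing: a text directory with one "name size" line per section, then an empty line, then the raw section bytes back to back. This is an illustration only and is not claimed to be the exact wire layout this change uses; the names Section, frameSections and parseSections are hypothetical, and section names are assumed to contain no newline. The error strings mirror the "failed parse" and "failed bounds" messages that the tests look for.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical section type: a name plus an opaque byte blob.
using Section = std::pair<std::string, std::string>;

// Writes one "name size\n" line per section, then an empty line that ends
// the directory, then the section bodies concatenated without separators.
std::string frameSections(const std::vector<Section>& sections) {
  std::string out;
  for (const auto& s : sections) {
    out += s.first + " " + std::to_string(s.second.size()) + "\n";
  }
  out += "\n";
  for (const auto& s : sections) {
    out += s.second;
  }
  return out;
}

// Parses the directory, then slices the bodies out of the remaining bytes.
// Throws std::runtime_error on a malformed directory or a truncated body.
std::vector<Section> parseSections(const char* data, std::size_t len) {
  std::vector<std::pair<std::string, std::size_t>> directory;
  std::size_t pos = 0;
  bool sawBlankLine = false;
  while (pos < len && !sawBlankLine) {
    std::size_t eol = pos;
    while (eol < len && data[eol] != '\n') {
      ++eol;
    }
    if (eol == len) {
      break; // Unterminated header line.
    }
    if (eol == pos) {
      sawBlankLine = true; // Empty line: the directory is complete.
    } else {
      const std::string line(data + pos, eol - pos);
      const std::size_t space = line.rfind(' ');
      if (space == std::string::npos || space == 0 ||
          space + 1 == line.size()) {
        throw std::runtime_error("failed parse");
      }
      std::size_t size = 0;
      for (std::size_t i = space + 1; i < line.size(); ++i) {
        const char c = line[i];
        if (c < '0' || c > '9') {
          throw std::runtime_error("failed parse");
        }
        size = size * 10 + static_cast<std::size_t>(c - '0');
        if (size > len) {
          // No section can be larger than the whole message.
          throw std::runtime_error("failed bounds");
        }
      }
      directory.emplace_back(line.substr(0, space), size);
    }
    pos = eol + 1;
  }
  if (!sawBlankLine) {
    throw std::runtime_error("failed parse");
  }
  std::vector<Section> out;
  for (const auto& entry : directory) {
    if (entry.second > len - pos) {
      throw std::runtime_error("failed bounds");
    }
    out.emplace_back(entry.first, std::string(data + pos, entry.second));
    pos += entry.second;
  }
  return out;
}
```

Because every body length is known from the directory, the bodies need no escaping, which is what lets binary tensor blobs be copied onto the wire unchanged.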
Note that serialization is just one component of the pipeline, but that
said, we also see reasonable reductions in end-to-end echo times (noisier):
ProcessGroupAgent_Echo(Tensor_Small) 855.25us -> 492.65us (~-42%)
ProcessGroupAgent_Echo(Tensor_1M) 10.82ms -> 6.94ms (~-35%)
ProcessGroupAgent_Echo(Small_NoTensor) 688.82us -> 301.72us (~-56%)
ProcessGroupAgent_Echo(1MB_NoTensor) 4.65ms -> 3.71ms (~-20%)
I moved the "wire serialization" logic to a separate file to assist with
unittesting.
ghstack-source-id: 94694682
Test Plan:
buck test mode/dev-nosan caffe2/test/cpp/api:serialize
buck test mode/dev-nosan caffe2/test/...
Differential Revision: D18493938
fbshipit-source-id: 07ddfe87dbe56472bc944f7d070627052c94a8f4
2019-11-28 17:55:33 +00:00

#include <gtest/gtest.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/rpc/utils.h>
#include <torch/torch.h>

#include <memory>
#include <string>
#include <vector>

using ::testing::IsSubstring;

TEST(WireSerialize, Base) {
  auto run = [](const std::string& payload,
                const std::vector<at::Tensor>& tensors) {
    std::string serialized;
    {
      std::vector<char> mpayload(payload.begin(), payload.end());
      std::vector<at::Tensor> mtensors = tensors;
      serialized = torch::distributed::rpc::wireSerialize(
          std::move(mpayload), std::move(mtensors));
    }
    auto deser = torch::distributed::rpc::wireDeserialize(
        serialized.data(), serialized.size());
    EXPECT_EQ(payload.size(), deser.first.size());
    EXPECT_EQ(tensors.size(), deser.second.size());
    if (payload.size() > 0) {
      EXPECT_TRUE(
          memcmp(deser.first.data(), payload.data(), payload.size()) == 0);
    }
    for (const auto i : c10::irange(tensors.size())) {
      EXPECT_TRUE(torch::equal(tensors[i], deser.second[i]));
    }
  };
  run("", {});
  run("hi", {});
  run("", {torch::randn({5, 5})});
  run("hi", {torch::randn({5, 5})});
  run("more", {torch::randn({5, 5}), torch::rand({10, 10})});
}

TEST(WireSerialize, RecopySparseTensors) {
  // Take a 1K-element row of a 1M-element tensor, and make sure we don't
  // send all 1M elements across.
  constexpr size_t k1K = 1024;
  at::Tensor main = torch::randn({k1K, k1K});
  at::Tensor tiny = main.select(0, 2); // Select a row in the middle
  EXPECT_EQ(tiny.numel(), k1K);
  EXPECT_EQ(tiny.storage().nbytes() / tiny.dtype().itemsize(), k1K * k1K);
  auto ser = torch::distributed::rpc::wireSerialize({}, {tiny});
  auto deser = torch::distributed::rpc::wireDeserialize(ser.data(), ser.size());
  EXPECT_TRUE(torch::equal(tiny, deser.second[0]));
  EXPECT_LT(ser.size(), (tiny.element_size() * k1K) + k1K);
}

TEST(WireSerialize, CloneSparseTensors) {
  constexpr size_t k1K = 1024;
  at::Tensor big = torch::randn({k1K, k1K});
  auto v1 = torch::distributed::rpc::cloneSparseTensors({big});
  EXPECT_EQ(v1.get(0).storage(), big.storage()); // Not cloned

  at::Tensor tiny = big.select(0, 2); // Select a row in the middle
  auto v2 = torch::distributed::rpc::cloneSparseTensors({tiny});
  EXPECT_NE(&v2.get(0).storage(), &tiny.storage()); // Cloned.
  EXPECT_TRUE(torch::equal(v2.get(0), tiny));

  at::Tensor sparse = at::empty({2, 3}, at::dtype<float>().layout(at::kSparse));
  auto v3 = torch::distributed::rpc::cloneSparseTensors({sparse});
  // There is no storage() to compare, but at least confirm equality.
  EXPECT_TRUE(v3.get(0).is_same(sparse));
}

TEST(WireSerialize, Errors) {
  auto checkMessage = [](auto&& f, const char* msg) {
    try {
      f();
      FAIL();
    } catch (const std::exception& e) {
      EXPECT_PRED_FORMAT2(IsSubstring, msg, e.what());
    } catch (...) {
      FAIL();
    }
  };
  checkMessage(
      []() { (void)torch::distributed::rpc::wireDeserialize("", 0); },
      "failed parse");
  checkMessage(
      []() { (void)torch::distributed::rpc::wireDeserialize(" ", 1); },
      "failed parse");
  auto serialized =
      torch::distributed::rpc::wireSerialize({}, {torch::randn({5, 5})});
  checkMessage(
      [&]() {
        (void)torch::distributed::rpc::wireDeserialize(
            serialized.data(), serialized.size() / 2);
      },
      "failed bounds");
}

// Enable this once JIT Pickler supports sparse tensors.
TEST(WireSerialize, DISABLED_Sparse) {
  at::Tensor main = at::empty({2, 3}, at::dtype<float>().layout(at::kSparse));
  auto ser = torch::distributed::rpc::wireSerialize({}, {main.to(at::kSparse)});
  auto deser = torch::distributed::rpc::wireDeserialize(ser.data(), ser.size());
  EXPECT_TRUE(torch::equal(main, deser.second[0]));
}