Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57635
Note: this PR looks massive, but it's just one simple change, codemodded many times.
In many cases, a callback needs to access the value/error produced by the parent future. In Python this was easy because the callback was invoked with the parent future as argument, and could thus inspect it. In C++ the callbacks didn't take any arguments, thus in many cases we worked around this by capturing the future in its own callback. This is risky (leads to reference cycle and thus memory leak) and must be done carefully (spoiler: sometimes we weren't).
ghstack-source-id: 128296580
Test Plan: CI
Reviewed By: wanchaol
Differential Revision: D28178783
fbshipit-source-id: 6de02c4568be42123372edc008f630d5ddae0081
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/57294
With the advent of CPUs in the device maps, and to be more generic (e.g., to support AMD GPUs), and to avoid conversions when passing to Future and RRef and such, it's easier to use Devices instead of DeviceIndices. This started by just migrating the TensorPipe agent but the RPC layer is quite intertwined so I had to migrate a lot of stuff.
ghstack-source-id: 127916562
Test Plan: CI
Reviewed By: mrshenli
Differential Revision: D28092733
fbshipit-source-id: 024dcb3648c5898ab13e770413c43958f04f1a8a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56807
If I understand correctly, there's no reason to create your own instance of these global singleton types.
ghstack-source-id: 127312270
Test Plan: CI
Reviewed By: SplitInfinity
Differential Revision: D27973447
fbshipit-source-id: f12df69d185f1baaa45f2ac6eac70570a7a65912
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56346
Now that TensorPipe's API has `targetDevice`, use that instead of
manually writing the CUDA device index in `metadata`.
Test Plan: CI
Reviewed By: lw
Differential Revision: D27703235
fbshipit-source-id: c5b620e3b3ce619367412efdbe9fa3778f6b8869
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/55136
This will ease the transition to the new API where `Buffer` does not
store a length anymore.
Test Plan: CI
Reviewed By: lw
Differential Revision: D27466385
fbshipit-source-id: 9a167f8c501455a3ab49ce75257c69d8b4869925
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54251
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/324
In order to merge the channel hierarchies, we need a generic `Buffer` type, that can wrap either a `CpuBuffer` or a `CudaBuffer`.
The constraints are that, since this type is used by the channels, it cannot explicitly refer to `CudaBuffer`. We propose here a type-erasure based solution, with small-buffer optimization to avoid heap-allocating the wrapped concrete buffer.
This is a new version of D27001339 (c618dc13d2) which broke PyTorch OSS build.
Test Plan: CI
Reviewed By: lw, mrshenli
Differential Revision: D27156053
fbshipit-source-id: 4244302af33a3be91dcd06093c0d6045d081d3cc
Summary:
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/322
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54145
In order to merge the channel hierarchies, we need a generic `Buffer` type, that can wrap either a `CpuBuffer` or a `CudaBuffer`.
The constraints are that, since this type is used by the channels, it cannot explicitly refer to `CudaBuffer`. We propose here a type-erasure based solution, with small-buffer optimization to avoid heap-allocating the wrapped concrete buffer.
ghstack-source-id: 124131499
Test Plan: CI
Reviewed By: lw
Differential Revision: D27001339
fbshipit-source-id: 26d7dc19d69d7e3336df6fd4ff6ec118dc17c5b6
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/53662
Add a base processgroup::options so that we can do inheritance and
provide
a universal option API in python
Test Plan: Imported from OSS
Reviewed By: rohan-varma
Differential Revision: D26968856
Pulled By: wanchaol
fbshipit-source-id: 858f4b61b27aecb1943959bba68f8c14114f67d8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/51697
Refactors the rest of rref_context, specifically pendingOwners map and `getOwnerRRef` to use JitFuture.
ghstack-source-id: 122037611
Test Plan: CI
Reviewed By: wanchaol
Differential Revision: D26243268
fbshipit-source-id: ab8874c8253274e8fe50dcd7291e0655a8f3f1df
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/51939
TestTrainingLoop - TestE2ETensorPipe was flaky since there would still
be inflight background RPCs running as we performed the assertions. This
resulted in these assertions failing since we didn't wait for all RPCs on the
agent to finish.
To resolve this issue, in this PR we join() and shutdown() the RPC agent to
ensure no further RPCs are done. Then we assertion the map sizes to ensure no
leaks occurred.
In addition to this, added messageIdToTimeout map to lookup the appropriate
timeout for a messageId. This ensures we remove the appropriate entry from the
map. The previous solution was passing the expirationTime through the lambda,
but it is not guaranteed the lambda would read the response of the request we
just sent out.
ghstack-source-id: 121412604
Test Plan:
1) unit tests
2) waitforbuildbot
Reviewed By: rohan-varma
Differential Revision: D26331585
fbshipit-source-id: a41e0534d7d4dfd240446e661e5541311931c7d7
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/44859
TensorPipe's `set_device_map` option was applied during the forward
pass. However, if we ran the backward pass for the graph we would not
automatically pick up the reverse device mapping.
As a result, users had to specify both forward and backward device mapping
which is very tedious to do.
In this PR, I've added this functionality such that TensorPipe automatically
picks up the reverse device mapping during the backward pass. This is done by
storing the appropriate device mapping in the "recv" autograd function for
distributed autograd.
#Closes: https://github.com/pytorch/pytorch/issues/44170
ghstack-source-id: 119950842
Test Plan:
1) waitforbuildbot
2) Unit test added.
Reviewed By: mrshenli
Differential Revision: D23751975
fbshipit-source-id: 2717d0ef5bde3db029a6172d98aad95734d52140
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50564
When an RPC was sent, the associated future was stored in two maps:
pendingResponseMessage_ and timeoutMap_. Once the response was received, the
entry was only removed from pendingResponseMessage_ and not timeoutMap_. The
pollTimedoudRpcs method then eventually removed the entry from timeoutMap_
after the time out duration had passed.
Although, in scenarios where there is a large timeout and a large number of
RPCs being used, it is very easy for the timeoutMap_ to grow without any
bounds. This was discovered in https://github.com/pytorch/pytorch/issues/50522.
To fix this issue, I've added some code to cleanup timeoutMap_ as well once we
receive a response.
ghstack-source-id: 119925182
Test Plan:
1) Unit test added.
2) Tested with repro in https://github.com/pytorch/pytorch/issues/50522
#Closes: https://github.com/pytorch/pytorch/issues/50522
Reviewed By: mrshenli
Differential Revision: D25919650
fbshipit-source-id: a0a42647e706d598fce2ca2c92963e540b9d9dbb
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45867
In most cases the lock ordering was hold a lock in local autograd and
then hold a lock in DistAutogradContext.
In case of `set_exception_without_signal` the lock order was in reverse and as
a result we saw potential deadlock issues in our TSAN tests. To fix this, I
removed the lock and instead just used std::atomic exchange.
In addition to this, I fixed TestE2E to ensure that we use the appropriate
timeout.
TestE2EProcessGroup was flaky for these two reasons and now is fixed.
ghstack-source-id: 113592709
Test Plan: waitforbuildbot.
Reviewed By: albanD
Differential Revision: D24120962
fbshipit-source-id: 12447b84ceae772b91e9a183c90d1e6340f44e66
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/45014
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/219
Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/212
+ Introduce buffer.h defining the buffer struct(s). The `CpuBuffer`
struct is always defined, while the `CudaBuffer` struct is defined
only when `TENSORPIPE_SUPPORTS_CUDA` is true.
+ Update all channels to take a `CpuBuffer` or `CudaBuffer` for
`send`/`recv` rather than a raw pointer and a length.
+ Make the base `Channel`/`Context` classes templated on `TBuffer`,
effectively creating two channel hierarchies (one for CPU channels,
one for CUDA channels).
+ Update the Pipe and the generic channel tests to use the new API. So
far, generic channel tests are CPU only, and tests for the CUDA IPC
channel are (temporarily) disabled. A subsequent PR will take care of
refactoring tests so that generic tests work for CUDA channels. An
other PR will add support for CUDA tensors in the Pipe.
Differential Revision: D23598033
Test Plan: Imported from OSS
Reviewed By: lw
Pulled By: beauby
fbshipit-source-id: 1d6c3f91e288420858835cd5e7962e8da051b44b
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42637
This commit enables sending non-CPU tensors through RPC using
TensorPipe backend. Users can configure device mappings by calling
set_map_location on `TensorPipeRpcBackendOptions`. Internally,
the `init_rpc` API verifies the correctness of device mappings. It
will shutdown RPC if the check failed, or proceed and pass global
mappings to `TensorPipeAgent` if the check was successful. For serde,
we added a device indices field to TensorPipe read and write buffers,
which should be either empty (all tensors must be on CPU) or match
the tensors in order and number in the RPC message. This commit
does not yet avoid zero-copy, the tensor is always moved to CPU
on the sender and then moved to the specified device on the receiver.
Test Plan: Imported from OSS
Reviewed By: izdeby
Differential Revision: D23011572
Pulled By: mrshenli
fbshipit-source-id: 62b617eed91237d4e9926bc8551db78b822a1187
Summary:
test_e2e_tensorpipe depends on ProcessGroupGloo, therefore it could not be tested with Gloo disabled
Otherwise, it re-introduces https://github.com/pytorch/pytorch/issues/42776
Pull Request resolved: https://github.com/pytorch/pytorch/pull/43041
Reviewed By: lw
Differential Revision: D23122101
Pulled By: malfet
fbshipit-source-id: a8a088b6522a3bc888238ede5c2d589b83c6ea94
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42522
Main changes:
- Consolidated CMake files to have a single entry point, rather than having a specialized one for PyTorch.
- Changed the way the preprocessor flags are provided, and changed their name.
There were a few instances in PyTorch's CMake files where we were directly adding TensorPipe's source directory as an include path, which however doesn't contain the auto-generated header we now added. We fix that by adding the `tensorpipe` CMake target as a dependency, so that the include paths defined by TensorPipe are used, which contain that auto-generated header. So instead we link those targets to the tensorpipe target in order for them to pick up the correct include directories.
I'm turning off SHM and CMA for now because they have never been covered by the CI. I'll enable them in a separate PR so that if they turn out to be flaky we can revert that change without reverting this one.
Test Plan: CI
Reviewed By: malfet
Differential Revision: D22959472
fbshipit-source-id: 1959a41c4a66ef78bf0f3bd5e3964969a2a1bf67
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/42225
Main changes:
- Consolidated CMake files to have a single entry point, rather than having a specialized one for PyTorch.
- Changed the way the preprocessor flags are provided, and changed their name.
There were a few instances in PyTorch's CMake files where we were directly adding TensorPipe's source directory as an include path, which however doesn't contain the auto-generated header we now added. We fix that by adding the `tensorpipe` CMake target as a dependency, so that the include paths defined by TensorPipe are used, which contain that auto-generated header.
I'm turning off SHM and CMA for now because they have never been covered by the CI. I'll enable them in a separate PR so that if they turn out to be flaky we can revert that change without reverting this one.
Test Plan: CircleCI is all green.
Reviewed By: beauby
Differential Revision: D22812445
fbshipit-source-id: e6d824bb28f5afe75fd765de0430968174f3531f
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36893
Adding an end to end test for running a simple training loop in C++
for the distributed RPC framework.
The goal of this change is to enable LeakSanitizer and potentially catch memory
leaks in the Future. Enabling LSAN with python multiprocessing is tricky and we
haven't found a solution for this. As a result, adding a C++ test that triggers
most of the critical codepaths would be good for now.
As an example, this unit test would've caught the memory leak fixed by:
https://github.com/pytorch/pytorch/pull/31030
ghstack-source-id: 107781167
Test Plan:
1) Verify the test catches memory leaks.
2) waitforbuildbot
Reviewed By: mrshenli
Differential Revision: D21112208
fbshipit-source-id: 4eb2a6b409253108f6b6e14352e593d250c7a64d
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/39010
The initial version of the serialization for the TensorPipe RPC agent (i.e., the conversion from rpc::Message to tensorpipe::Message) worker around a limitation of TensorPipe of only allowing one payload per message by pickling each tensor separately and storing the pickles as metadata (which is a less efficient way of sending data over, as it goes through more copies). Having now lifter that limitation we can now improve the way we serialize. We now put the type and the id as their own payloads, we do a single pickling pass for all the tensors of the message (which allows us to deduplicate them) and store the pickle as a payload. My impression is that pickling is a somewhat costly operation, so reducing the number of times we do it should be beneficial for performance. For this same reason, another change I've done here is separate the allocation of the buffers from the deserialization. This will allow us (in the future) to perform the allocation on the I/O event loop but perform the unpickling in the worker thread, thus keeping the event loop more responsive.
ghstack-source-id: 104810740
Test Plan: RPC tests
Differential Revision: D21716067
fbshipit-source-id: c1475cc78afdcf0820a485ffd98c91abb35796c7
Summary:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37919
ghstack-source-id: 103572164
Test Plan:
On both workers
```
import os
import torch
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```
On worker 0
```
rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 1
```
rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 0
```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2)))
Out[15]:
tensor([[3., 3.],
[3., 3.]])
In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```
Differential Revision: D21425536
fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37776
* Remove type-specific size tracking in favor of byte size tracking in Storage and StorageImpl
* Changed numel() and set_numel() to nbytes() and set_nbytes()
* Added enum argument to Storage/StorageImpl constructor to indicate new meaning of the size parameter
* Update all callers of the changed API
Part of issue https://github.com/pytorch/pytorch/issues/33950
Pull Request resolved: https://github.com/pytorch/pytorch/pull/37028
Differential Revision: D21171334
Pulled By: ezyang
fbshipit-source-id: 37329a379de9a3a83cc5e9007e455a3e1c2d10b8
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36197
Create APIs to convert between rpc::message and tensorpipe::message
1. tensorpipeSerialize() - converts rpc::message to tensorpipe::message without memory copy (tensors).
2. tensorpipeAllocateMessage - allocates rpc::message based on received tensorpipe descriptor to prepare memory-copy-free receiving.
Test Plan: buck test caffe2/test/cpp/rpc:test_tensorpipe_serialization
Reviewed By: lw
Differential Revision: D20084125
fbshipit-source-id: ffbc310f93443e50261aed752be0fe176610dd2a
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/36976
The bounds check and the read were swapped in two places - I noticed
ASAN complaining in an unrelated change on an erroneous buffer.
Adding a couple simple test cases.
ghstack-source-id: 102606986
Test Plan: buck test mode/dev caffe2/test/cpp/rpc:
Differential Revision: D21148936
fbshipit-source-id: 7ec5007535f7310437ac1b9a72852a223b9dd29a
Summary:
Ignore mixed upper-case/lower-case style for now
Fix space between function and its arguments violation
Pull Request resolved: https://github.com/pytorch/pytorch/pull/35574
Test Plan: CI
Differential Revision: D20712969
Pulled By: malfet
fbshipit-source-id: 0012d430aed916b4518599a0b535e82d15721f78
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/34626
We need to check has_storage() before looking at it in
cloneSparseTensors(), to avoid gratuitously throwing.
Ideally, we'd add a test for this (I wrote one up but had to disable it),
but won't work until JIT Pickler supports sparse tensors.
ghstack-source-id: 100018077
Test Plan: buck test mode/dev-nosan caffe2/torch/fb/distributed/thriftRpcAgent/...
Differential Revision: D20399971
fbshipit-source-id: 5debfa8140eb1f949d37336330223962cc320abc
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/31357
If a user selects a subset of a Tensor and sends it in an RPC, we were sending
the whole original Tensor Storage over the network.
While this sounds reasonable, in practice, we observed view-like Tensors being sent
over rpc, where only 1% of the data in the provided Tensor's Storage was
actually used/needed.
The simple solution here is to just force a clone in the serializer code if we see that
less than (arbitrary) half the bits are used, and the tensor is more than a nominal few KB.
Add related tests to ensure this doesn't break.
An alternate approach would be to modify the Pickler. That said, since Pickler is shared by more
components, the logic might be harder to tailor appropriately at that layer (particularly
given that the Pickler has explicit logic to share a single Storage* among several Tensors
that commonly point to the same Storage*).
It's possible that we might want to further refine the basic thresholds in this change.
In practice, we've seen a mostly bimodal distribution thus far for the percent of Tensor
Storage referred by a Tensor in observed rpcs (i.e. either 90%+ or sub-10% of the Storage
referenced), hence the existing 50% threshold here is probably not an unreasonable
starting point.
ghstack-source-id: 95925474
Test Plan: buck test mode/dev caffe2/test/cpp/rpc/...
Differential Revision: D19137056
fbshipit-source-id: e2b3a4dd0cc6e1de820fd0740aa1d59883dbf8d4