From 9fd6722fc9068eeaa176754acb315fc7e0f6416c Mon Sep 17 00:00:00 2001 From: Ke Wen Date: Tue, 28 Jan 2025 10:58:09 -0800 Subject: [PATCH] [c10d] Add NCCL memory allocator (#145675) This PR implements a small UI improvement over #133603. It implements an NCCL memory allocator in the torch C++ layer and then exposes it through pybind, so that users can use it directly. UI: ``` pool = torch.cuda.MemPool(backend.mem_allocator) with torch.cuda.use_mem_pool(pool): tensor = torch.arange(1024 * 1024 * 2, device=device) ``` Pull Request resolved: https://github.com/pytorch/pytorch/pull/145675 Approved by: https://github.com/syed-ahmed, https://github.com/wconstab --- test/distributed/test_c10d_nccl.py | 45 ++----------------- torch/_C/_distributed_c10d.pyi | 2 + torch/csrc/distributed/c10d/Backend.hpp | 8 ++++ .../distributed/c10d/ProcessGroupNCCL.cpp | 32 +++++++++++++ .../distributed/c10d/ProcessGroupNCCL.hpp | 2 + torch/csrc/distributed/c10d/init.cpp | 4 +- 6 files changed, 50 insertions(+), 43 deletions(-) diff --git a/test/distributed/test_c10d_nccl.py b/test/distributed/test_c10d_nccl.py index d2a9ad43769..2cf00a26d46 100644 --- a/test/distributed/test_c10d_nccl.py +++ b/test/distributed/test_c10d_nccl.py @@ -67,7 +67,6 @@ from torch.testing._internal.common_utils import ( TEST_WITH_ROCM, TestCase, ) -from torch.utils.cpp_extension import load_inline if TEST_WITH_DEV_DBG_ASAN: @@ -3104,40 +3103,6 @@ class NcclErrorHandlingTest(MultiProcessTestCase): class NcclUserBufferRegistrationTest(MultiProcessTestCase): - def createNcclAllocator(self): - nccl_allocator_source = """ - #include - #include - #include - - extern "C" { - - // Note that windows needs __declspec(dllexport): https://stackoverflow.com/a/24575865 - C10_EXPORT void* nccl_alloc(size_t size, int device, void* stream) { - std::cout << "Using ncclMemAlloc" << std::endl; - void* ptr; - ncclResult_t err = ncclMemAlloc(&ptr, size); - return ptr; - } - - C10_EXPORT void nccl_free(void* ptr, size_t size, int device, void* stream) { -
std::cout << "Using ncclMemFree" << std::endl; - ncclResult_t err = ncclMemFree(ptr); - } - } - """ - nccl_allocator_libname = "nccl_allocator" - nccl_allocator = load_inline( - name=nccl_allocator_libname, - cpp_sources=nccl_allocator_source, - with_cuda=True, - extra_ldflags=["-lnccl"], - is_python_module=False, - keep_intermediates=False, - verbose=True, - ) - return nccl_allocator - def setUp(self): super().setUp() # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests @@ -3172,13 +3137,9 @@ class NcclUserBufferRegistrationTest(MultiProcessTestCase): torch.cuda.set_device(self.rank) pg = c10d.distributed_c10d._get_default_group() backend = pg._get_backend(torch.device(device)) - allocator_path = self.createNcclAllocator() - allocator = torch.cuda.memory.CUDAPluggableAllocator( - allocator_path, - "nccl_alloc", - "nccl_free", - ) - pool = torch.cuda.MemPool(allocator.allocator()) + + # Use NCCL memory allocator + pool = torch.cuda.MemPool(backend.mem_allocator) # allocate memory with ncclMemAlloc with torch.cuda.use_mem_pool(pool): diff --git a/torch/_C/_distributed_c10d.pyi b/torch/_C/_distributed_c10d.pyi index bf0479f7828..c398eec6e00 100644 --- a/torch/_C/_distributed_c10d.pyi +++ b/torch/_C/_distributed_c10d.pyi @@ -296,6 +296,8 @@ class Backend: def _set_sequence_number_for_group(self) -> None: ... def _set_default_timeout(self, timeout: timedelta) -> None: ... def get_error(self) -> ErrorType: ... + @property + def mem_allocator(self) -> Any: ... 
class ProcessGroup: class BackendType(Enum): diff --git a/torch/csrc/distributed/c10d/Backend.hpp b/torch/csrc/distributed/c10d/Backend.hpp index bfcbc70e09b..9d188c9c26d 100644 --- a/torch/csrc/distributed/c10d/Backend.hpp +++ b/torch/csrc/distributed/c10d/Backend.hpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -409,6 +410,13 @@ class TORCH_API Backend : public torch::CustomClassHolder { c10::str("Backend ", getBackendName(), " does not support getError")); } + virtual std::shared_ptr getMemAllocator() { + TORCH_CHECK( + false, + c10::str( + "Backend ", getBackendName(), " does not support getMemAllocator")); + } + protected: // Implementations of this interface need to call this to setup // appropriate logging etc. diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp index e3d9b92d090..64b058c7bee 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -5249,6 +5250,37 @@ c10::intrusive_ptr ProcessGroupNCCL::_allgather_base( avoidRecordStreams); } +// Create a memory allocator for NCCL. This allocator is used to allocate memory +// that supports NVLink SHARP functionality. This allocator is later exposed to +// Python via pybind, so that users can use it to create a MemPool.
For example: +// >>> pool = torch.cuda.MemPool(backend.mem_allocator) + +// Allocate function +void* _ncclMemAlloc(size_t size, int device, void* stream) { + LOG(INFO) << "NCCL mem allocator: allocating " << size << " bytes"; + at::cuda::OptionalCUDAGuard gpuGuard(device); + void* ptr = nullptr; + TORCH_CHECK(ncclMemAlloc(&ptr, size) == ncclSuccess, "ncclMemAlloc failed"); + return ptr; +} + +// Free function +void _ncclMemFree(void* ptr, size_t size, int device, void* stream) { + LOG(INFO) << "NCCL mem allocator: freeing " << size << " bytes"; + at::cuda::OptionalCUDAGuard gpuGuard(device); + TORCH_CHECK(ncclMemFree(ptr) == ncclSuccess, "ncclMemFree failed"); +} + +// Create a `CUDAPluggableAllocator` that uses the above functions. +std::shared_ptr ProcessGroupNCCL::getMemAllocator() { + C10_LOG_API_USAGE_ONCE("ProcessGroupNCCL.getMemAllocator"); + static std::shared_ptr + ncclMemAllocator = + torch::cuda::CUDAPluggableAllocator::createCustomAllocator( + _ncclMemAlloc, _ncclMemFree); + return ncclMemAllocator; +} + } // namespace c10d #endif // USE_C10D_NCCL diff --git a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp index df09ffc9f85..7a2d0ec87df 100644 --- a/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp +++ b/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp @@ -768,6 +768,8 @@ class TORCH_API ProcessGroupNCCL : public Backend { ErrorType getError() override; + std::shared_ptr getMemAllocator() override; + // Performs NCCL user buffer registration for all buffers in // the given MemPool void registerMemPool(c10::cuda::MemPool* pool); diff --git a/torch/csrc/distributed/c10d/init.cpp b/torch/csrc/distributed/c10d/init.cpp index 0359dcf55bf..59b4169383c 100644 --- a/torch/csrc/distributed/c10d/init.cpp +++ b/torch/csrc/distributed/c10d/init.cpp @@ -2765,7 +2765,9 @@ Arguments: .def( "_end_coalescing", &::c10d::Backend::endCoalescing, - py::call_guard()); + py::call_guard()) + .def_property_readonly( + 
"mem_allocator", &::c10d::Backend::getMemAllocator); // base Backend::Options binding // TODO: Maybe we can consider how to merge this with