Mirror of https://github.com/saymrwulf/pytorch.git (synced 2026-05-14 20:57:59 +00:00)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/41769

Currently the tests in `test_distributed` only work with `fork` mode multiprocessing. This PR introduces support for `spawn` mode multiprocessing as well (while keeping `fork` mode intact).

Motivations for the change:
1) Spawn multiprocessing is the default on macOS, so it better emulates how macOS users would use distributed.
2) With Python 3.8+, spawn is the default on Linux, so we should have test coverage for this.
3) PyTorch multiprocessing suggests using spawn/forkserver over fork for sharing CUDA tensors: https://pytorch.org/docs/stable/multiprocessing.html
4) Spawn is better supported with respect to certain sanitizers such as TSAN, so adding this sanitizer coverage may help us uncover issues.

How it is done:
1) Move the `test_distributed` tests in the `_DistTestBase` class to a shared file, `distributed_test` (similar to how the RPC tests are structured).
2) For `Barrier`, refactor the setup of temp directories; the previous version did not work with spawn, since each process would get a different randomly generated directory and would therefore write to a different barrier.
3) Add all the relevant builds to run internally and in OSS.

Running test_distributed with spawn mode in OSS can be done with:
`python test/run_test.py -i distributed/test_distributed_spawn -v`

Reviewed By: izdeby

Differential Revision: D22408023

fbshipit-source-id: e206be16961fd80438f995e221f18139d7e6d2a9
3151 lines
124 KiB
Python
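The summary above describes a new spawn-mode entry point that reuses the shared test suite defined in this file. As a rough sketch only (the module path, the entry-point file, and the class name below are assumptions for illustration, not the actual PyTorch sources), such a wrapper could look like this:

# --- Illustrative sketch of a spawn-mode entry point (not part of this file) ---
import sys
import unittest

import torch.distributed as dist

if not dist.is_available():
    print("torch.distributed is not available, skipping tests", file=sys.stderr)
    sys.exit(0)

# BACKEND and WORLD_SIZE must already be set in the environment, because the
# shared module reads them at import time (see BACKEND / INIT_METHOD below).
# The import path is an assumption; adjust it to wherever this file lives.
from torch.testing._internal.distributed.distributed_test import (
    DistributedTest,
    TestDistBackend,
)


class TestDistBackendWithSpawn(TestDistBackend, DistributedTest._DistTestBase):
    """Runs the shared _DistTestBase tests in spawn-mode subprocesses."""

    def setUp(self):
        super().setUp()
        # MultiProcessTestCase._spawn_processes() launches one subprocess per
        # rank using the spawn start method; each child then re-enters
        # TestDistBackend._run() for the selected test.
        self._spawn_processes()


if __name__ == "__main__":
    unittest.main()
# --- End of illustrative sketch ---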
from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import fcntl
import itertools
import random
import math
import os
import sys
import time
import tempfile
import unittest
from contextlib import contextmanager
from datetime import timedelta
from functools import reduce
from io import StringIO
from typing import Union, NamedTuple

import torch
import torch.cuda
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel.distributed import _dump_DDP_relevant_env_vars
import torch.nn as nn
import torch.nn.functional as F
from torch.testing._internal.common_distributed import (
    MultiProcessTestCase,
    TEST_SKIPS,
    initialize_temp_directories,
    cleanup_temp_dir,
    simple_sparse_reduce_tests,
    skip_if_rocm,
    skip_if_small_worldsize,
    skip_if_lt_x_gpu,
    skip_if_no_gpu,
    require_n_gpus_for_nccl_backend,
)
from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR
from torch._utils_internal import TEST_MASTER_PORT as MASTER_PORT

try:
    import torchvision
    HAS_TORCHVISION = True
except ImportError:
    HAS_TORCHVISION = False


class Foo:
    def __init__(self, x):
        self.x = x

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

f = Foo(10)
f.bar = 1

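# Objects of assorted picklable types (a nested dict, a custom class instance
# carrying an attribute added after construction, a string, and a nested list)
# used by the collective tests that operate on arbitrary Python objects.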
collectives_object_test_list = [
    {"key1": 3, "key2": 4, "key3": {"nested": True}},
    f,
    "foo",
    [1, 2, True, "string", [4, 5, "nested"]],
]


skipIfNoTorchVision = unittest.skipIf(not HAS_TORCHVISION, "no torchvision")

BACKEND = os.environ["BACKEND"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")

DEFAULT_TIMEOUT = 300
CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}


class _FC2(nn.Module):
    def __init__(self):
        super(_FC2, self).__init__()
        self.fc = nn.Linear(10, 50, bias=True)
        self.fc.bias.requires_grad = False

    def forward(self, x):
        x = self.fc(x)
        return x


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 10, bias=False)
        self.fc2 = _FC2()
        self.fc3 = nn.Linear(50, 4, bias=False)
        self.relu = nn.ReLU()
        self.no_grad_param = nn.Parameter(torch.tensor([2, 2]).long(),
                                          requires_grad=False)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

class Task(nn.Module):
    def __init__(self):
        super().__init__()
        self.p = nn.Parameter(torch.ones(2, 2))

    def forward(self, x):
        return self.p + x


class BatchNormNet(nn.Module):

    def __init__(self):
        super(BatchNormNet, self).__init__()
        self.fc1 = nn.Linear(2, 40, bias=False)
        self.bn = nn.BatchNorm1d(4)
        self.fc2 = nn.Linear(40, 4, bias=False)

    def forward(self, x):
        x = torch.reshape(self.fc1(x), (-1, 4, 10))
        x = self.bn(x)
        x = torch.reshape(x, (-1, 40))
        x = self.fc2(x)
        return F.softmax(x, dim=1)


DDP_NET = Net()
BN_NET = BatchNormNet()
ONLY_SBN_NET = nn.SyncBatchNorm(2, momentum=0.99)


@contextmanager
def _captured_output():
    new_out, new_err = StringIO(), StringIO()
    old_out, old_err = sys.stdout, sys.stderr
    try:
        sys.stdout, sys.stderr = new_out, new_err
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = old_out, old_err


def get_timeout(test_id):
    test_name = test_id.split(".")[-1]
    if test_name in CUSTOMIZED_TIMEOUT:
        return CUSTOMIZED_TIMEOUT[test_name]
    else:
        return DEFAULT_TIMEOUT


def require_backend(backends):
    if BACKEND not in backends:
        return unittest.skip("Test requires backend to be one of %s" % backends)
    return lambda func: func

def require_backends_available(backends):
    def check(backend):
        if backend == dist.Backend.GLOO:
            return dist.is_gloo_available()
        if backend == dist.Backend.NCCL:
            return dist.is_nccl_available()
        if backend == dist.Backend.MPI:
            return dist.is_mpi_available()
        return False

    # Materialize the list so the skip message below shows the backend names
    # instead of an exhausted map object.
    backends = [dist.Backend(b) for b in backends]
    if not all(map(check, backends)):
        return unittest.skip(
            "Test requires backends to be available %s" % backends)
    return lambda func: func

def require_world_size(world_size):
    if int(os.environ["WORLD_SIZE"]) < world_size:
        return unittest.skip("Test requires world size of %d" % world_size)
    return lambda func: func


def apply_hack_for_nccl():
    # This is a hack for a known NCCL issue: using multiprocessing in
    # conjunction with multiple threads to manage different GPUs may cause
    # ncclCommInitRank to fail.
    # http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
    # It slows down the performance of collective operations.
    # Without this setting NCCL might throw an unhandled error.
    os.environ["NCCL_MAX_NRINGS"] = "1"


@contextmanager
def _lock():
    TEMP_DIR = os.environ["TEMP_DIR"]
    lockfile = os.path.join(TEMP_DIR, "lockfile")
    with open(lockfile, "w") as lf:
        try:
            fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
            lf.close()


def _build_tensor(size, value=None, dtype=torch.float):
    if value is None:
        value = size
    return torch.empty(size, size, size, dtype=dtype).fill_(value)

def _build_multidim_tensor(dim, dim_size, value=None):
    if value is None:
        # Mirror _build_tensor: default the fill value to the per-dimension size.
        value = dim_size
    return torch.FloatTensor(size=[dim_size for _ in range(dim)]).fill_(value)

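# A simple file-based barrier: each process writes its barrier count to a file
# under $TEMP_DIR/barrier and then polls (under an fcntl lock) until every
# participating process has caught up.  It works independently of
# dist.barrier(), so it can be used even when the default process group has
# been destroyed, by passing wait_for explicitly.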
class Barrier(object):
    barrier_id = 0

    @classmethod
    def init(cls):
        cls.barrier_id = 0
        barrier_dir = os.path.join(os.environ["TEMP_DIR"], "barrier")
        for f_name in os.listdir(barrier_dir):
            os.unlink(os.path.join(barrier_dir, f_name))

    @classmethod
    def sync(cls, wait_for=None, timeout=10):
        if wait_for is None:
            wait_for = dist.get_world_size()
        cls.barrier_id += 1
        barrier_dir = os.path.join(os.environ["TEMP_DIR"], "barrier")
        pid = str(os.getpid())
        barrier_file = os.path.join(barrier_dir, pid)
        with _lock():
            with open(barrier_file, "w") as f:
                f.write(str(cls.barrier_id))

        start_time = time.time()
        while True:
            arrived = 0
            with _lock():
                for f_name in os.listdir(barrier_dir):
                    with open(os.path.join(barrier_dir, f_name), "r") as f:
                        data = f.read()
                        if int(data) >= cls.barrier_id:
                            arrived += 1
            if arrived == wait_for:
                break

            if time.time() - start_time > timeout:
                raise RuntimeError("barrier timeout")
            time.sleep(0.1)

class TestDistBackend(MultiProcessTestCase):
    @classmethod
    def setUpClass(cls):
        os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
        os.environ["MASTER_PORT"] = str(MASTER_PORT)
        # os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
        super().setUpClass()

    def setUp(self):
        super().setUp()
        # initialize temp directories
        initialize_temp_directories()
        # initialize Barrier
        Barrier.init()

    def tearDown(self):
        cleanup_temp_dir()
        super().tearDown()

    @property
    def init_method(self):
        return "file://{file_name}".format(file_name=self.file_name)

    @classmethod
    def _run(cls, rank, test_name, file_name):
        self = cls(test_name)
        self.rank = rank
        self.file_name = file_name
        try:
            dist.init_process_group(
                init_method=self.init_method,
                backend=BACKEND,
                world_size=int(self.world_size),
                rank=self.rank,
            )
        except RuntimeError as e:
            if "recompile" in e.args[0]:
                sys.exit(TEST_SKIPS["backend_unavailable"].exit_code)

            raise

        # Execute barrier prior to running test to ensure that every process
        # has finished initialization and that the following test
        # immediately exiting due to a skip doesn't cause flakiness.
        self._barrier()

        # self.id() == e.g. '__main__.TestDistributed.test_get_rank'
        # We're retrieving the corresponding test and executing it.
        getattr(self, test_name)()
        self._barrier()
        dist.destroy_process_group()
        sys.exit(0)

    # Needed since MultiProcessTestCase assumes a world_size of 4, but we
    # run these tests under various other world sizes.
    @property
    def world_size(self):
        return os.environ["WORLD_SIZE"]

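# The actual tests live in the inner _DistTestBase class so that unittest does
# not collect them directly from this shared module.  Entry-point files (fork
# or spawn mode) mix _DistTestBase into a concrete TestCase, e.g. by combining
# it with TestDistBackend above, and unittest then discovers the tests on that
# concrete class.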
class DistributedTest:
    class _DistTestBase:
        def _barrier(self, *args, **kwargs):
            Barrier.sync(*args, **kwargs)

        def _init_group_test(self, **kwargs):
            group = [1, 2]
            group_id = dist.new_group(group, **kwargs)
            rank = dist.get_rank()
            if rank not in group:
                return ([], None, rank)

            return (group, group_id, rank)

        def _init_full_group_test(self, **kwargs):
            group = list(range(0, dist.get_world_size()))
            group_id = dist.new_group(**kwargs)
            rank = dist.get_rank()
            return (group, group_id, rank)

        def _init_global_test(self):
            group = list(range(0, dist.get_world_size()))
            group_id = dist.group.WORLD
            rank = dist.get_rank()
            return (group, group_id, rank)

        # HELPER FOR MULTIGPU TESTS
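        # Example: with 8 visible GPUs and a world size of 2, rank 0 is mapped
        # to GPUs [0, 1, 2, 3] and rank 1 to GPUs [4, 5, 6, 7].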
        def _init_multigpu_helper(self):
            """Multi-GPU tests simulate multiple nodes, each with multiple GPUs.

            The NCCL backend requires every process to use the same number of
            GPUs. On a single node, all visible GPUs are divided evenly into
            subsets and each process uses only its own subset.
            """
            nGPUs = torch.cuda.device_count()
            world_size = dist.get_world_size()
            visible_devices = range(nGPUs)

            if BACKEND == "nccl":
                apply_hack_for_nccl()

            nGPUs_per_process = nGPUs // world_size
            rank_to_GPU = {
                i: list(
                    visible_devices[i * nGPUs_per_process: (i + 1) * nGPUs_per_process]
                )
                for i in range(world_size)
            }
            return rank_to_GPU

def test_dump_DDP_relevant_env_vars(self):
|
|
with _captured_output() as (out, err):
|
|
_dump_DDP_relevant_env_vars()
|
|
lines = out.getvalue().splitlines()
|
|
|
|
def format_line(var):
|
|
return "env:%s=%s" % (var, os.environ[var] if var in os.environ else "N/A")
|
|
|
|
# Check relevant env vars
|
|
vars = [
|
|
"MASTER_ADDR",
|
|
"MASTER_PORT",
|
|
"WORLD_SIZE",
|
|
"NCCL_TOPO_DUMP_FILE", # N/A
|
|
]
|
|
for var in vars:
|
|
line = format_line(var)
|
|
self.assertIn(line, lines)
|
|
# Check irrelevant env vars
|
|
vars = [
|
|
"xxx",
|
|
"yyy",
|
|
"zzz",
|
|
]
|
|
for var in vars:
|
|
line = format_line(var)
|
|
self.assertNotIn(line, lines)
|
|
|
|
# GET RANK
|
|
def test_get_rank(self):
|
|
test_dir = os.path.join(os.environ["TEMP_DIR"], "test_dir")
|
|
pid = str(os.getpid())
|
|
num_processes = dist.get_world_size()
|
|
with open(os.path.join(test_dir, pid), "w") as f:
|
|
f.write(str(dist.get_rank()))
|
|
|
|
self._barrier()
|
|
|
|
all_ranks = set()
|
|
for f_name in os.listdir(test_dir):
|
|
with open(os.path.join(test_dir, f_name), "r") as f:
|
|
all_ranks.add(int(f.read()))
|
|
self.assertEqual(len(all_ranks), num_processes)
|
|
|
|
self._barrier()
|
|
|
|
if dist.get_rank() == 0:
|
|
for f_name in os.listdir(test_dir):
|
|
os.unlink(os.path.join(test_dir, f_name))
|
|
|
|
self._barrier()
|
|
|
|
def test_get_backend(self):
|
|
if dist.get_world_size() > 2:
|
|
group = [1, 2]
|
|
else:
|
|
group = [0, 1]
|
|
group_id = dist.new_group(group)
|
|
backend_str = BACKEND.lower()
|
|
self.assertEqual(dist.get_backend(), backend_str)
|
|
if dist.get_rank() in group:
|
|
self.assertEqual(dist.get_backend(group_id), backend_str)
|
|
else:
|
|
with self.assertRaisesRegex(RuntimeError, "Invalid process group specified"):
|
|
dist.get_backend(group_id)
|
|
|
|
def test_Backend_enum_class(self):
|
|
# test parsing
|
|
backend = BACKEND.lower()
|
|
self.assertEqual(dist.Backend(BACKEND.upper()), backend)
|
|
self.assertEqual(dist.Backend(BACKEND), backend)
|
|
with self.assertRaisesRegex(ValueError, "Invalid backend: 'undefined'"):
|
|
dist.Backend("undefined")
|
|
with self.assertRaisesRegex(ValueError, "Invalid backend: 'xYz'"):
|
|
dist.Backend("xYz")
|
|
with self.assertRaises(ValueError):
|
|
dist.Backend(None)
|
|
with self.assertRaises(ValueError):
|
|
dist.Backend(3)
|
|
with self.assertRaises(ValueError):
|
|
dist.Backend(["gloo"])
|
|
|
|
# Test destroy
|
|
def test_destroy_group(self):
|
|
if dist.get_world_size() > 2:
|
|
group = [1, 2]
|
|
else:
|
|
group = [0, 1]
|
|
group_id = dist.new_group(group)
|
|
self._barrier()
|
|
dist.destroy_process_group(group_id)
|
|
|
|
# Test get rank and size of group
|
|
def test_get_rank_size_group(self):
|
|
if dist.get_world_size() > 2:
|
|
group = [1, 2]
|
|
else:
|
|
group = [0, 1]
|
|
group_id = dist.new_group(group)
|
|
if dist.get_rank() in group:
|
|
self.assertEqual(dist.get_world_size(group_id), 2)
|
|
self.assertTrue(dist.get_rank(group_id) in list(range(2)))
|
|
else:
|
|
self.assertEqual(dist.get_world_size(group_id), -1)
|
|
self.assertEqual(dist.get_rank(group_id), -1)
|
|
|
|
# Test destroy full groups
|
|
def test_destroy_full_group(self):
|
|
_, group_id, _ = self._init_full_group_test()
|
|
self._barrier()
|
|
dist.destroy_process_group(group_id)
|
|
|
|
# Test get rank and size of full group
|
|
def test_get_rank_size_full_group(self):
|
|
_, group_id, _ = self._init_full_group_test()
|
|
self.assertEqual(dist.get_world_size(group_id), dist.get_world_size())
|
|
self.assertEqual(dist.get_rank(group_id), dist.get_rank())
|
|
|
|
def _test_barrier_timeout(self, group_id, timeout):
|
|
local_rank = dist.get_rank(group_id)
|
|
|
|
# Only execute barrier on rank == 0, causing it to timeout
|
|
if local_rank == 0:
|
|
expected_time = time.time() + timeout.total_seconds()
|
|
with self.assertRaisesRegex(Exception, " (Timed out|closed|timeout) "):
|
|
dist.barrier(group_id)
|
|
self.assertGreaterEqual(time.time(), expected_time)
|
|
else:
|
|
time.sleep(timeout.total_seconds())
|
|
|
|
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
|
|
@unittest.skipIf(
|
|
not INIT_METHOD.startswith("file://"),
|
|
"Requires file:// initialization method. " +
|
|
"Both tcp:// and env:// rely on the TCP store for which "
|
|
"reinitialization has proven racy."
|
|
)
|
|
def test_barrier_timeout_global(self):
|
|
dist.destroy_process_group()
|
|
|
|
# Explicitly pass world size to the barrier because we've
|
|
# just destroyed any state in torch.distributed.
|
|
self._barrier(wait_for=int(os.environ["WORLD_SIZE"]))
|
|
|
|
# Reinitialize global process group
|
|
timeout = timedelta(seconds=1)
|
|
dist.init_process_group(
|
|
init_method=INIT_METHOD,
|
|
backend=BACKEND,
|
|
world_size=int(os.environ["WORLD_SIZE"]),
|
|
rank=self.rank,
|
|
timeout=timeout,
|
|
)
|
|
self._test_barrier_timeout(dist.group.WORLD, timeout)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
|
|
def test_barrier_timeout_group(self):
|
|
timeout = timedelta(seconds=1)
|
|
_, group_id, _ = self._init_group_test(timeout=timeout)
|
|
if group_id is not None:
|
|
self._test_barrier_timeout(group_id, timeout)
|
|
|
|
@unittest.skipIf(BACKEND != "gloo", "Only gloo backend supports timeouts")
|
|
def test_barrier_timeout_full_group(self):
|
|
timeout = timedelta(seconds=1)
|
|
_, group_id, _ = self._init_full_group_test(timeout=timeout)
|
|
if group_id is not None:
|
|
self._test_barrier_timeout(group_id, timeout)
|
|
|
|
# This test helper can only be used when using the Gloo or NCCL backend
|
|
# **and** both the Gloo and NCCL backends are available.
|
|
# See the @skip annotations below.
|
|
def _test_group_override_backend(self, initializer):
|
|
if BACKEND == "gloo":
|
|
new_backend = "nccl"
|
|
if BACKEND == "nccl":
|
|
new_backend = "gloo"
|
|
|
|
group, group_id, rank = initializer(backend=new_backend)
|
|
if group_id is None:
|
|
return
|
|
|
|
if new_backend == "gloo":
|
|
self.assertTrue(isinstance(group_id, dist.ProcessGroupGloo))
|
|
if new_backend == "nccl":
|
|
self.assertTrue(isinstance(group_id, dist.ProcessGroupNCCL))
|
|
|
|
self.assertEqual(rank, group[dist.get_rank(group_id)])
|
|
self.assertEqual(len(group), dist.get_world_size(group_id))
|
|
|
|
# Pin device (so we avoid NCCL race conditions/deadlocks).
|
|
group_rank = dist.get_rank(group_id)
|
|
torch.cuda.set_device(group_rank)
|
|
|
|
# Run broadcast of CUDA tensor (so it works for both Gloo and NCCL).
|
|
tensor = _build_tensor(2, value=group_rank).cuda()
|
|
dist.broadcast(tensor, src=group[0], group=group_id)
|
|
self.assertEqual(_build_tensor(2, value=0), tensor.to("cpu"))
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@require_world_size(3)
|
|
@skip_if_lt_x_gpu(2)
|
|
def test_backend_group(self):
|
|
self._test_group_override_backend(self._init_group_test)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(3)
|
|
def test_backend_full_group(self):
|
|
self._test_group_override_backend(self._init_full_group_test)
|
|
|
|
# SEND RECV
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
|
|
def test_send_recv(self):
|
|
rank = dist.get_rank()
|
|
tensor = _build_tensor(rank + 1)
|
|
|
|
for src in range(0, dist.get_world_size()):
|
|
if src == rank:
|
|
# Send mode
|
|
for dst in range(0, dist.get_world_size()):
|
|
if dst == rank:
|
|
continue
|
|
dist.send(tensor, dst)
|
|
else:
|
|
# Recv mode
|
|
expected_tensor = _build_tensor(src + 1)
|
|
output_tensor = _build_tensor(src + 1, value=-1)
|
|
dist.recv(output_tensor, src)
|
|
self.assertEqual(output_tensor, expected_tensor)
|
|
|
|
self._barrier()
|
|
|
|
# SEND RECV ANY SOURCE
|
|
@unittest.skipIf(
|
|
BACKEND == "nccl", "Nccl does not support send/recv from any source"
|
|
)
|
|
def test_send_recv_any_source(self):
|
|
rank = dist.get_rank()
|
|
tensor = _build_tensor(10, value=rank)
|
|
recv_ranks = set()
|
|
|
|
for dst in range(0, dist.get_world_size()):
|
|
if dst == rank:
|
|
# Recv mode
|
|
for dst in range(0, dist.get_world_size()):
|
|
if dst == rank:
|
|
continue
|
|
output_tensor = _build_tensor(10, value=-1)
|
|
sender = dist.recv(output_tensor)
|
|
|
|
# Assert the scalar value "sender" that should be
|
|
# equal to the rank of the sender is equal to all
|
|
# values in the received tensor.
|
|
self.assertTrue(output_tensor.eq(sender).all())
|
|
recv_ranks.add(sender)
|
|
else:
|
|
# Send mode
|
|
dist.send(tensor, dst)
|
|
|
|
self.assertEqual(len(recv_ranks), dist.get_world_size() - 1)
|
|
self._barrier()
|
|
|
|
# SEND RECV WITH TAG
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
|
|
def test_send_recv_with_tag(self):
|
|
rank = dist.get_rank()
|
|
world_size = dist.get_world_size()
|
|
tensor = _build_tensor(10, value=rank)
|
|
|
|
for dst in range(0, world_size):
|
|
if dst == rank:
|
|
# Recv mode
|
|
for src in range(0, world_size):
|
|
if src == rank:
|
|
continue
|
|
output_tensor = _build_tensor(10, value=-1)
|
|
dist.recv(output_tensor, src, tag=src)
|
|
self.assertTrue(output_tensor.eq(src).all())
|
|
else:
|
|
# Send mode
|
|
dist.send(tensor, dst, tag=rank)
|
|
|
|
# ISEND
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
|
|
def test_isend(self):
|
|
rank = dist.get_rank()
|
|
world_size = dist.get_world_size()
|
|
|
|
if rank == 0:
|
|
requests = [
|
|
dist.isend(_build_tensor(dest, 10), dest)
|
|
for dest in range(1, world_size)
|
|
]
|
|
for request in requests:
|
|
request.wait()
|
|
self.assertTrue(request.is_completed())
|
|
else:
|
|
tensor = _build_tensor(rank, -1)
|
|
dist.recv(tensor, 0)
|
|
self.assertEqual(tensor, _build_tensor(rank, 10))
|
|
|
|
self._barrier()
|
|
|
|
# IRECV
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
|
|
def test_irecv(self):
|
|
rank = dist.get_rank()
|
|
world_size = dist.get_world_size()
|
|
|
|
if rank == 0:
|
|
expected_tensors = [_build_tensor(src, -1) for src in range(1, world_size)]
|
|
requests = [
|
|
dist.irecv(expected_tensors[src - 1], src)
|
|
for src in range(1, world_size)
|
|
]
|
|
|
|
for src in range(1, world_size):
|
|
requests[src - 1].wait()
|
|
self.assertTrue(requests[src - 1].is_completed())
|
|
self.assertEqual(expected_tensors[src - 1], _build_tensor(src, 10))
|
|
else:
|
|
tensor = _build_tensor(rank, 10)
|
|
dist.send(tensor, 0)
|
|
|
|
self._barrier()
|
|
|
|
# BROADCAST
|
|
def _test_broadcast_helper(
|
|
self, group, group_id, rank, cuda=False, rank_to_GPU=None
|
|
):
|
|
for dtype, value, requires_cuda in [
|
|
(torch.float, -1e-10, False),
|
|
(torch.double, -1e-100, False),
|
|
(torch.half, -0.1, True),
|
|
(torch.int8, -2, False),
|
|
(torch.uint8, 129, False),
|
|
(torch.int, -1e5, False),
|
|
(torch.long, -1e15, False),
|
|
]:
|
|
if requires_cuda and not cuda:
|
|
continue
|
|
for src in group:
|
|
expected_tensor = _build_tensor(src + 1, value, dtype)
|
|
if cuda:
|
|
expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
|
|
if rank == src:
|
|
dist.broadcast(expected_tensor, src, group_id)
|
|
else:
|
|
tensor = _build_tensor(src + 1, -1, dtype)
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.broadcast(tensor, src, group_id)
|
|
self.assertEqual(tensor.size(), expected_tensor.size())
|
|
self.assertEqual(tensor.ne(expected_tensor).max(), torch.tensor(False))
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_broadcast(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_broadcast_helper(group, group_id, rank)
|
|
|
|
        @unittest.skipIf(
            BACKEND != "gloo" and BACKEND != "nccl",
            "Only Gloo and NCCL backends support CUDA broadcast",
        )
|
|
@skip_if_no_gpu
|
|
def test_broadcast_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_broadcast_helper(group, group_id, rank, True, rank_to_GPU)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_broadcast_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_broadcast_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_broadcast_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_broadcast_helper(group, group_id, rank)
|
|
|
|
# REDUCE
|
|
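        # The source rank contributes `master_value`, every other rank
        # contributes `worker_value`, and only the source rank checks that the
        # reduced tensor equals `expected_value`.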
def _test_reduce_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
op,
|
|
master_value,
|
|
worker_value,
|
|
expected_value,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
):
|
|
for src in group:
|
|
if rank == src:
|
|
tensor = _build_tensor(src + 1).fill_(master_value)
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.reduce(tensor, src, op, group_id)
|
|
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
|
|
else:
|
|
tensor = _build_tensor(src + 1).fill_(worker_value)
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.reduce(tensor, src, op, group_id)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_sum(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA reduce")
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_reduce_sum_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + 10 * (len(group) - 1),
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_product(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_min(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_max(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
@skip_if_small_worldsize
|
|
def test_reduce_group_sum(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
@skip_if_small_worldsize
|
|
def test_reduce_group_product(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
@skip_if_small_worldsize
|
|
def test_reduce_group_min(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
@skip_if_small_worldsize
|
|
def test_reduce_group_max(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_full_group_sum(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_full_group_product(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_full_group_min(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_reduce_full_group_max(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_reduce_helper(group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10)
|
|
|
|
# ALL REDUCE
|
|
def _test_all_reduce_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
op,
|
|
master_value,
|
|
worker_value,
|
|
expected_value,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
):
|
|
for src in group:
|
|
if rank == src:
|
|
tensor = _build_tensor(src + 1).fill_(master_value)
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.all_reduce(tensor, op, group_id)
|
|
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
|
|
else:
|
|
tensor = _build_tensor(src + 1).fill_(worker_value)
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.all_reduce(tensor, op, group_id)
|
|
self.assertEqual(tensor, _build_tensor(src + 1, expected_value))
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_sum(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "gloo",
|
|
"Only Gloo backend will have CUDA allReduce tested",
|
|
)
|
|
@skip_if_no_gpu
|
|
def test_all_reduce_sum_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_product(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_min(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_max(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_group_sum(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_group_product(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_group_min(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_group_max(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_full_group_sum(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
2 + (10 * (len(group) - 1)),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_full_group_product(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
2,
|
|
10,
|
|
reduce((lambda x, y: x * y), [10] * (len(group) - 1), 2),
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_full_group_min(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MIN, 1010, 1, 1
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_reduce_full_group_max(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_helper(
|
|
group, group_id, rank, dist.ReduceOp.MAX, -1, 10, 10
|
|
)
|
|
|
|
# SPARSE ALL REDUCE
|
|
def _test_sparse_all_reduce_sum(self, fn):
|
|
group, group_id, rank = self._init_global_test()
|
|
|
|
tests = simple_sparse_reduce_tests(
|
|
rank,
|
|
dist.get_world_size(),
|
|
num_inputs=1)
|
|
for (inputs, outputs) in tests:
|
|
tensors = [fn(input) for input in inputs]
|
|
dist.all_reduce(tensors[0], dist.ReduceOp.SUM, group_id)
|
|
self.assertEqual(tensors[0], outputs[0])
|
|
|
|
        @unittest.skipIf(BACKEND != "gloo", "Only the Gloo backend supports sparse all reduce")
|
|
def test_sparse_all_reduce_sum(self):
|
|
self._test_sparse_all_reduce_sum(lambda t: t)
|
|
|
|
        @unittest.skipIf(BACKEND != "gloo", "Only the Gloo backend supports sparse all reduce")
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_sparse_all_reduce_sum_cuda(self):
|
|
self._test_sparse_all_reduce_sum(lambda t: t.clone().cuda())
|
|
|
|
# ALL REDUCE - COALESCED
|
|
@staticmethod
|
|
def _all_reduce_coalesced_sum_test_cases(group_size):
|
|
return (
|
|
[2, 3],
|
|
[10, 11],
|
|
[2 + 10 * (group_size - 1), 3 + 11 * (group_size - 1)]
|
|
)
|
|
|
|
@staticmethod
|
|
def _all_reduce_coalesced_product_test_cases(group_size):
|
|
return (
|
|
[1, 2],
|
|
[3, 4],
|
|
[1 * 3 ** (group_size - 1), 2 * 4 ** (group_size - 1)]
|
|
)
|
|
|
|
@staticmethod
|
|
def _all_reduce_coalesced_min_test_cases(group_size):
|
|
return (
|
|
[1, 4],
|
|
[2, 3],
|
|
[1, 3]
|
|
)
|
|
|
|
@staticmethod
|
|
def _all_reduce_coalesced_max_test_cases(group_size):
|
|
return (
|
|
[1, 4],
|
|
[2, 3],
|
|
[2, 4]
|
|
)
|
|
|
|
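        # Each *_test_cases helper returns (master_values, worker_values,
        # expected_values): what the source rank and the other ranks contribute
        # for two coalesced tensors, and the expected result for a group of the
        # given size.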
def _test_all_reduce_coalesced_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
op,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
):
|
|
test_case_func = {
|
|
dist.ReduceOp.SUM: self._all_reduce_coalesced_sum_test_cases,
|
|
dist.ReduceOp.PRODUCT: self._all_reduce_coalesced_product_test_cases,
|
|
dist.ReduceOp.MIN: self._all_reduce_coalesced_min_test_cases,
|
|
dist.ReduceOp.MAX: self._all_reduce_coalesced_max_test_cases
|
|
}[op]
|
|
|
|
master_values, worker_values, expected_values = test_case_func(len(group))
|
|
|
|
for src in group:
|
|
tensors = [
|
|
_build_tensor(src + 1, val)
|
|
for val in (master_values if rank == src else worker_values)
|
|
]
|
|
if cuda:
|
|
                    tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
|
|
dist.all_reduce_coalesced(tensors, op, group_id)
|
|
self.assertEqual(
|
|
tensors,
|
|
[
|
|
_build_tensor(src + 1, expected_value)
|
|
for expected_value in expected_values
|
|
]
|
|
)
|
|
|
|
self._barrier()
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_sum(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_product(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_min(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MIN,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_max(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MAX,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_group_sum(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_group_product(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_group_min(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MIN,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@skip_if_small_worldsize
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_group_max(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MAX,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_full_group_sum(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.SUM,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_full_group_product(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.PRODUCT,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_full_group_min(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MIN,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
def test_all_reduce_coalesced_full_group_max(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_reduce_coalesced_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
dist.ReduceOp.MAX,
|
|
cuda=False,
|
|
rank_to_GPU=None
|
|
)
|
|
|
|
# SCATTER
|
|
def _test_scatter_helper(self, group, group_id, rank):
|
|
for dest in group:
|
|
tensor = _build_tensor(dest + 1, -1)
|
|
expected_tensor = _build_tensor(dest + 1, rank)
|
|
tensors = (
|
|
[_build_tensor(dest + 1, i) for i in group] if rank == dest else []
|
|
)
|
|
dist.scatter(tensor, src=dest, scatter_list=tensors, group=group_id)
|
|
self.assertEqual(tensor, expected_tensor)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_scatter_checks(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
one = torch.ones([1])
|
|
|
|
# Specify scatter_list argument only on source rank.
|
|
output = one.clone() * -1
|
|
if rank == 0:
|
|
scatter_list = [one.clone() * i for i in group]
|
|
dist.scatter(output, src=0, scatter_list=scatter_list)
|
|
else:
|
|
dist.scatter(output, src=0)
|
|
self.assertEqual(output, one * rank)
|
|
|
|
# Don't specify src argument.
|
|
output = one.clone() * -1
|
|
if rank == 0:
|
|
scatter_list = [one.clone() * i for i in group]
|
|
dist.scatter(output, scatter_list=scatter_list)
|
|
else:
|
|
dist.scatter(output)
|
|
self.assertEqual(output, one * rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
|
|
def test_scatter(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_scatter_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
|
|
@skip_if_small_worldsize
|
|
def test_scatter_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_scatter_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support scatter")
|
|
def test_scatter_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_scatter_helper(group, group_id, rank)
|
|
|
|
# GATHER
|
|
def _test_gather_helper(self, group, group_id, rank):
|
|
for dest in group:
|
|
tensor = _build_tensor(dest + 1, rank)
|
|
tensors = (
|
|
[_build_tensor(dest + 1, -1) for i in group] if rank == dest else []
|
|
)
|
|
dist.gather(tensor, dst=dest, gather_list=tensors, group=group_id)
|
|
if rank == dest:
|
|
expected_tensors = [_build_tensor(dest + 1, i) for i in group]
|
|
for t1, t2 in zip(tensors, expected_tensors):
|
|
self.assertEqual(t1, t2)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_gather_checks(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
one = torch.ones([1])
|
|
|
|
# Specify gather_list argument only on destination rank.
|
|
if rank == 0:
|
|
gather_list = [one.clone() for _ in group]
|
|
dist.gather(one * rank, dst=0, gather_list=gather_list)
|
|
for i in group:
|
|
self.assertEqual(gather_list[i], one * i)
|
|
else:
|
|
dist.gather(one * rank, dst=0)
|
|
|
|
# Don't specify dst argument.
|
|
if rank == 0:
|
|
gather_list = [one.clone() for _ in group]
|
|
dist.gather(one * rank, gather_list=gather_list)
|
|
for i in group:
|
|
self.assertEqual(gather_list[i], one * i)
|
|
else:
|
|
dist.gather(one * rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_gather(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_gather_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
@skip_if_small_worldsize
|
|
def test_gather_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_gather_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_gather_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_gather_helper(group, group_id, rank)
|
|
|
|
# ALL GATHER
|
|
def _test_all_gather_helper(
|
|
self, group, group_id, rank, cuda=False, rank_to_GPU=None
|
|
):
|
|
for dest in group:
|
|
tensor = _build_tensor(dest + 1, rank)
|
|
tensors = [_build_tensor(dest + 1, -1) for i in group]
|
|
if cuda:
|
|
tensor = tensor.cuda(rank_to_GPU[rank][0])
|
|
tensors = [t.cuda(rank_to_GPU[rank][0]) for t in tensors]
|
|
dist.all_gather(tensors, tensor, group_id)
|
|
|
|
expected_tensors = [_build_tensor(dest + 1, i) for i in group]
|
|
for t1, t2 in zip(tensors, expected_tensors):
|
|
self.assertEqual(t1, t2)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_gather(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_gather_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND != "nccl", "Only Nccl supports CUDA all gather")
|
|
@unittest.skipIf(BACKEND == "nccl", "CUDA all gather skipped for NCCL")
|
|
@skip_if_no_gpu
|
|
def test_all_gather_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_gather_helper(group, group_id, rank, True, rank_to_GPU)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_gather_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_gather_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support CPU tensors")
|
|
def test_all_gather_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_gather_helper(group, group_id, rank)
|
|
|
|
def _run_all_gather_coalesced_and_verify(
|
|
self, output_tensor_lists, input_tensors, expected_tensors, group_id
|
|
):
|
|
"""
|
|
Helper that runs all_gather_coalesced and returns true if output
|
|
matches expectations.
|
|
"""
|
|
dist.all_gather_coalesced(
|
|
output_tensor_lists, input_tensors, group_id)
|
|
|
|
for l1, l2 in zip(output_tensor_lists, expected_tensors):
|
|
for t1, t2 in zip(l1, l2):
|
|
if not torch.equal(t1, t2):
|
|
return False
|
|
return True
|
|
|
|
def _test_all_gather_coalesced_helper(
|
|
self, group, group_id, rank
|
|
):
|
|
# TODO: Instead we should probably go through _rank_not_in_group
|
|
# mechanism to disable sending tensors
|
|
if group_id is not None:
|
|
for test_case_id in range(2, 5):
|
|
# Make sure we create tensors of incompatible sizes, e.g.
|
|
# [1], [2x2], [3x3x3] ... to be sent in one batch
|
|
input_tensors = [
|
|
_build_multidim_tensor(
|
|
tensor_id, tensor_id, rank + tensor_id) for tensor_id in range(
|
|
1, test_case_id)
|
|
]
|
|
output_tensor_lists = [
|
|
[
|
|
_build_multidim_tensor(
|
|
tensor_id, tensor_id, -1) for tensor_id in range(
|
|
1, test_case_id)
|
|
] for _ in group
|
|
]
|
|
expected_tensors = [
|
|
[
|
|
_build_multidim_tensor(
|
|
tensor_id,
|
|
tensor_id,
|
|
rank_iter + tensor_id) for tensor_id in range(
|
|
1, test_case_id)
|
|
] for rank_iter in group
|
|
]
|
|
assert self._run_all_gather_coalesced_and_verify(
|
|
output_tensor_lists, input_tensors,
|
|
expected_tensors, group_id
|
|
                    ), "output tensors do not match expected outputs"
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
|
|
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
|
|
def test_all_gather_coalesced_simple(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_gather_coalesced_helper(group, group_id, rank)
|
|
|
|
@skip_if_small_worldsize
|
|
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
|
|
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
|
|
def test_all_gather_coalesced_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_gather_coalesced_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
|
|
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
|
|
def test_all_gather_coalesced_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_gather_coalesced_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(BACKEND == "nccl", "all_gather_coalesced does not support NCCL")
|
|
@unittest.skipIf(BACKEND == "mpi", "all_gather_coalesced does not support MPI")
|
|
def test_all_gather_coalesced_with_empty(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
input_tensors = [
|
|
rank * torch.ones([2, 2]),
|
|
torch.ones([0]),
|
|
(rank + 1) * torch.ones([3, 3]),
|
|
torch.ones([0]),
|
|
torch.ones([0])
|
|
]
|
|
output_tensors_lists = [
|
|
[
|
|
-1 * torch.ones([2, 2]),
|
|
-1 * torch.ones([0]),
|
|
-1 * torch.ones([3, 3]),
|
|
-1 * torch.ones([0]),
|
|
-1 * torch.ones([0])
|
|
] for _ in group
|
|
]
|
|
expected_tensors = [
|
|
[
|
|
r * torch.ones([2, 2]),
|
|
torch.ones([0]),
|
|
(r + 1) * torch.ones([3, 3]),
|
|
torch.ones([0]),
|
|
torch.ones([0])
|
|
] for r in group
|
|
]
|
|
assert self._run_all_gather_coalesced_and_verify(
|
|
output_tensors_lists, input_tensors, expected_tensors, group_id)
|
|
self._barrier()
|
|
|
|
# AllToAll
|
|
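        # In the equal-split case every rank sends a [size, size] block filled
        # with its own rank; row i of the output should end up filled with the
        # rank of sender i.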
def _test_all_to_all_single_equal_split_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
):
|
|
if group_id is not None:
|
|
size = len(group)
|
|
in_tensor = torch.ones([size, size]) * rank
|
|
expected_tensor = torch.cat([torch.ones([1, size]) * i for i in group])
|
|
out_tensor = torch.ones([size, size]) * -1
|
|
if cuda:
|
|
in_tensor = in_tensor.cuda(rank_to_GPU[rank][0])
|
|
expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
|
|
out_tensor = out_tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.all_to_all_single(out_tensor, in_tensor, group=group_id)
|
|
self.assertEqual(out_tensor, expected_tensor)
|
|
self._barrier()
|
|
|
|
def _test_all_to_all_single_unequal_split_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
cuda=False,
|
|
rank_to_GPU=None,
|
|
):
|
|
if group_id is not None:
|
|
size = len(group)
|
|
in_splits = [i + 1 for i in group]
|
|
out_splits = [rank + 1 for _ in group]
|
|
in_tensor = torch.ones([sum(in_splits), size]) * rank
|
|
out_tensor = torch.ones([(rank + 1) * size, size])
|
|
expected_tensor = torch.cat([torch.ones([rank + 1, size]) * i for i in group])
|
|
if cuda:
|
|
in_tensor = in_tensor.cuda(rank_to_GPU[rank][0])
|
|
expected_tensor = expected_tensor.cuda(rank_to_GPU[rank][0])
|
|
out_tensor = out_tensor.cuda(rank_to_GPU[rank][0])
|
|
dist.all_to_all_single(
|
|
out_tensor, in_tensor, out_splits, in_splits, group=group_id)
|
|
self.assertEqual(out_tensor, expected_tensor)
|
|
self._barrier()
|
|
|
|
def _test_all_to_all_helper(self, group, group_id, rank):
|
|
if group_id is not None:
|
|
size = len(group)
|
|
in_splits = [i + 1 for i in group]
|
|
in_tensors = [
|
|
torch.ones([in_splits[i], size]) * rank for i, _ in enumerate(group)
|
|
]
|
|
out_tensors = [torch.ones([(rank + 1), size]) for _ in group]
|
|
expected_tensors = [torch.ones([rank + 1, size]) * i for i in group]
|
|
dist.all_to_all(out_tensors, in_tensors, group=group_id)
|
|
for t1, t2 in zip(out_tensors, expected_tensors):
|
|
self.assertEqual(t1, t2)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
def test_all_to_all_single_equal_split(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_all_to_all_single_equal_split_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_equal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
def test_all_to_all_single_unequal_split(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_all_to_all_single_unequal_split_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_unequal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
|
|
def test_all_to_all(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
self._test_all_to_all_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
@skip_if_small_worldsize
|
|
def test_all_to_all_single_equal_split_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
@skip_if_small_worldsize
|
|
def test_all_to_all_single_equal_split_group_cuda(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_equal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
@skip_if_small_worldsize
|
|
def test_all_to_all_single_unequal_split_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
@skip_if_small_worldsize
|
|
def test_all_to_all_single_unequal_split_group_cuda(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_unequal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
|
|
@skip_if_small_worldsize
|
|
def test_all_to_all_group(self):
|
|
group, group_id, rank = self._init_group_test()
|
|
self._test_all_to_all_helper(group, group_id, rank)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
def test_all_to_all_single_equal_split_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_to_all_single_equal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_all_to_all_single_equal_split_full_group_cuda(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_equal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "mpi", "Only MPI supports CPU all_to_all_single"
|
|
)
|
|
def test_all_to_all_single_unequal_split_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_to_all_single_unequal_split_helper(group, group_id, rank)
|
|
|
|
@unittest.skip("NCCL A2A is not enabled for OSS builds")
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl", "Only Nccl supports CUDA all_to_all_single"
|
|
)
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_all_to_all_single_unequal_split_full_group_cuda(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_to_all_single_unequal_split_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
True,
|
|
rank_to_GPU,
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND != "mpi", "Only MPI supports all_to_all")
|
|
def test_all_to_all_full_group(self):
|
|
group, group_id, rank = self._init_full_group_test()
|
|
self._test_all_to_all_helper(group, group_id, rank)
|
|
|
|
# BARRIER
|
|
def _test_barrier_helper(
|
|
self, group, group_id, rank, cuda=False, rank_to_GPU=None):
|
|
WAIT_TIME = 0.3 # seconds
|
|
|
|
for dest in group:
|
|
expected_time = torch.DoubleTensor(1).fill_(0.0)
|
|
if cuda:
|
|
expected_time = expected_time.cuda(rank_to_GPU[rank][0])
|
|
if dest == rank:
|
|
expected_time.fill_(time.time() + WAIT_TIME)
|
|
dist.broadcast(expected_time, dest, group_id)
|
|
time.sleep(WAIT_TIME + 0.1) # sleep a little bit longer
|
|
dist.barrier(group_id)
|
|
else:
|
|
dist.broadcast(expected_time, dest, group_id)
|
|
dist.barrier(group_id)
|
|
self.assertGreaterEqual(
|
|
float(time.time()),
|
|
float(expected_time[0]),
|
|
"destination rank: %d, my rank: %d" % (dest, rank) +
|
|
" (if you see this failure, please report in #14554)")
|
|
|
|
# Use higher timeout for the instance where the test runs
|
|
# against a subgroup and uses a CUDA tensor for expected time.
|
|
# The CUDA initialization for the participating processes can
|
|
# take long enough for the barrier timeout to trigger on the
|
|
# process that doesn't participate in the group.
|
|
self._barrier(timeout=20)
|
|
|
|
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
def test_barrier_cuda(self):
group, group_id, rank = self._init_global_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)

@skip_if_small_worldsize
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
@skip_if_rocm
def test_barrier_group_cuda(self):
group, group_id, rank = self._init_group_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)

@skip_if_small_worldsize
@skip_if_no_gpu
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support GPU barrier")
def test_barrier_full_group_cuda(self):
group, group_id, rank = self._init_full_group_test()
rank_to_GPU = self._init_multigpu_helper()
self._test_barrier_helper(group, group_id, rank, True, rank_to_GPU)

@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier(self):
group, group_id, rank = self._init_global_test()
self._test_barrier_helper(group, group_id, rank)

@skip_if_small_worldsize
@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier_group(self):
group, group_id, rank = self._init_group_test()
self._test_barrier_helper(group, group_id, rank)

@unittest.skipIf(BACKEND == "nccl", "NCCL does not support CPU barrier")
def test_barrier_full_group(self):
group, group_id, rank = self._init_full_group_test()
self._test_barrier_helper(group, group_id, rank)
|
|
|
|
def _test_broadcast_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
|
|
for src in group:
|
|
expected_tensor = _build_tensor(src + 1)
|
|
tensors = [
|
|
_build_tensor(src + 1, -1).cuda(device=i) for i in rank_to_GPU[rank]
|
|
]
|
|
if rank == src:
|
|
tensors[0] = expected_tensor.cuda(device=rank_to_GPU[rank][0])
|
|
|
|
dist.broadcast_multigpu(tensors, src, group_id)
|
|
for tensor in tensors:
|
|
self.assertEqual(tensor, expected_tensor)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
|
|
@unittest.skipIf(BACKEND == "nccl", "NCCL broadcast multigpu skipped")
|
|
@skip_if_no_gpu
|
|
def test_broadcast_multigpu(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_broadcast_multigpu_helper(group, group_id, rank, rank_to_GPU)
|
|
|
|
def _test_all_reduce_multigpu_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
rank_to_GPU,
|
|
op,
|
|
master_value,
|
|
worker_value,
|
|
expected_value,
|
|
):
|
|
for src in group:
|
|
if rank == src:
|
|
tensors = [
|
|
_build_tensor(src + 1, master_value).cuda(device=i)
|
|
for i in rank_to_GPU[rank]
|
|
]
|
|
else:
|
|
tensors = [
|
|
_build_tensor(src + 1, worker_value).cuda(device=i)
|
|
for i in rank_to_GPU[rank]
|
|
]
|
|
|
|
dist.all_reduce_multigpu(tensors, op, group_id)
|
|
expected_tensor = _build_tensor(src + 1, expected_value)
|
|
for tensor in tensors:
|
|
self.assertEqual(tensor, expected_tensor)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND == "mpi", "MPI doesn't support broadcast multigpu")
|
|
@unittest.skipIf(BACKEND == "nccl", "CUDA all_reduce multigpu skipped for NCCL")
|
|
@skip_if_no_gpu
|
|
def test_all_reduce_multigpu(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
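# Expected value: the src rank contributes master_value (2) from each of its
# GPUs and every other rank contributes worker_value (10) per GPU, and
# all_reduce_multigpu sums across all GPUs of all ranks.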
|
|
self._test_all_reduce_multigpu_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
rank_to_GPU,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
(2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
|
|
)
|
|
|
|
def _test_reduce_multigpu_helper(
|
|
self,
|
|
group,
|
|
group_id,
|
|
rank,
|
|
rank_to_GPU,
|
|
op,
|
|
master_value,
|
|
worker_value,
|
|
expected_value,
|
|
):
|
|
for src in group:
|
|
if rank == src:
|
|
tensors = [
|
|
_build_tensor(src + 1, master_value).cuda(device=i)
|
|
for i in rank_to_GPU[rank]
|
|
]
|
|
dist.reduce_multigpu(tensors, src, op, group_id)
|
|
expected_tensor = _build_tensor(src + 1, expected_value)
|
|
self.assertEqual(tensors[0], expected_tensor)
|
|
else:
|
|
tensors = [
|
|
_build_tensor(src + 1, worker_value).cuda(device=i)
|
|
for i in rank_to_GPU[rank]
|
|
]
|
|
dist.reduce_multigpu(tensors, src, op, group_id)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports reduce multigpu")
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_reduce_multigpu(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_reduce_multigpu_helper(
|
|
group,
|
|
group_id,
|
|
rank,
|
|
rank_to_GPU,
|
|
dist.ReduceOp.SUM,
|
|
2,
|
|
10,
|
|
(2 + 10 * (len(group) - 1)) * len(rank_to_GPU[0]),
|
|
)
|
|
|
|
def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU):
|
|
for dest in group:
|
|
tensors = [
|
|
_build_tensor(dest + 1).cuda(device=i) for i in rank_to_GPU[rank]
|
|
]
|
|
|
|
# construct the expected output along with
# a placeholder to receive the all_gather results
|
|
output_tensors = []
|
|
expected_output = []
|
|
output_per_gpu = (
|
|
[_build_tensor(dest + 1, -1)] * len(rank_to_GPU[0]) * len(group)
|
|
)
|
|
expected_per_gpu = (
|
|
[_build_tensor(dest + 1)] * len(rank_to_GPU[0]) * len(group)
|
|
)
|
|
for gpu in rank_to_GPU[rank]:
|
|
output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
|
|
expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
|
|
|
|
dist.all_gather_multigpu(output_tensors, tensors, group_id)
|
|
self.assertEqual(output_tensors, expected_output)
|
|
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != "nccl", "Only Nccl backend supports allgather multigpu")
|
|
@skip_if_no_gpu
|
|
def test_all_gather_multigpu(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
self._test_all_gather_multigpu_helper(group, group_id, rank, rank_to_GPU)
|
|
|
|
def _model_step(self, model):
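# Manually apply each parameter's accumulated gradient (param += grad) and
# clear it, standing in for an optimizer step shared by the baseline and
# DDP models.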
|
|
for param in model.parameters():
|
|
if param.grad is not None:
|
|
with torch.no_grad():
|
|
param += param.grad
|
|
param.grad = None
|
|
|
|
def _prepare_dummy_data(self, local_bs):
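# Build a global batch of size world_size * local_bs; each DDP rank later
# trains on its own local_bs-sized slice of it.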
|
|
# global_bs for DDP should be divisible by WORLD_SIZE
|
|
world_size = int(os.environ["WORLD_SIZE"])
|
|
global_bs = world_size * local_bs
|
|
input_cpu = torch.randn(global_bs, 2)
|
|
target = torch.randn(global_bs, 4)
|
|
loss = nn.MSELoss()
|
|
return global_bs, input_cpu, target, loss
|
|
|
|
# END TO END TEST FOR DISTRIBUTEDDATAPARALLEL
|
|
def _test_DDP_helper(self, model, input_var, target, loss, scale_factor=1.0):
|
|
model.train()
|
|
output = model(input_var)
|
|
l = loss(output, target) * scale_factor
|
|
l.backward()
|
|
|
|
def _assert_equal_param(self, param_gpu, param_DDP):
|
|
self.assertEqual(len(param_gpu), len(param_DDP))
|
|
for p_gpu, p_DDP in zip(param_gpu, param_DDP):
|
|
self.assertEqual(p_gpu, p_DDP)
|
|
|
|
def _test_DDP_5iter(
|
|
self, model_base, model_DDP, input, target, loss, local_bs, rank, batch_size, test_save, offset=None, world_size=0
|
|
):
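# Run five iterations on both the single-process baseline and the DDP model,
# stepping each after every iteration and asserting their parameters match,
# with an optional mid-training save/reload of the DDP model.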
|
|
for idx in range(5):
|
|
# single cpu/gpu training
|
|
self._test_DDP_helper(model_base, input, target, loss)
|
|
|
|
if offset is None:
|
|
offset = rank * local_bs
|
|
|
|
# DDP training, DDP scatters subsets of input_cpu to nodes/GPUs
|
|
self._test_DDP_helper(
|
|
model_DDP,
|
|
input[offset: offset + local_bs],
|
|
target[offset: offset + local_bs],
|
|
loss,
|
|
world_size * local_bs / batch_size if world_size != 0 else 1,
|
|
)
|
|
|
|
# Update weights and run a second iteration to shake out errors
|
|
self._model_step(model_base)
|
|
self._model_step(model_DDP)
|
|
self._assert_equal_param(
|
|
list(model_base.parameters()), list(model_DDP.module.parameters())
|
|
)
|
|
|
|
# Shuffle the input so that DDP input is different
|
|
input = input[torch.randperm(batch_size)]
|
|
|
|
# save the model in the middle and reload
|
|
if test_save and idx == 2 and INIT_METHOD.startswith("file://"):
|
|
with tempfile.NamedTemporaryFile() as tmp:
|
|
torch.save(model_DDP, tmp.name)
|
|
model_DDP = torch.load(tmp.name)
|
|
|
|
with tempfile.TemporaryFile() as tmp_file:
|
|
torch.save(model_DDP, tmp_file)
|
|
tmp_file.seek(0)
|
|
saved_model = torch.load(tmp_file)
|
|
for k in model_DDP.state_dict():
|
|
self.assertEqual(model_DDP.state_dict()[k], saved_model.state_dict()[k])
|
|
|
|
def _test_DistributedDataParallel(self, gpu_subset, rank, output_device=None):
|
|
# Run a simple end to end DDP model, use result of single node model
|
|
# as baseline
|
|
|
|
# cpu training setup
|
|
model = DDP_NET
|
|
|
|
# single gpu training setup
|
|
model_gpu = copy.deepcopy(model)
|
|
model_gpu.cuda(gpu_subset[0])
|
|
|
|
# DDP training setup
|
|
model_DDP = copy.deepcopy(model)
|
|
model_DDP.cuda(gpu_subset[0])
|
|
model_DDP = nn.parallel.DistributedDataParallel(
|
|
model_DDP, device_ids=gpu_subset
|
|
)
|
|
|
|
# test that the DDP model is serializable and can be reloaded
|
|
with tempfile.NamedTemporaryFile() as tmp:
|
|
torch.save(model_DDP, tmp.name)
|
|
model_DDP = torch.load(tmp.name)
|
|
|
|
# dummy data initialization
|
|
local_bs = len(gpu_subset)
|
|
global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
|
|
|
|
# check two model parameters over 5 iterations
|
|
self._test_DDP_5iter(
|
|
model_gpu,
|
|
model_DDP,
|
|
input_cpu.cuda(gpu_subset[0]),
|
|
target.cuda(gpu_subset[0]),
|
|
loss,
|
|
local_bs,
|
|
rank,
|
|
global_bs,
|
|
True
|
|
)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(
|
|
BACKEND == "nccl", "nccl does not support DDP on CPU models"
|
|
)
|
|
def test_DistributedDataParallelCPU(self):
|
|
# Run a simple end to end DDP-CPU model, use result of single node
|
|
# model as baseline
|
|
group, group_id, rank = self._init_global_test()
|
|
|
|
# cpu training setup
|
|
model_base = DDP_NET
|
|
|
|
# DDP-CPU training setup
|
|
model_DDP = copy.deepcopy(model_base)
|
|
model_DDP = nn.parallel.DistributedDataParallelCPU(model_DDP)
|
|
|
|
# dummy data initialization
|
|
local_bs = 2
|
|
global_bs, input_cpu, target, loss = self._prepare_dummy_data(local_bs)
|
|
|
|
# check two model parameters over 5 iterations
|
|
self._test_DDP_5iter(
|
|
model_base, model_DDP, input_cpu, target, loss, local_bs, rank, global_bs, False
|
|
)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
def test_DistributedDataParallel_requires_grad(self):
|
|
# a module without gradients shouldn't be accepted
|
|
self.assertRaises(AssertionError, lambda: nn.parallel.DistributedDataParallel(nn.Module()))
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl" and BACKEND != "gloo",
|
|
"Only NCCL and GLOO backend support DistributedDataParallel",
|
|
)
|
|
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
|
|
@skip_if_rocm
|
|
def test_DistributedDataParallel_non_default_stream(self):
|
|
stream = torch.cuda.Stream()
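# Construct and train the DDP model entirely under a non-default CUDA stream
# to verify that allreduced gradients are correctly synchronized with it.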
|
|
rank = self.rank
|
|
with torch.cuda.stream(stream):
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
torch.nn.Linear(1, 1, bias=False).cuda(rank), device_ids=[rank]
|
|
)
|
|
for i in range(1000):
|
|
# Clear gradients manually
|
|
grad = net.module.weight.grad
|
|
if grad is not None:
|
|
grad.requires_grad_(False)
|
|
grad.zero_()
|
|
# Forward + BW
|
|
batch = torch.tensor([rank]).float().cuda(rank)
|
|
loss = net(batch).sum()
|
|
loss.backward()
|
|
# For each worker, the gradient on the weight should be worker_rank.
|
|
grad = net.module.weight.grad
|
|
avg = grad.clone()
|
|
# Since DDP should already have averaged the gradients, every rank should
# hold the same value; all-reducing and dividing by world_size therefore
# reproduces that average. If not, then one of the workers has not correctly
# written back the averaged gradient before this all-reduce call.
|
|
dist.all_reduce(avg)
|
|
world_size = int(os.environ["WORLD_SIZE"])
|
|
avg.div_(world_size)
|
|
expected_grad = sum(i for i in range(world_size)) / world_size
|
|
self.assertEqual(
|
|
avg[0, 0],
|
|
expected_grad,
|
|
msg=f"Expected gradient of {expected_grad} but got {avg} on rank {self.rank}",
|
|
)
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
@skip_if_rocm
|
|
def test_DistributedDataParallel(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
gpus = list(rank_to_GPU[rank])
|
|
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank)
|
|
|
|
# test output_device
|
|
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
|
|
|
|
# test device_ids
|
|
gpus = [torch.device('cuda:' + str(i)) for i in gpus]
|
|
self._test_DistributedDataParallel(gpu_subset=gpus, rank=rank, output_device=torch.device('cuda'))
|
|
|
|
def _test_DistributedDataParallel_SyncBatchNorm(self, gpu_subset, rank, local_bs, global_bs, offset, output_device=None):
|
|
# Run a simple end to end DDP model, use result of single node model
|
|
# as baseline
|
|
|
|
# cpu training setup
|
|
model = BN_NET
|
|
|
|
# single gpu training setup
|
|
model_gpu = copy.deepcopy(model)
|
|
model_gpu.cuda(gpu_subset[0])
|
|
|
|
# DDP training setup
|
|
model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
|
|
model_DDP.cuda(gpu_subset[0])
|
|
model_DDP = nn.parallel.DistributedDataParallel(
|
|
model_DDP, device_ids=gpu_subset
|
|
)
|
|
|
|
# test that the DDP model is serializable and can be reloaded
|
|
with tempfile.NamedTemporaryFile() as tmp:
|
|
torch.save(model_DDP, tmp.name)
|
|
model_DDP = torch.load(tmp.name)
|
|
|
|
# data initialization
|
|
input_cpu = torch.randn(global_bs, 2)
|
|
target = torch.randn(global_bs, 4)
|
|
loss = nn.MSELoss()
|
|
|
|
# check two model parameters over 5 iterations
|
|
self._test_DDP_5iter(
|
|
model_gpu,
|
|
model_DDP,
|
|
input_cpu.cuda(gpu_subset[0]),
|
|
target.cuda(gpu_subset[0]),
|
|
loss,
|
|
local_bs,
|
|
rank,
|
|
global_bs,
|
|
True,
|
|
offset,
|
|
dist.get_world_size()
|
|
)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
def test_DistributedDataParallel_SyncBatchNorm(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
# DDP does not support replicating BN layers within a process, hence
|
|
# testing with one module replica per process
|
|
gpus = [rank]
|
|
|
|
num_processes = dist.get_world_size()
|
|
local_bs = 2
|
|
bs_offset = int(rank * 2)
|
|
global_bs = int(num_processes * 2)
|
|
|
|
self._test_DistributedDataParallel_SyncBatchNorm(
|
|
gpu_subset=gpus,
|
|
rank=rank,
|
|
local_bs=local_bs,
|
|
global_bs=global_bs,
|
|
offset=bs_offset)
|
|
|
|
# test output_device
|
|
self._test_DistributedDataParallel_SyncBatchNorm(
|
|
gpu_subset=gpus,
|
|
rank=rank,
|
|
local_bs=local_bs,
|
|
global_bs=global_bs,
|
|
offset=bs_offset,
|
|
output_device=torch.device('cuda'))
|
|
|
|
# test device_ids
|
|
gpus = [torch.device('cuda:' + str(i)) for i in gpus]
|
|
self._test_DistributedDataParallel_SyncBatchNorm(
|
|
gpu_subset=gpus,
|
|
rank=rank,
|
|
local_bs=local_bs,
|
|
global_bs=global_bs,
|
|
offset=bs_offset,
|
|
output_device=torch.device('cuda'))
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
def test_DistributedDataParallel_SyncBatchNorm_2D_Input(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
# DDP does not support replicating BN layers within a process, hence
|
|
# testing with one module replica per process
|
|
gpus = [rank]
|
|
|
|
model = nn.BatchNorm1d(2)
|
|
|
|
# single gpu training setup
|
|
model_gpu = copy.deepcopy(model)
|
|
model_gpu.cuda(gpus[0])
|
|
|
|
# DDP training setup
|
|
model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
|
|
model_DDP.cuda(gpus[0])
|
|
model_DDP = nn.parallel.DistributedDataParallel(
|
|
model_DDP, device_ids=gpus
|
|
)
|
|
|
|
local_bs = len(gpus) * 2
|
|
global_bs = dist.get_world_size() * local_bs
|
|
input_cpu = torch.randn(global_bs, 2)
|
|
target = torch.randn(global_bs, 2)
|
|
loss = nn.MSELoss()
|
|
|
|
# Disable cudnn so that SyncBatchNorm goes through the native_batch_norm
# kernel; this avoids the numerical issues created by the divergent code path.
|
|
with torch.backends.cudnn.flags(False):
|
|
# check two model parameters over 5 iterations
|
|
self._test_DDP_5iter(
|
|
model_gpu,
|
|
model_DDP,
|
|
input_cpu.cuda(gpus[0]),
|
|
target.cuda(gpus[0]),
|
|
loss,
|
|
local_bs,
|
|
rank,
|
|
global_bs,
|
|
True
|
|
)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
@require_world_size(2)
|
|
@skip_if_rocm
|
|
def test_DistributedDataParallel_SyncBatchNorm_Single_Input_Per_Process(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
# DDP does not support replicating BN layers within a process, hence
|
|
# testing with one module replica per process
|
|
gpus = [rank]
|
|
|
|
model = nn.BatchNorm1d(2)
|
|
|
|
# single gpu training setup
|
|
model_gpu = copy.deepcopy(model)
|
|
model_gpu.cuda(gpus[0])
|
|
|
|
# DDP training setup
|
|
model_DDP = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(model))
|
|
model_DDP.cuda(gpus[0])
|
|
model_DDP = nn.parallel.DistributedDataParallel(
|
|
model_DDP, device_ids=gpus
|
|
)
|
|
|
|
local_bs = 1
|
|
global_bs = dist.get_world_size()
|
|
input_cpu = torch.randn(global_bs, 2)
|
|
target = torch.randn(global_bs, 2)
|
|
loss = nn.MSELoss()
|
|
|
|
# Disable cudnn so that SyncBatchNorm goes through the native_batch_norm
# kernel; this avoids the numerical issues created by the divergent code path.
|
|
with torch.backends.cudnn.flags(False):
|
|
# check two model parameters over 5 iterations
|
|
self._test_DDP_5iter(
|
|
model_gpu,
|
|
model_DDP,
|
|
input_cpu.cuda(gpus[0]),
|
|
target.cuda(gpus[0]),
|
|
loss,
|
|
local_bs,
|
|
rank,
|
|
global_bs,
|
|
True
|
|
)
|
|
self._barrier()
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
def test_DistributedDataParallel_SyncBatchNorm_Diff_Input_Sizes_Running_Value(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
rank_to_GPU = self._init_multigpu_helper()
|
|
model = nn.parallel.DistributedDataParallel(ONLY_SBN_NET.cuda(rank), device_ids=[rank])
|
|
|
|
input_var = []
|
|
for i in range(dist.get_world_size()):
|
|
input_var_rank = torch.cat([
|
|
torch.ones(2, 1, 10 ** (i + 1)) * (0.1 ** (i - 1)),
|
|
torch.ones(2, 1, 10 ** (i + 1)) * (0.3 ** (i - 1))
|
|
], dim=1)
|
|
input_var.append(input_var_rank)
|
|
|
|
all_input_var = torch.cat(
|
|
[x.permute(1, 0, 2).contiguous().view(ONLY_SBN_NET.num_features, -1) for x in input_var],
|
|
dim=1
|
|
).cuda(rank)
|
|
|
|
for i in range(100):
|
|
y = model(input_var[rank].cuda(rank))
|
|
y.mean().backward()
|
|
|
|
running_mean, running_var = model.module.running_mean, model.module.running_var
|
|
torch.testing.assert_allclose(running_mean, all_input_var.mean(1))
|
|
torch.testing.assert_allclose(running_var, all_input_var.var(1))
|
|
|
|
@unittest.skipIf(BACKEND != 'nccl' and BACKEND != 'gloo',
|
|
"Only Nccl & Gloo backend support DistributedDataParallel")
|
|
@skip_if_no_gpu
|
|
def test_DistributedDataParallel_SyncBatchNorm_Diff_Input_Sizes_gradient(self):
|
|
group, group_id, rank = self._init_global_test()
|
|
# only do single GPU per process
|
|
gpus = [rank]
|
|
|
|
# cpu training setup
|
|
model = BN_NET
|
|
|
|
num_processes = dist.get_world_size()
|
|
local_bs = rank + 2
|
|
bs_offset = int((rank + 3) * rank / 2)
|
|
global_bs = int((num_processes + 3) * num_processes / 2)
|
|
|
|
self._test_DistributedDataParallel_SyncBatchNorm(
|
|
gpu_subset=gpus,
|
|
rank=rank,
|
|
local_bs=local_bs,
|
|
global_bs=global_bs,
|
|
offset=bs_offset)
|
|
|
|
@skipIfNoTorchVision
|
|
def test_SyncBatchNorm_process_group(self):
|
|
# When using `convert_sync_batchnorm` to convert an `nn.Module`, the
# `process_group` needs to be passed down recursively so that it reaches
# `SyncBatchNorm` layers nested in a sub-module or sub-sub-module
# (e.g. resnet50 in torchvision.models).
|
|
|
|
process_ids = 0
|
|
process_group = torch.distributed.new_group([process_ids])
|
|
res50_model = torchvision.models.resnet50()
|
|
res50_model_sync = nn.SyncBatchNorm.convert_sync_batchnorm(copy.deepcopy(res50_model), process_group)
|
|
process_group_sync = res50_model_sync.layer1[0].bn1.process_group
|
|
self.assertEqual(process_group_sync, process_group)
|
|
|
|
def _run_reduction_test(
|
|
self, tensor, expected_tensor, op, reduction_fn=dist.all_reduce, dst=None
|
|
):
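# Shared helper for reduce/all_reduce checks: all_reduce verifies the result
# on every rank, while reduce (which requires `dst`) only checks the tensor
# on the destination rank.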
|
|
if reduction_fn != dist.all_reduce and dst is None:
|
|
raise ValueError(f"Reduction fn {reduction_fn} must specify dst!")
|
|
if dst is not None:
|
|
reduction_fn(tensor, dst, op)
|
|
# Only destination rank tensor is expected to have final result.
|
|
if dist.get_rank() == dst:
|
|
self.assertEqual(tensor, expected_tensor)
|
|
else:
|
|
reduction_fn(tensor, op)
|
|
self.assertEqual(tensor, expected_tensor)
|
|
|
|
@require_backend({"nccl"})
|
|
@require_backends_available({"nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_nccl_backend_bool_allreduce(self):
|
|
torch.cuda.set_device(self.rank)
|
|
# Run all_reduce with PRODUCT
|
|
element = self.rank % 2 == 0
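# Even ranks contribute True and odd ranks contribute False, so the
# PRODUCT/MIN reductions below should come out all-False once at least one
# odd rank participates.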
|
|
for op in [dist.ReduceOp.PRODUCT, dist.ReduceOp.MIN]:
|
|
input_tensor = torch.tensor([element, element]).to(self.rank)
|
|
self._run_reduction_test(
|
|
input_tensor, torch.tensor([False, False]).to(self.rank), op
|
|
)
|
|
# Ensure that all ranks contributing True (cast to 1) results in the
|
|
# correct reduction.
|
|
input_tensor = torch.tensor([True, True]).to(self.rank)
|
|
expected_tensor = input_tensor.clone()
|
|
self._run_reduction_test(
|
|
input_tensor, expected_tensor, op
|
|
)
|
|
|
|
# Run all_reduce with SUM
|
|
for op in [dist.ReduceOp.SUM, dist.ReduceOp.MAX]:
|
|
input_tensor = torch.tensor([element, element]).to(self.rank)
|
|
self._run_reduction_test(
|
|
input_tensor, torch.tensor([True, True]).to(self.rank), op
|
|
)
|
|
# TODO: NCCL backend does not work correctly for bitwise reduction ops
|
|
# (see https://github.com/pytorch/pytorch/issues/41362). Add tests for
|
|
# these once it is supported.
|
|
|
|
@require_backend({"nccl"})
|
|
@require_backends_available({"nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_nccl_backend_bool_allgather(self):
|
|
torch.cuda.set_device(self.rank)
|
|
inp = {0: [True, True], 1: [False, True]}
|
|
input_tensor = torch.tensor(inp[self.rank % 2]).to(self.rank)
|
|
# Preserve a copy of the tensor to compare against after allgather.
|
|
input_tensor_copy = input_tensor.clone()
|
|
tensor_list = [
|
|
torch.tensor([False, False]).to(self.rank)
|
|
for _ in range(dist.get_world_size())
|
|
]
|
|
dist.all_gather(tensor_list, input_tensor)
|
|
|
|
self.assertEqual(len(tensor_list), dist.get_world_size())
|
|
for i, t in enumerate(tensor_list):
|
|
expected = torch.tensor(inp[i % 2]).to(self.rank)
|
|
self.assertEqual(t, expected)
|
|
# Ensure that the input tensor is not modified, since this collective
|
|
# does not modify its input.
|
|
self.assertEqual(input_tensor_copy, input_tensor)
|
|
|
|
@require_backend({"nccl"})
|
|
@require_backends_available({"nccl"})
|
|
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
|
|
@skip_if_rocm
|
|
def test_nccl_backend_bool_reduce(self):
|
|
torch.cuda.set_device(self.rank)
|
|
inp = {0: [True, True], 1: [False, False]}
|
|
# Run reduce() with product op
|
|
for op in [dist.ReduceOp.PRODUCT, dist.ReduceOp.MIN]:
|
|
input_tensor = torch.tensor(inp[self.rank % 2]).to(self.rank)
|
|
expected = torch.tensor([False, False]).to(self.rank)
|
|
self._run_reduction_test(
|
|
input_tensor, expected, op, dist.reduce, dst=0
|
|
)
|
|
# Ensure that all ranks contributing True (cast to 1) results in the
|
|
# correct reduction.
|
|
input_tensor = torch.tensor([True, True]).to(self.rank)
|
|
expected_tensor = input_tensor.clone()
|
|
self._run_reduction_test(
|
|
input_tensor, expected_tensor, op, dist.reduce, dst=0
|
|
)
|
|
|
|
for op in [dist.ReduceOp.SUM, dist.ReduceOp.MAX]:
|
|
input_tensor = torch.tensor(inp[self.rank % 2]).to(self.rank)
|
|
expected = (
|
|
torch.tensor([True, True]).to(self.rank)
|
|
if self.rank == 0
|
|
else input_tensor.clone()
|
|
)
|
|
self._run_reduction_test(
|
|
input_tensor, expected, op, dist.reduce, dst=0
|
|
)
|
|
|
|
@require_backend({"nccl"})
|
|
@require_backends_available({"nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_nccl_backend_bool_broadcast(self):
|
|
tensor_size = 10
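# Rank 0 broadcasts a randomly generated bool tensor; after the broadcast,
# an all_gather verifies that every rank holds an identical copy.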
|
|
bcast_tensor = torch.tensor(
|
|
[
|
|
(random.random() < 0.5 if self.rank == 0 else False)
|
|
for _ in range(tensor_size)
|
|
]
|
|
).to(self.rank)
|
|
dist.broadcast(bcast_tensor, src=0)
|
|
# Now allgather and ensure the tensors are equal.
|
|
tensor_list = [
|
|
torch.tensor([False for _ in range(tensor_size)]).to(self.rank)
|
|
for _ in range(dist.get_world_size())
|
|
]
|
|
dist.all_gather(tensor_list, bcast_tensor)
|
|
expected = tensor_list[0]
|
|
for tensor in tensor_list[1:]:
|
|
self.assertEqual(tensor, expected)
|
|
|
|
@unittest.skipIf(
|
|
BACKEND != "nccl" and BACKEND != "gloo",
|
|
"Only NCCL and GLOO backend support DistributedDataParallel",
|
|
)
|
|
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
|
|
def test_DistributedSampler_padding(self):
|
|
# Tests padding of distributed sampler.
|
|
world_size = dist.get_world_size()
|
|
dataset_size = 100 + world_size + 1
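# Pick a dataset size that is not divisible by world_size (for the world
# sizes used here), so both the drop_last and padding code paths are
# exercised.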
|
|
dataset = [torch.ones(1).to(self.rank) * i for i in range(dataset_size)]
|
|
|
|
# Specifying drop_last=True will cause the tail of the data to be dropped.
|
|
dist_sampler = DistributedSampler(dataset=dataset, drop_last=True)
|
|
local_num_samples, local_dataset_size = (
|
|
dist_sampler.num_samples,
|
|
dist_sampler.total_size,
|
|
)
|
|
# The effective dataset size should be the greatest integer that is <=
|
|
# dataset_size that is divisible by the world_size. This is to ensure each
|
|
# rank processes the same number of samples.
|
|
effective_dataset_size = (
|
|
math.ceil((dataset_size - world_size) / world_size)
|
|
if dataset_size % world_size != 0
|
|
else dataset_size / world_size
|
|
)
|
|
self.assertEqual(local_num_samples, effective_dataset_size)
|
|
self.assertEqual(local_dataset_size, local_num_samples * world_size)
|
|
indices_list = list(iter(dist_sampler))
|
|
self.assertEqual(len(indices_list), local_num_samples)
|
|
|
|
def validate_global_samples(local_num_samples):
|
|
# Ensure that each rank processes the same number of samples.
|
|
world_samples = [
|
|
torch.LongTensor([0]).to(self.rank) for _ in range(world_size)
|
|
]
|
|
dist.all_gather(world_samples, torch.tensor([local_num_samples]).to(self.rank))
|
|
world_samples = [sample.item() for sample in world_samples]
|
|
self.assertEqual(len(set(world_samples)), 1)
|
|
|
|
validate_global_samples(local_num_samples)
|
|
|
|
# drop_last=False is the default and will add additional indices to be sampled,
|
|
# increasing the effective dataset size.
|
|
dist_sampler_added_samples = DistributedSampler(dataset=dataset)
|
|
local_num_samples, local_dataset_size = (
|
|
dist_sampler_added_samples.num_samples,
|
|
dist_sampler_added_samples.total_size,
|
|
)
|
|
# The effective dataset size is the smallest integer that is >= dataset_size
|
|
# and divisible by the world size.
|
|
self.assertEqual(
|
|
local_num_samples, math.ceil(dataset_size / world_size)
|
|
)
|
|
self.assertEqual(local_dataset_size, local_num_samples * world_size)
|
|
indices_list = list(iter(dist_sampler_added_samples))
|
|
self.assertEqual(len(indices_list), local_num_samples)
|
|
|
|
# Ensure that each rank processes the same number of samples.
|
|
validate_global_samples(local_num_samples)
|
|
|
|
@require_backend({"nccl", "gloo"})
|
|
@require_n_gpus_for_nccl_backend(int(os.environ["WORLD_SIZE"]), os.environ["BACKEND"])
|
|
def test_allgather_object(self):
|
|
gather_objects = collectives_object_test_list
|
|
output_gathered = [None for _ in range(dist.get_world_size())]
|
|
dist.all_gather_object(
|
|
output_gathered, gather_objects[self.rank % len(gather_objects)]
|
|
)
|
|
|
|
for i, val in enumerate(output_gathered):
|
|
expected = gather_objects[i % len(gather_objects)]
|
|
self.assertEqual(val, expected)
|
|
|
|
output_gathered = [None for _ in range(dist.get_world_size())]
|
|
dist.all_gather_object(
|
|
output_gathered, gather_objects[self.rank % len(gather_objects)]
|
|
)
|
|
|
|
@require_backend({"gloo"})
|
|
@unittest.skipIf(BACKEND == "nccl", "NCCL does not support gather")
|
|
def test_gather_object(self):
|
|
# Ensure stateful objects can be gathered
|
|
gather_objects = collectives_object_test_list
|
|
output_gathered = [None for _ in range(dist.get_world_size())]
|
|
gather_on_rank = 0
|
|
my_rank = dist.get_rank()
|
|
dist.gather_object(
|
|
gather_objects[self.rank % len(gather_objects)],
|
|
object_gather_list=output_gathered if my_rank == gather_on_rank else None,
|
|
dst=gather_on_rank,
|
|
)
|
|
if my_rank != gather_on_rank:
|
|
self.assertEqual(
|
|
output_gathered, [None for _ in range(dist.get_world_size())]
|
|
)
|
|
else:
|
|
for i, val in enumerate(output_gathered):
|
|
expected = gather_objects[i % len(gather_objects)]
|
|
self.assertEqual(val, expected)
|
|
|
|
# Validate errors when objects can't be pickled.
|
|
class Bar:
|
|
pass
|
|
|
|
b = Bar()
|
|
gather_objects = [b for _ in range(dist.get_world_size())]
|
|
with self.assertRaisesRegex(AttributeError, "Can't pickle local object"):
|
|
dist.all_gather_object(
|
|
[None for _ in range(dist.get_world_size())], gather_objects[self.rank]
|
|
)
|
|
|
|
@require_backend({"nccl"})
|
|
@require_backends_available({"nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
def test_nccl_gather_object_err(self):
|
|
output_gathered = [None for _ in range(dist.get_world_size())]
|
|
gather_on_rank = 0
|
|
my_rank = dist.get_rank()
|
|
with self.assertRaisesRegex(
|
|
RuntimeError, "ProcessGroupNCCL does not support gather"
|
|
):
|
|
dist.gather_object(
|
|
"foo",
|
|
object_gather_list=output_gathered
|
|
if my_rank == gather_on_rank
|
|
else None,
|
|
dst=gather_on_rank,
|
|
)
|
|
|
|
def validate_net_equivalence(self, net):
|
|
# Helper to validate synchronization of nets across ranks.
|
|
net_module_states = list(net.module.state_dict().values())
|
|
# Check that all tensors in module's state_dict() are equal.
|
|
for t in net_module_states:
|
|
tensor_list = [
|
|
torch.zeros_like(t) for _ in range(dist.get_world_size())
|
|
]
|
|
dist.all_gather(tensor_list, t)
|
|
for tensor in tensor_list:
|
|
self.assertEqual(tensor, t)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_sync_params_and_buffers(self):
|
|
# Test that after calling _sync_params_and_buffers, models across ranks
|
|
# are the same and are equal to the model on the input rank.
|
|
dim = 2
|
|
rank = self.rank
|
|
rank_to_broadcast = 1
|
|
# Seed to ensure that ranks are initialized with different initial models.
|
|
torch.manual_seed(rank)
|
|
model = nn.Linear(dim, dim, bias=False)
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
model.cuda(rank), device_ids=[self.rank], bucket_cap_mb=1
|
|
)
|
|
new_model = nn.Linear(dim, dim, bias=False).cuda(rank)
|
|
net.module = copy.deepcopy(new_model)
|
|
# Assert params are different
|
|
net_module_states = list(net.module.state_dict().values())
|
|
for t in net_module_states:
|
|
tensor_list = [
|
|
torch.zeros_like(t) for _ in range(dist.get_world_size())
|
|
]
|
|
dist.all_gather(tensor_list, t)
|
|
for i, tensor in enumerate(tensor_list):
|
|
if i == rank:
|
|
self.assertEqual(t, tensor)
|
|
else:
|
|
# tensor from another rank should be different.
|
|
self.assertNotEqual(t, tensor)
|
|
|
|
net._sync_params_and_buffers(authoritative_rank=rank_to_broadcast)
|
|
# Now all model params should be the same.
|
|
self.validate_net_equivalence(net)
|
|
# Since the network params were broadcast from rank_to_broadcast, validate that
|
|
# they are the same as new_model on rank_to_broadcast.
|
|
if rank == rank_to_broadcast:
|
|
expected_states = new_model.state_dict().values()
|
|
for t, expected in zip(net_module_states, expected_states):
|
|
self.assertEqual(t, expected)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_grad_div_uneven_inputs(self):
|
|
# Test gradient division during training with join() API. If
|
|
# divide_by_initial_world_size=False, we scale by the effective world
|
|
# size when allreducing grads.
|
|
dim = 5
|
|
batch = 1
|
|
grad_scale = 50
|
|
rank = self.rank
|
|
model = nn.Linear(dim, dim, bias=False)
|
|
inp = torch.ones(batch, dim, device=self.rank) * grad_scale
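# With a bias-free linear layer and a sum() loss, the gradient w.r.t. the
# weight is just the summed input, i.e. grad_scale per element, which makes
# the expected gradients below easy to reason about.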
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
model.cuda(rank), device_ids=[self.rank], bucket_cap_mb=1
|
|
)
|
|
n_iters = 3
|
|
if self.rank > 0:
|
|
n_iters += 2
|
|
|
|
with net.join(divide_by_initial_world_size=False):
|
|
for _ in range(n_iters):
|
|
loss = net(inp).sum()
|
|
loss.backward()
|
|
# The grad is always expected_grad, since we divide by the number
|
|
# of currently active processes and inactive processes contribute
|
|
# zero gradient. If we kept dividing by static initial world
|
|
# size as processes leave, the grad would be smaller.
|
|
expected_grad = torch.ones(dim, dim, device=self.rank) * grad_scale
|
|
param = list(net.parameters())[0]
|
|
self.assertEqual(expected_grad, param.grad)
|
|
# Avoid accumulating grads so that it's the same every iteration
|
|
net.zero_grad()
|
|
torch.cuda.synchronize(device=self.rank)
|
|
|
|
# If divide_by_initial_world_size=True (default), we always scale grads
|
|
# by the initial world_size.
|
|
with net.join(divide_by_initial_world_size=True):
|
|
for i in range(n_iters):
|
|
loss = net(inp).sum()
|
|
loss.backward()
|
|
effective_ws = dist.get_world_size()
|
|
if i >= 3:
|
|
effective_ws -= 1
|
|
expected_grad = (
|
|
torch.ones(dim, dim, device=self.rank) * grad_scale * effective_ws
|
|
) / dist.get_world_size()
|
|
param = list(net.parameters())[0]
|
|
self.assertEqual(expected_grad, param.grad)
|
|
# Avoid accumulating grad so that it's the same every iteration.
|
|
net.zero_grad()
|
|
torch.cuda.synchronize(device=self.rank)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_join_model_equivalence(self):
|
|
# Verifies equivalence with model training locally and with DDP under
|
|
# the join context manager.
|
|
batch = 3
|
|
dim = 10
|
|
learning_rate = 0.03
|
|
model = nn.Linear(dim, dim, bias=False)
|
|
inp = torch.rand(batch, dim, device=self.rank)
|
|
local_model = copy.deepcopy(model)
|
|
local_model = local_model.cuda(self.rank)
|
|
rank_to_iter_mapping = {rank : 2 * (rank + 1) for rank in range(dist.get_world_size())}
|
|
# run local model
|
|
local_iters = sum(rank_to_iter_mapping.values())
|
|
local_optim = torch.optim.SGD(local_model.parameters(), lr=learning_rate)
|
|
for _ in range(local_iters):
|
|
local_optim.zero_grad()
|
|
out = local_model(inp)
|
|
loss = out.sum()
|
|
loss.backward()
|
|
local_optim.step()
|
|
|
|
# run DDP model with join API
|
|
num_iters = rank_to_iter_mapping[self.rank]
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
model.cuda(self.rank), device_ids=[self.rank]
|
|
)
|
|
ddp_optim = torch.optim.SGD(
|
|
model.parameters(), lr=learning_rate * dist.get_world_size()
|
|
)
|
|
with net.join():
|
|
for i in range(num_iters):
|
|
ddp_optim.zero_grad()
|
|
out = net(inp)
|
|
loss = out.sum()
|
|
loss.backward()
|
|
torch.cuda.synchronize(device=self.rank)
|
|
ddp_optim.step()
|
|
|
|
# Validate model state dicts are equal
|
|
for (_, local_tensor), (_, dist_tensor) in zip(
|
|
local_model.state_dict().items(), net.module.state_dict().items()
|
|
):
|
|
self.assertEqual(local_tensor, dist_tensor)
|
|
|
|
def _run_uneven_inputs_test(
|
|
self, test_case, iteration_mapping, find_unused_params,
|
|
):
|
|
model = test_case.model
|
|
inp = test_case.inp
|
|
rank = self.rank
|
|
# Ensure all outstanding GPU work is complete so this test runs independently.
|
|
torch.cuda.synchronize()
|
|
# Bucket_cap_mb is intentionally low to test allreduce scheduling when
|
|
# there are many buckets.
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
model.cuda(rank),
|
|
device_ids=[rank],
|
|
bucket_cap_mb=1,
|
|
find_unused_parameters=find_unused_params,
|
|
)
|
|
|
|
# Determine num iters for this rank via the passed in mapping.
|
|
num_iters = iteration_mapping[rank]
|
|
with net.join():
|
|
for _ in range(num_iters):
|
|
if isinstance(inp, tuple):
|
|
loss = net(*inp).sum()
|
|
else:
|
|
loss = net(inp).sum()
|
|
loss.backward()
|
|
self._model_step(net)
|
|
# Ensure completion of GPU kernels (including allreduce). If the
|
|
# join API is not properly implemented, then this should hang
|
|
# since the allreduce will hang.
|
|
torch.cuda.synchronize(device=rank)
|
|
|
|
# Ensure completion of all GPU kernels.
|
|
torch.cuda.synchronize(device=rank)
|
|
self.assertTrue(net._authoritative_rank)
|
|
# All ranks should have agreed on the same authoritative_rank!
|
|
final_rank_tensor = torch.tensor([net._authoritative_rank], device=self.rank)
|
|
tensor_list = [
|
|
torch.zeros_like(final_rank_tensor)
|
|
for _ in range(dist.get_world_size())
|
|
]
|
|
dist.all_gather(tensor_list, final_rank_tensor)
|
|
max_rank = dist.get_world_size() - 1
|
|
self.assertSetEqual({max_rank}, set(tensor.item() for tensor in tensor_list))
|
|
# Ensure that all models are the same across ranks after all have joined.
|
|
self.validate_net_equivalence(net)
|
|
dist.barrier()
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_uneven_inputs(self):
|
|
class DDPUnevenTestInput(NamedTuple):
|
|
name: str
|
|
model: nn.Module
|
|
inp: Union[torch.tensor, tuple]
|
|
|
|
dim = 1000
|
|
batch = 1
|
|
# Create a variety of models to run uneven input tests on.
|
|
large_model = nn.Sequential(
|
|
nn.Conv2d(1, 20, 5),
|
|
nn.ReLU(),
|
|
nn.Conv2d(20, 32, 5),
|
|
nn.ReLU(),
|
|
nn.Conv2d(32, 256, 5),
|
|
nn.ReLU(),
|
|
)
|
|
small_model = nn.Linear(dim, dim, bias=False)
|
|
bn_net = BatchNormNet()
|
|
|
|
class UnusedParamModule(nn.Module):
|
|
def __init__(self, unused_params_rank):
|
|
super().__init__()
|
|
self.t0 = Task()
|
|
self.t1 = Task()
|
|
self.unused_params_rank = unused_params_rank
|
|
|
|
def task_parameters(self):
|
|
return (self.t0.p, self.t1.p)
|
|
|
|
def forward(self, x, rank):
|
|
return (
|
|
self.t1(self.t0(x))
|
|
if rank != self.unused_params_rank
|
|
else self.t1(x)
|
|
)
|
|
|
|
unjoined_rank_with_unused_params_model = UnusedParamModule(1)
|
|
joined_rank_with_unused_params_model = UnusedParamModule(0)
|
|
|
|
rank = self.rank
|
|
models_to_test = [
|
|
# Network with batchnorm
|
|
DDPUnevenTestInput(
|
|
name="batch_norm_net", model=bn_net, inp=torch.ones(batch, 2, device=rank)
|
|
),
|
|
DDPUnevenTestInput(
|
|
name="large_conv_model",
|
|
model=large_model,
|
|
inp=torch.ones(batch, batch, dim, dim, device=rank),
|
|
),
|
|
DDPUnevenTestInput(
|
|
name="small_model",
|
|
model=small_model,
|
|
inp=torch.ones(batch, dim, device=rank),
|
|
),
|
|
# Unused parameter test where rank that does not join early has unused params
|
|
DDPUnevenTestInput(
|
|
name="unjoined_rank_with_unused_params_model",
|
|
model=unjoined_rank_with_unused_params_model,
|
|
inp=(torch.ones(batch, 2, device=rank), rank),
|
|
),
|
|
# Unused parameter test where rank that does join early has unused params
|
|
DDPUnevenTestInput(
|
|
name="joined_rank_with_unused_params_model",
|
|
model=joined_rank_with_unused_params_model,
|
|
inp=(torch.ones(batch, 2, device=rank), rank),
|
|
),
|
|
]
|
|
|
|
# Add resnet model if we have torchvision installed.
|
|
if HAS_TORCHVISION:
|
|
resnet_model = torchvision.models.resnet50()
|
|
models_to_test.append(
|
|
DDPUnevenTestInput(
|
|
name="resnet_model",
|
|
model=resnet_model,
|
|
inp=torch.ones(1, 3, 1000, 1000),
|
|
)
|
|
)
|
|
|
|
# 0 iteration tests for when one process does not train model at all, so
|
|
# we must shadow the broadcast calls made when rebuilding buckets.
|
|
baseline_num_iters = [0, 5]
|
|
iteration_offsets = [2, 3, 10]
|
|
num_uneven_ranks = [1]
|
|
if dist.get_world_size() > 2:
|
|
num_uneven_ranks.append(2)
|
|
iteration_mappings = []
|
|
# Generate rank : num_iters mappings for various uneven input scenarios.
|
|
# This includes cases where rank 0 joins early and all other ranks join
|
|
# later, and scenarios where multiple ranks join early, but at different
|
|
# iterations, and later ranks join later.
|
|
for num_early_join_ranks in num_uneven_ranks:
|
|
for baseline_iter in baseline_num_iters:
|
|
for offset in iteration_offsets:
|
|
mapping = {
|
|
rank: baseline_iter for rank in range(0, num_early_join_ranks)
|
|
}
|
|
# If num_early_join_ranks > 1, the early-joining ranks other than rank 0
# iterate offset//2 more times than rank 0, to test nodes depleting
# their inputs at different times.
|
|
if num_early_join_ranks > 1:
|
|
for rank in mapping.keys():
|
|
if rank > 0:
|
|
mapping[rank] += offset // 2
|
|
mapping.update(
|
|
{
|
|
rank: baseline_iter + offset
|
|
for rank in range(
|
|
num_early_join_ranks, dist.get_world_size()
|
|
)
|
|
}
|
|
)
|
|
iteration_mappings.append(mapping)
|
|
|
|
for (test_case, iteration_mapping) in itertools.product(
|
|
models_to_test, iteration_mappings
|
|
):
|
|
if self.rank == 0:
|
|
print(
|
|
f"Running test: {test_case.name} with iteration mapping {iteration_mapping}"
|
|
)
|
|
self._run_uneven_inputs_test(
|
|
test_case,
|
|
iteration_mapping,
|
|
find_unused_params=("unused_params_model" in test_case.name),
|
|
)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_uneven_input_join_disable(self):
|
|
# Tests that if net.join() is entered with enable=False, DDP works as
# expected with even inputs.
|
|
torch.manual_seed(self.rank)
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
torch.nn.Linear(1, 1).cuda(self.rank), device_ids=[self.rank]
|
|
)
|
|
inp = torch.ones(1) * self.rank
|
|
n_iters = 5
|
|
world_size = dist.get_world_size()
|
|
with net.join(enable=False):
|
|
for _ in range(n_iters):
|
|
# Clear grads
|
|
grad = net.module.weight.grad
|
|
if grad is not None:
|
|
grad.requires_grad_(False)
|
|
grad.zero_()
|
|
out = net(inp)
|
|
loss = out.sum()
|
|
loss.backward()
|
|
# Validate gradients to ensure that we divide by the correct
|
|
# world_size when join mode is disabled.
|
|
expected_grad = sum(i for i in range(world_size)) / world_size
|
|
self.assertEqual(
|
|
net.module.weight.grad.item(), expected_grad
|
|
)
|
|
|
|
self.assertFalse(net.ddp_join_enabled)
|
|
self.validate_net_equivalence(net)
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(2)
|
|
@skip_if_rocm
|
|
def test_ddp_uneven_input_exception(self):
|
|
# Tests that exceptions during training are correctly propagated by the
|
|
# context manager.
|
|
error_str = "Intentional error"
|
|
|
|
class ExceptionModule(nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.param = nn.Parameter(torch.ones(1, requires_grad=True))
|
|
|
|
def forward(self, _):
|
|
raise ValueError(error_str)
|
|
|
|
exception_module = ExceptionModule()
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
exception_module.cuda(self.rank), device_ids=[self.rank]
|
|
)
|
|
inp = torch.ones(1)
|
|
with self.assertRaisesRegex(ValueError, error_str):
|
|
with net.join():
|
|
out = net(inp)
|
|
loss = out.sum()
|
|
loss.backward()
|
|
|
|
@require_backend({"gloo", "nccl"})
|
|
@require_backends_available({"gloo", "nccl"})
|
|
@skip_if_lt_x_gpu(4)
|
|
@skip_if_rocm
|
|
def test_ddp_uneven_inputs_replicated_error(self):
|
|
# Tests that the context manager errors out in SPMD mode.
|
|
group = dist.new_group([0, 1])
|
|
if self.rank < 2:
|
|
model = nn.Linear(1, 1, bias=False)
|
|
rank_to_device = {0: [0, 1], 1: [2, 3]}
|
|
|
|
devices = rank_to_device[self.rank]
|
|
net = torch.nn.parallel.DistributedDataParallel(
|
|
model.cuda(devices[0]), device_ids=devices, process_group=group
|
|
)
|
|
with self.assertRaisesRegex(
|
|
ValueError, r"DDP join\(\) API does not support Single-Process Multi-GPU"
|
|
):
|
|
with net.join():
|
|
pass
|
|
# We need a barrier since otherwise non-participating processes exit too early
|
|
# and cause a timeout.
|
|
self._barrier(timeout=60)
|
|
|
|
@require_backend({"nccl", "gloo"})
|
|
@require_n_gpus_for_nccl_backend(int(os.environ["WORLD_SIZE"]), os.environ["BACKEND"])
|
|
def test_broadcast_object_list(self):
|
|
src_rank = 0
|
|
objects = collectives_object_test_list if self.rank == src_rank else [None for _ in collectives_object_test_list]
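# Non-source ranks pass a list of None placeholders; broadcast_object_list
# overwrites it in place with the objects broadcast from src_rank.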
|
|
|
|
# Single object test
|
|
single_obj_list = [objects[0]]
|
|
if self.rank != src_rank:
|
|
self.assertNotEqual(single_obj_list[0], collectives_object_test_list[0])
|
|
dist.broadcast_object_list(single_obj_list, src=0)
|
|
self.assertEqual(single_obj_list[0], collectives_object_test_list[0])
|
|
|
|
# Multiple input objects test
|
|
if self.rank != src_rank:
|
|
self.assertNotEqual(objects, collectives_object_test_list)
|
|
dist.broadcast_object_list(objects, src=0)
|
|
self.assertEqual(objects, collectives_object_test_list)
|