pytorch/test/distributed/test_distributed_fork.py
Yanli Zhao 85b97e449d [RFC]fix test_ddp_logging_data_cpu with tsan (#54465)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/54465

It is reported that this test has a data race when run under TSAN. The root cause is the 'model.frc1.double()' call, not an interaction between DistributedDataParallel() and 'model.frc1.double()': if we remove DistributedDataParallel() and just call 'model.frc1.double(); model.frc2.double();', TSAN reports the same data race.

I'm not sure how to perform the data type cast in this test without TSAN complaining, so I am removing this line of code and the mixed data type logging check.

Please let me know if you have a better suggestion for how to do the data type cast correctly.
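
For reference, here is a minimal sketch of the racy pattern described above. The toy module, layer type, and sizes are assumptions for illustration; only the frc1/frc2 double() casts come from this summary:

    import torch
    import torch.nn as nn

    class ToyModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.frc1 = nn.Linear(10, 10)
            self.frc2 = nn.Linear(10, 10)

    model = ToyModel()
    # Casting submodules in place is the step TSAN flags, even with no
    # DistributedDataParallel wrapper involved:
    model.frc1.double()
    model.frc2.double()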

Test Plan: unit test

Reviewed By: SciPioneer

Differential Revision: D27249821

fbshipit-source-id: 0368157e11cbe7d15828dccca78271d89d502ec4
2021-04-13 11:20:43 -07:00

import os
import sys
import tempfile
from functools import wraps

import torch
import torch.cuda
import torch.distributed as dist
import unittest
from torch.testing._internal.common_utils import TEST_WITH_TSAN

if not dist.is_available():
    print("Distributed not available, skipping tests", file=sys.stderr)
    sys.exit(0)

from torch.testing._internal.common_utils import TestCase, find_free_port, run_tests
from torch.distributed.distributed_c10d import _get_default_group
from torch.testing._internal.distributed.distributed_test import (
    DistributedTest, TestDistBackend
)

# Run matmuls in full fp32 precision during tests (TF32 would otherwise
# reduce precision on Ampere GPUs).
torch.backends.cuda.matmul.allow_tf32 = False

CPP_EXTENSIONS_WARNING = """
Ninja (https://ninja-build.org) must be available to run C++ extensions tests,
but it could not be found. Install ninja with `pip install ninja`
or `conda install ninja`.
"""
BACKEND = os.environ["BACKEND"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")


def skip_if_no_ninja(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            import torch.utils.cpp_extension
            torch.utils.cpp_extension.verify_ninja_availability()
        except RuntimeError:
            print(CPP_EXTENSIONS_WARNING)
            return 0

        return func(*args, **kwargs)

    return wrapper

if BACKEND == "gloo" or BACKEND == "nccl":

    @unittest.skipIf(
        TEST_WITH_TSAN,
        "TSAN is not fork-safe since we're forking in a multi-threaded environment",
    )
    class TestDistBackendWithFork(TestDistBackend, DistributedTest._DistTestBase):
        def setUp(self):
            super().setUp()
            self._fork_processes()
            # Enter (and never exit) the cudnn flags context so TF32 stays
            # disabled in cudnn for the lifetime of the test process.
            torch.backends.cudnn.flags(allow_tf32=False).__enter__()

elif BACKEND == "mpi":
    WORLD_SIZE = os.environ["WORLD_SIZE"]
    dist.init_process_group(init_method=INIT_METHOD, backend="mpi")

    class TestMPIWithFork(TestCase, DistributedTest._DistTestBase):
        pass

elif BACKEND == "test":

    class TestBackendDynamicLoad(TestCase):
        def setUp(self):
            super(TestBackendDynamicLoad, self).setUp()

        def _load_test_backend(self):
            # Compile and load the test backend as a C++ extension, which
            # registers the "test" backend with torch.distributed.
            temp_dir = tempfile.mkdtemp()
            src = "{}/../cpp_extensions/cpp_c10d_extension.cpp".format(
                os.path.abspath(os.path.dirname(__file__))
            )
            extension = torch.utils.cpp_extension.load(
                name="torch_test",
                sources=[src],
                build_directory=temp_dir,
            )

        @skip_if_no_ninja
        def test_backend_apis(self):
            self._load_test_backend()
            os.environ['WORLD_SIZE'] = '1'
            os.environ['MASTER_ADDR'] = '127.0.0.1'
            os.environ['MASTER_PORT'] = str(find_free_port())
            os.environ['RANK'] = '0'
            dist.init_process_group(backend='test', init_method='env://', world_size=1, rank=0)
            self.assertEqual(dist.get_rank(), 0)
            self.assertEqual(dist.get_world_size(), 1)

            process_group = _get_default_group()

            work = process_group.allreduce([torch.rand(1), torch.rand(1)])
            self.assertTrue(work.wait())
            self.assertTrue(work.is_completed())
            self.assertTrue(work.is_success())

            work = process_group.broadcast([torch.rand(1)])
            self.assertTrue(work.wait())
            self.assertTrue(work.is_completed())
            self.assertTrue(work.is_success())

            dist.destroy_process_group()

if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()
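
A note on invocation (assumed from the environment-variable reads above, not stated in this commit): the module expects BACKEND to be set before import, e.g.

    BACKEND=gloo python test/distributed/test_distributed_fork.py

plus, depending on the backend, variables such as WORLD_SIZE; run_tests() then runs whichever test class the matching BACKEND branch defined.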