Test distributed backends in parallel (#84034)

This allows multiple backends (nccl, gloo) to be tested in parallel, speeding up the process. The improvement is mainly in the 1st distributed CUDA shard, where the long-pole `distributed/test_distributed_spawn` test is executed:

* [linux-bionic-cuda11.6-py3.10-gcc7 / test (distributed, 1, 2, linux.8xlarge.nvidia.gpu)](https://github.com/pytorch/pytorch/runs/8007596825?check_suite_focus=true#logs) takes 1h24m. This is better than the current average expectation of 2h12m

On the other hand, there is no improvement for the following two jobs:

* [linux-focal-py3.7-gcc7 / test (distributed, 1, 1, linux.2xlarge)](https://github.com/pytorch/pytorch/runs/8007417353?check_suite_focus=true#logs) takes 1h47m
* [linux-bionic-cuda11.6-py3.10-gcc7 / test (distributed, 2, 2, linux.8xlarge.nvidia.gpu)](https://github.com/pytorch/pytorch/runs/8007596870?check_suite_focus=true#logs) takes 1h40m

This is still a gain, though, because it allows us to add more shards for distributed tests if needed.
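
In essence, each backend's test run becomes an independent task submitted to a multiprocessing pool, and the runner only inspects exit codes after every task has finished. A minimal sketch of that pattern (simplified and hypothetical, not the actual `run_test.py` code; the `BACKEND` variable and script path below are illustrative):

```python
import os
import subprocess
from multiprocessing import Pool


def run_backend_tests(backend: str) -> int:
    """Run one backend's distributed tests in a child process; return its exit code."""
    env = dict(os.environ, BACKEND=backend)  # BACKEND is illustrative here
    proc = subprocess.run(
        ["python", "distributed/test_distributed_spawn.py"],  # illustrative path
        env=env,
    )
    return proc.returncode


if __name__ == "__main__":
    backends = ["gloo", "nccl"]  # one pool worker per backend
    pool = Pool(processes=len(backends))
    results = [pool.apply_async(run_backend_tests, (b,)) for b in backends]
    pool.close()
    pool.join()  # wait for every backend before inspecting exit codes
    failed = [code for code in (r.get() for r in results) if code != 0]
    raise SystemExit(1 if failed else 0)
```

Sizing the pool to the number of backends lets the nccl and gloo runs overlap instead of running back to back.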

Issue https://github.com/pytorch/pytorch/issues/83694
Pull Request resolved: https://github.com/pytorch/pytorch/pull/84034
Approved by: https://github.com/wanchaol
Author: Huy Do
Date: 2022-08-30 19:06:49 +00:00
Committed by: PyTorch MergeBot
Parent: 641c395251
Commit: 3ae5be74ac
2 changed files with 55 additions and 16 deletions

Changed file 1 of 2:
```diff
@@ -27,6 +27,7 @@ from torch.testing._internal.common_utils import (
     parser as common_parser,
 )
 import torch.distributed as dist
+from torch.multiprocessing import Pool
 
 REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent
@@ -360,7 +361,12 @@ def get_executable_command(options, allow_pytest, disable_coverage=False):
 
 def run_test(
-    test_module, test_directory, options, launcher_cmd=None, extra_unittest_args=None
+    test_module,
+    test_directory,
+    options,
+    launcher_cmd=None,
+    extra_unittest_args=None,
+    env=None,
 ):
     unittest_args = options.additional_unittest_args.copy()
     if options.verbose:
@@ -391,7 +397,7 @@ def run_test(
     command = (launcher_cmd or []) + executable + argv
     print_to_stderr("Executing {} ... [{}]".format(command, datetime.now()))
 
-    return shell(command, test_directory)
+    return shell(command, test_directory, env=env)
 
 
 def test_cuda_primary_ctx(test_module, test_directory, options):
```
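
The new `env` parameter is what lets each pooled task run with its own snapshot of environment variables instead of the parent's mutable `os.environ`. The real `shell` helper comes from `torch.testing._internal.common_utils`; the following is only a simplified, hypothetical stand-in showing the role of `env`:

```python
import subprocess
from typing import Optional


def shell(command, cwd=None, env: Optional[dict] = None) -> int:
    """Run a command and return its exit code, optionally with a custom environment.

    Hypothetical stand-in: forwarding env lets each worker see its own
    BACKEND/INIT_METHOD values rather than whatever os.environ holds later.
    """
    return subprocess.run(command, cwd=cwd, env=env).returncode
```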
```diff
@@ -482,15 +488,24 @@ def test_distributed(test_module, test_directory, options):
     if options.verbose and not mpi_available:
         print_to_stderr("MPI not available -- MPI backend tests will be skipped")
     config = DISTRIBUTED_TESTS_CONFIG
-    for backend, env_vars in config.items():
-        if sys.platform == "win32" and backend != "gloo":
-            continue
-        if backend == "mpi" and not mpi_available:
-            continue
-        for with_init_file in {True, False}:
+    for with_init_file in {True, False}:
+        # Run all distributed backends in parallel, trying to run env/file init
+        # methods in parallel too ends in failures in which the subprocesses
+        # timeout
+        pool = Pool(processes=len(config))
+        return_codes = []
+        tmp_dirs = []
+        for backend, env_vars in config.items():
+            if sys.platform == "win32" and backend != "gloo":
+                continue
+            if backend == "mpi" and not mpi_available:
+                continue
             if sys.platform == "win32" and not with_init_file:
                 continue
             tmp_dir = tempfile.mkdtemp()
+            tmp_dirs.append(tmp_dir)
             if options.verbose:
                 init_str = "with {} init_method"
                 with_init = init_str.format("file" if with_init_file else "env")
@@ -510,6 +525,7 @@ def test_distributed(test_module, test_directory, options):
             else:
                 init_method = f"{FILE_SCHEMA}{tmp_dir}/shared_init_file"
             os.environ["INIT_METHOD"] = init_method
+
             try:
                 os.mkdir(os.path.join(tmp_dir, "barrier"))
                 os.mkdir(os.path.join(tmp_dir, "test_dir"))
```
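
The `INIT_METHOD` value exported here is what the spawned test processes ultimately pass to `torch.distributed.init_process_group`. Roughly, in a hypothetical consumer (the real wiring lives in the distributed test harness):

```python
import os

import torch.distributed as dist

# INIT_METHOD is either "env://" (which relies on MASTER_ADDR/MASTER_PORT being set)
# or a "file://..." path pointing at the shared init file created above.
dist.init_process_group(
    backend=os.environ.get("BACKEND", "gloo"),
    init_method=os.environ.get("INIT_METHOD", "env://"),
    rank=int(os.environ.get("RANK", "0")),
    world_size=int(os.environ.get("WORLD_SIZE", "1")),
)
```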
```diff
@@ -540,18 +556,41 @@ def test_distributed(test_module, test_directory, options):
                     )
                     mpiexec = ["mpiexec", "-n", "3", noprefix_opt, allowrunasroot_opt]
 
-                    return_code = run_test(
-                        test_module, test_directory, options, launcher_cmd=mpiexec
+                    return_code = pool.apply_async(
+                        run_test,
+                        args=(test_module, test_directory, options),
+                        kwds={
+                            "launcher_cmd": mpiexec,
+                            "env": os.environ.copy(),
+                        }
                     )
                 else:
-                    return_code = run_test(test_module, test_directory, options, extra_unittest_args=["--subprocess"])
-                if return_code != 0:
-                    return return_code
+                    return_code = pool.apply_async(
+                        run_test,
+                        args=(test_module, test_directory, options),
+                        kwds={
+                            "extra_unittest_args": ["--subprocess"],
+                            "env": os.environ.copy(),
+                        }
+                    )
+                return_codes.append(return_code)
             finally:
-                shutil.rmtree(tmp_dir)
                 os.environ.clear()
                 os.environ.update(old_environ)
 
+        pool.close()
+        # Close the pool and wait for all the processes to finish
+        pool.join()
+
+        for tmp_dir in tmp_dirs:
+            shutil.rmtree(tmp_dir)
+
+        for return_code in return_codes:
+            if return_code.get() != 0:
+                return return_code
+
     return 0
```
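
Note that `pool.apply_async` returns `multiprocessing.pool.AsyncResult` objects rather than plain integers, which is why the final loop has to call `.get()`. A tiny, self-contained illustration of that behavior (hypothetical exit codes):

```python
from multiprocessing import Pool


def fake_test(exit_code: int) -> int:
    return exit_code  # stands in for run_test's return value


if __name__ == "__main__":
    pool = Pool(processes=2)
    results = [pool.apply_async(fake_test, (c,)) for c in (0, 1)]
    pool.close()
    pool.join()
    # .get() blocks until the task finishes and re-raises any worker exception.
    assert [r.get() for r in results] == [0, 1]
```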

Changed file 2 of 2:
```diff
@@ -503,7 +503,7 @@ class TestDistBackend(MultiProcessTestCase):
     @classmethod
     def setUpClass(cls):
         os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
-        os.environ["MASTER_PORT"] = str(MASTER_PORT)
+        # Not setting MASTER_PORT and get a random free port
         super().setUpClass()
 
     def setUp(self):
```
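
With `MASTER_PORT` left unset, each concurrent test run can bind its own free port, so backends started in parallel no longer collide on a hard-coded port. A generic way to grab such a port (a sketch only, not necessarily the mechanism `MultiProcessTestCase` uses internally):

```python
import socket


def find_free_port() -> int:
    """Ask the OS for an unused TCP port (generic sketch, not PyTorch's internal helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("localhost", 0))  # port 0 means "any free port"
        return s.getsockname()[1]


# e.g. os.environ["MASTER_PORT"] = str(find_free_port())
```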