Revert "Revert "Expandable blocks in allocator (#96995)"" (#99275)

This reverts commit 851e89c8e8.

Differential Revision: [D45034526](https://our.internmc.facebook.com/intern/diff/D45034526)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/99275
Approved by: https://github.com/eellison
This commit is contained in:
Zachary DeVito 2023-04-16 19:05:07 -07:00 committed by PyTorch MergeBot
parent 99c6d46cf7
commit 7ff1f3f3f6
16 changed files with 974 additions and 79 deletions

View file

@ -623,6 +623,7 @@ include_patterns = [
exclude_patterns = [
'aten/src/ATen/test/**',
'c10/cuda/CUDAFunctions.h',
'c10/cuda/CUDACachingAllocator.cpp',
]
command = [
'python3',
@ -657,8 +658,8 @@ exclude_patterns = [
command = [
'python3',
'tools/linter/adapters/grep_linter.py',
'--pattern=cudaSetDevice',
'--pattern=cudaGetDevice',
'--pattern=cudaSetDevice(',
'--pattern=cudaGetDevice(',
'--linter-name=RAWCUDADEVICE',
'--error-name=raw CUDA API usage',
"""--error-description=\

View file

@ -29,6 +29,7 @@ set(C10_CUDA_SRCS
CUDAStream.cpp
impl/CUDAGuardImpl.cpp
impl/CUDATest.cpp
driver_api.cpp
)
set(C10_CUDA_HEADERS
CUDACachingAllocator.h
@ -56,6 +57,11 @@ endif()
# ---[ Dependency of c10_cuda
target_link_libraries(c10_cuda PUBLIC c10 torch::cudart)
if(NOT WIN32)
target_link_libraries(c10_cuda PRIVATE dl)
target_compile_options(c10_cuda PRIVATE "-DPYTORCH_EXPANDABLE_SEGMENTS_SUPPORTED")
endif()
target_include_directories(
c10_cuda PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../..>

File diff suppressed because it is too large Load diff

View file

@ -127,6 +127,7 @@ struct SegmentInfo {
int64_t active_size = 0;
cudaStream_t stream = 0;
bool is_large = false;
bool is_expandable = false;
MempoolId_t owner_private_pool_id = {0, 0};
std::vector<BlockInfo> blocks;
};
@ -145,6 +146,8 @@ struct TraceEntry {
SEGMENT_ALLOC, // a call to cudaMalloc to get more memory from the OS
SEGMENT_FREE, // a call to cudaFree to return memory to the OS (e.g. to
// defragment or empty_caches)
SEGMENT_MAP, // a call to cuMemMap (used with expandable_segments)
SEGMENT_UNMAP, // unmap part of a segment (used with expandable segments)
SNAPSHOT, // a call to snapshot, used to correlate memory snapshots to trace
// events
OOM // the allocator threw an OutOfMemoryError (addr_ is the amount of free

32
c10/cuda/driver_api.cpp Normal file
View file

@ -0,0 +1,32 @@
// Lazily binds a subset of the CUDA *driver* API (cu* functions) via
// dlopen/dlsym so that c10_cuda does not need to link libcuda directly.
// Only compiled on platforms where expandable segments are supported.
#if !defined(USE_ROCM) && defined(PYTORCH_EXPANDABLE_SEGMENTS_SUPPORTED)
#include <c10/cuda/driver_api.h>
#include <c10/util/Exception.h>
#include <dlfcn.h>
#include <iostream>
namespace c10 {
namespace cuda {
namespace {
// Builds the function-pointer table by resolving every symbol listed in
// C10_FORALL_DRIVER_API. RTLD_NOLOAD means this never loads libcuda.so
// itself; it only obtains a handle if the CUDA runtime already loaded it,
// so the asserts fire if called before CUDA is initialized.
DriverAPI create_driver_api() {
void* handle = dlopen("libcuda.so", RTLD_LAZY | RTLD_NOLOAD);
TORCH_INTERNAL_ASSERT(handle);
DriverAPI r;
// For each driver symbol `name`, fill member `name_` and assert it resolved.
#define LOOKUP_ENTRY(name) \
r.name##_ = ((decltype(&name))dlsym(handle, #name)); \
TORCH_INTERNAL_ASSERT(r.name##_)
C10_FORALL_DRIVER_API(LOOKUP_ENTRY)
#undef LOOKUP_ENTRY
return r;
}
} // namespace
// Returns the process-wide singleton table of driver entry points.
// Initialization is thread-safe via C++11 magic statics.
DriverAPI* DriverAPI::get() {
static DriverAPI singleton = create_driver_api();
return &singleton;
}
} // namespace cuda
} // namespace c10
#endif

40
c10/cuda/driver_api.h Normal file
View file

@ -0,0 +1,40 @@
#pragma once
#include <cuda.h>
// Checks a CUresult returned by a driver call made through DriverAPI and
// raises a c10 error carrying the driver's error string (or "unknown
// error" when the string itself cannot be retrieved).
// NOTE(review): relies on AT_ERROR / C10_UNUSED being visible at the
// expansion site; this header does not include them itself — confirm
// callers include the appropriate c10 exception/macro headers.
#define C10_CUDA_DRIVER_CHECK(EXPR) \
do { \
CUresult __err = EXPR; \
if (__err != CUDA_SUCCESS) { \
const char* err_str; \
CUresult get_error_str_err C10_UNUSED = \
c10::cuda::DriverAPI::get()->cuGetErrorString_(__err, &err_str); \
if (get_error_str_err != CUDA_SUCCESS) { \
AT_ERROR("CUDA driver error: unknown error"); \
} else { \
AT_ERROR("CUDA driver error: ", err_str); \
} \
} \
} while (0)
// X-macro list of the CUDA driver entry points DriverAPI binds at runtime
// (the virtual-memory-management APIs used by expandable segments, plus
// cuGetErrorString for diagnostics).
#define C10_FORALL_DRIVER_API(_) \
_(cuMemAddressReserve) \
_(cuMemRelease) \
_(cuMemMap) \
_(cuMemAddressFree) \
_(cuMemSetAccess) \
_(cuMemUnmap) \
_(cuMemCreate) \
_(cuGetErrorString)
namespace c10 {
namespace cuda {
// Table of dynamically resolved driver function pointers; for each driver
// symbol `name` the member `name_` holds its address.
struct DriverAPI {
#define CREATE_MEMBER(name) decltype(&name) name##_;
C10_FORALL_DRIVER_API(CREATE_MEMBER)
#undef CREATE_MEMBER
// Returns the lazily initialized process-wide singleton (see driver_api.cpp).
static DriverAPI* get();
};
} // namespace cuda
} // namespace c10

View file

@ -18,7 +18,10 @@ from torch.testing._internal.distributed.rpc_utils import (
TENSORPIPE_CUDA_TESTS,
generate_tests,
)
import torch
if torch.cuda.is_available():
torch.cuda.memory._set_allocator_settings('expandable_segments:False')
globals().update(
generate_tests(

View file

@ -17,6 +17,8 @@ from torch.testing._internal.common_utils import (
from torch.testing._internal.inductor_utils import HAS_CUDA
torch.set_float32_matmul_precision("high")
if HAS_CUDA:
torch.cuda.memory._set_allocator_settings("expandable_segments:False")
def benchmark_choice(choice, args, out, expected_out, timings):

View file

@ -259,6 +259,7 @@ CI_SERIAL_LIST = [
"test_cpp_api_parity",
"test_reductions",
"test_cuda",
"test_cuda_expandable_segments",
"test_jit_cuda_fuser", # OOM on test_issue_1785, also profiling?
"test_indexing",
"test_fx_backends",

View file

@ -17,6 +17,7 @@ import threading
import unittest
import warnings
import subprocess
import random
from random import randint
import torch
@ -99,11 +100,13 @@ class TestCuda(TestCase):
expected_each_device = collections.defaultdict(lambda: collections.defaultdict(int))
for segment in snapshot:
expandable = segment["is_expandable"]
expected = expected_each_device[segment["device"]]
pool_str = segment["segment_type"] + "_pool"
expected["segment.all.current"] += 1
expected["segment." + pool_str + ".current"] += 1
if not expandable:
expected["segment.all.current"] += 1
expected["segment." + pool_str + ".current"] += 1
expected["allocated_bytes.all.current"] += segment["allocated_size"]
expected["allocated_bytes." + pool_str + ".current"] += segment["allocated_size"]
@ -129,7 +132,7 @@ class TestCuda(TestCase):
expected["active.all.current"] += 1
expected["active." + pool_str + ".current"] += 1
if block["state"] == "inactive" and is_split:
if block["state"] == "inactive" and is_split and not expandable:
expected["inactive_split.all.current"] += 1
expected["inactive_split." + pool_str + ".current"] += 1
expected["inactive_split_bytes.all.current"] += block["size"]
@ -4976,7 +4979,7 @@ class TestCudaComm(TestCase):
del x
torch.cuda.empty_cache()
ss = torch.cuda.memory._snapshot()
self.assertTrue(ss['device_traces'][0][-1]['action'] == 'segment_free')
self.assertTrue(ss['device_traces'][0][-1]['action'] in ('segment_free', 'segment_unmap'))
finally:
torch.cuda.memory._record_memory_history(None)
@ -5231,6 +5234,39 @@ class TestCudaComm(TestCase):
torch.empty(1024 * 1024 * 1024 * 1024, device='cuda')
self.assertTrue(x)
def test_allocator_fuzz(self):
    """Fuzz the CUDA caching allocator with a deterministic, seeded mix of
    allocations, frees, and cache-empty calls, checking that the contents
    written into each surviving tensor remain intact."""
    saved_rng_state = random.getstate()
    random.seed(123)
    iterations = 10000
    try:
        live = []          # (fill_value, tensor) pairs currently allocated
        total_elems = 0    # total int32 elements currently allocated
        next_value = 0     # monotonically increasing fill value

        def alloc():
            # Allocate between 2 MiB and 200 MiB worth of int32s, tagged
            # with a unique fill value so corruption is detectable.
            nonlocal total_elems, next_value
            numel = random.randrange(2 * 1024 * 1024 // 4, 200 * 1024 * 1024 // 4)
            live.append((next_value, torch.full((numel,), next_value, dtype=torch.int32, device='cuda')))
            next_value += 1
            total_elems += numel

        def free():
            # Drop a random live tensor, verifying its contents first.
            nonlocal total_elems
            pos = random.randrange(0, len(live))
            value, tensor = live.pop(pos)
            assert torch.all(value == tensor)
            total_elems -= tensor.numel()

        actions = [alloc, free, torch.cuda.memory.empty_cache]
        for _ in range(iterations):
            # Keep resident memory under ~1 GiB of int32 elements.
            while total_elems >= 1024 * 1024 * 1024 / 4:
                free()
            # free is only eligible when something is live.
            action, = random.choices(actions, weights=[1, 1 if live else 0, .1])
            action()
    finally:
        random.setstate(saved_rng_state)
@unittest.skipIf(TEST_PYNVML, "pynvml is not available")
def test_nvml_get_handler(self):
self.assertTrue(torch.cuda._get_pynvml_handler() is not None)

View file

@ -0,0 +1,12 @@
# Owner(s): ["module: cuda"]
# run time cuda tests, but with the allocator using expandable segments
import os

import torch

if torch.cuda.is_available():
    torch.cuda.memory._set_allocator_settings('expandable_segments:True')

current_dir = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.join(current_dir, 'test_cuda.py')
# Read via a context manager so the file handle is closed deterministically
# (the original open(...).read() relied on the GC to close it).
with open(filepath, 'r') as f:
    source = f.read()
# Execute test_cuda.py in this module's globals so its TestCase classes are
# discovered here, running the whole suite under expandable segments.
exec(compile(source, filepath, mode='exec'))

View file

@ -78,6 +78,8 @@ load_tests = load_tests
# as well during the execution of this test suite, and it will cause
# CUDA OOM error on Windows.
TEST_CUDA = torch.cuda.is_available()
if TEST_CUDA:
torch.cuda.memory._set_allocator_settings('expandable_segments:False')
if not NO_MULTIPROCESSING_SPAWN:
# We want to use `spawn` if able because some of our tests check that the

View file

@ -18,6 +18,7 @@ from torch.testing._internal.common_utils import (TestCase, run_tests, IS_WINDOW
load_tests, slowTest, TEST_WITH_TSAN, TEST_WITH_TORCHDYNAMO,
TEST_WITH_ROCM, IS_MACOS)
# load_tests from common_utils is used to automatically filter tests for
# sharding on sandcastle. This line silences flake warnings
load_tests = load_tests
@ -31,6 +32,9 @@ TEST_CUDA_IPC = torch.cuda.is_available() and \
not TEST_WITH_ROCM # https://github.com/pytorch/pytorch/issues/90940
TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1
if TEST_CUDA_IPC:
torch.cuda.memory._set_allocator_settings('expandable_segments:False')
class SubProcess(mp.Process):
def __init__(self, tensor):

View file

@ -655,6 +655,7 @@ PyObject* THCPModule_memorySnapshot(PyObject* _unused, PyObject* noargs) {
py::str cpp_frames_s = "cpp_frames";
py::str history_s = "history";
py::str blocks_s = "blocks";
py::str is_expandable_s = "is_expandable";
std::vector<CapturedTraceback*> to_gather_frames;
std::vector<py::dict> to_gather_dest;
@ -672,6 +673,7 @@ PyObject* THCPModule_memorySnapshot(PyObject* _unused, PyObject* noargs) {
segmentDict[stream_s] = int64_t(segmentInfo.stream);
segmentDict[segment_type_s] = (segmentInfo.is_large ? large_s : small_s);
segmentDict[segment_pool_id] = segmentInfo.owner_private_pool_id;
segmentDict[is_expandable_s] = segmentInfo.is_expandable;
py::list blocks;
for (const auto& blockInfo : segmentInfo.blocks) {
@ -719,6 +721,9 @@ PyObject* THCPModule_memorySnapshot(PyObject* _unused, PyObject* noargs) {
py::str free_completed_s = "free_completed";
py::str segment_alloc_s = "segment_alloc";
py::str segment_free_s = "segment_free";
py::str segment_map_s = "segment_map";
py::str segment_unmap_s = "segment_unmap";
py::str snapshot_s = "snapshot";
py::str oom_s = "oom";
py::str device_free_s = "device_free";
@ -741,6 +746,10 @@ PyObject* THCPModule_memorySnapshot(PyObject* _unused, PyObject* noargs) {
return oom_s;
case TraceEntry::SNAPSHOT:
return snapshot_s;
case TraceEntry::SEGMENT_UNMAP:
return segment_unmap_s;
case TraceEntry::SEGMENT_MAP:
return segment_map_s;
}
throw std::runtime_error("unreachable");
};

View file

@ -144,6 +144,7 @@ std::string _memory_snapshot_pickled() {
IValue frames_s = "frames";
IValue history_s = "history";
IValue blocks_s = "blocks";
IValue is_expandable_s = "is_expandable";
auto empty_frames = new_list();
@ -164,6 +165,7 @@ std::string _memory_snapshot_pickled() {
segmentDict.insert(
segment_pool_id,
std::tuple<int64_t, int64_t>(segmentInfo.owner_private_pool_id));
segmentDict.insert(is_expandable_s, segmentInfo.is_expandable);
auto blocks = new_list();
for (const auto& blockInfo : segmentInfo.blocks) {
@ -210,6 +212,8 @@ std::string _memory_snapshot_pickled() {
IValue free_completed_s = "free_completed";
IValue segment_alloc_s = "segment_alloc";
IValue segment_free_s = "segment_free";
IValue segment_map_s = "segment_map";
IValue segment_unmap_s = "segment_unmap";
IValue snapshot_s = "snapshot";
IValue oom_s = "oom";
IValue device_free_s = "device_free";
@ -232,6 +236,10 @@ std::string _memory_snapshot_pickled() {
return oom_s;
case TraceEntry::SNAPSHOT:
return snapshot_s;
case TraceEntry::SEGMENT_UNMAP:
return segment_unmap_s;
case TraceEntry::SEGMENT_MAP:
return segment_map_s;
}
throw std::runtime_error("unreachable");
};

View file

@ -1035,6 +1035,8 @@ def segment_plot(data: Any, device=None):
}
for seg in data['segments']:
if seg['device'] != device:
continue
for k in segments.keys():
sk = segment_names.get(k, k)
segments[k].append(preproc.get(k, lambda x: x)(seg[sk]))
@ -1135,9 +1137,11 @@ function createEvents() {
t.version = block_version(t.addr, false)
break
case 'segment_free':
case 'segment_unmap':
t.version = segment_version(t.addr, true)
break;
case 'segment_alloc':
case 'segment_map':
t.version = segment_version(t.addr, false)
break
default:
@ -1221,7 +1225,7 @@ function formatSize(num) {
function formatEvent(event) {
function formatAddr(event) {
let version = event.version == 0 ? "" : `_${event.version}`
let prefix = (event.action == "segment_free" || event.action == "segment_alloc") ? "s" : "b"
let prefix = event.action.startsWith("segment") ? "s" : "b"
return `${prefix}${event.addr.toString(16)}_${event.version}`
}
let stream = event.stream == 0 ? "" : `\n (stream ${event.stream})`
@ -1276,14 +1280,15 @@ function MemoryView(outer, stack_info, trace_data, events) {
})
svg.call(seg_zoom)
let segments_map = {}
let sorted_segments = []
let block_map = {}
let segments_data = trace_data.segments
for (let [i, addr] of trace_data.segments.addr.entries()) {
segments_map[addr] = Segment(addr, segments_data.size[i], segments_data.stream[i],
null, trace_data.segment_version(addr, false))
sorted_segments.push(Segment(addr, segments_data.size[i], segments_data.stream[i],
null, trace_data.segment_version(addr, false)))
}
sorted_segments.sort((x, y) => x.addr - y.addr)
let blocks_data = trace_data.blocks
for (let [i, addr] of trace_data.blocks.addr.entries()) {
@ -1293,8 +1298,60 @@ function MemoryView(outer, stack_info, trace_data, events) {
}
function simulate_memory(idx) {
let l_segment_map = {...segments_map}
// create a copy of segments because we edit size properties below
let l_segments = sorted_segments.map((x) => { return {...x} })
let l_block_map = {...block_map}
// Re-insert `seg` into l_segments, which is kept sorted by addr.
// merge == false (segment_free replay, run backwards): the segment was a
// standalone cudaMalloc allocation; just put it back in order.
// merge == true (segment_unmap replay): the range belongs to an expandable
// segment, so coalesce with contiguous same-stream neighbors.
function map_segment(merge, seg) {
    let idx = l_segments.findIndex(e => e.addr > seg.addr)
    if (idx == -1) {
        // seg's addr is greater than every existing segment: append.
        // (The original only normalized -1 on the merge path; without this,
        // splice(-1, 0, seg) inserts before the LAST element and breaks
        // the sorted-by-addr invariant.)
        idx = l_segments.length
    }
    l_segments.splice(idx, 0, seg)
    if (!merge) {
        return
    }
    // Try to coalesce with the following segment.
    if (idx + 1 < l_segments.length) {
        let next = l_segments[idx + 1]
        if (seg.addr + seg.size == next.addr && seg.stream == next.stream) {
            seg.size += next.size
            l_segments.splice(idx + 1, 1)
        }
    }
    // Try to coalesce with the preceding segment.
    if (idx > 0) {
        let prev = l_segments[idx - 1]
        if (prev.addr + prev.size == seg.addr && prev.stream == seg.stream) {
            prev.size += seg.size
            l_segments.splice(idx, 1)
        }
    }
}
// Remove the range described by `seg` from l_segments (sorted by addr).
// merge == false (segment_alloc replay, run backwards): seg is an exact
// existing entry; drop it wholesale.
// merge == true (segment_map replay): seg is a sub-range of one existing
// expandable segment; shrink or split that segment around the range.
function unmap_segment(merge, seg) {
if (!merge) {
// NOTE(review): assumes an entry with this exact addr exists; if not,
// findIndex returns -1 and splice(-1, 1) would remove the last element.
l_segments.splice(l_segments.findIndex(x => x.addr == seg.addr), 1)
return
}
let seg_end = seg.addr + seg.size
// Find the existing segment that fully contains [seg.addr, seg_end).
// NOTE(review): assumes such a containing segment exists (idx != -1).
let idx = l_segments.findIndex(e => e.addr <= seg.addr && seg_end <= e.addr + e.size)
let existing = l_segments[idx]
let existing_end = existing.addr + existing.size
if (existing.addr == seg.addr) {
// Removed range is a prefix: advance the start, drop if emptied.
existing.addr += seg.size
existing.size -= seg.size
if (existing.size == 0) {
l_segments.splice(idx, 1)
}
} else if (existing_end == seg_end) {
// Removed range is a suffix: just shrink.
existing.size -= seg.size
} else {
// Removed range is interior: keep the left part in `existing` and
// reuse `seg` (mutated in place) as the right-hand remainder.
existing.size = seg.addr - existing.addr
seg.addr = seg_end
seg.size = existing_end - seg_end
l_segments.splice(idx + 1, 0, seg)
}
}
for (let i = events.length - 1; i > idx; i--) {
let event = events[i]
switch (event.action) {
@ -1311,10 +1368,14 @@ function MemoryView(outer, stack_info, trace_data, events) {
delete l_block_map[event.addr]
break
case 'segment_free':
l_segment_map[event.addr] = Segment(event.addr, event.size, event.stream, event.frames_idx, event.version)
case 'segment_unmap':
map_segment(event.action == 'segment_unmap',
Segment(event.addr, event.size, event.stream, event.frames_idx, event.version))
break
case 'segment_alloc':
delete l_segment_map[event.addr]
case 'segment_map':
unmap_segment(event.action == 'segment_map',
Segment(event.addr, event.size, event.stream, event.frames_idx, event.version))
break
case 'oom':
break
@ -1323,9 +1384,8 @@ function MemoryView(outer, stack_info, trace_data, events) {
break
}
}
let new_segments = Object.values(l_segment_map)
let new_blocks = Object.values(l_block_map)
return [new_segments, new_blocks]
return [l_segments, new_blocks]
}
return {
@ -1344,7 +1404,7 @@ function MemoryView(outer, stack_info, trace_data, events) {
let segments_by_addr = [...segments].sort((x, y) => x.addr - y.addr)
let max_size = segments.at(-1).size
let max_size = segments.length == 0 ? 0 : segments.at(-1).size
let xScale = scaleLinear([0, max_size], [0, 200])
let padding = xScale.invert(1)