diff --git a/c10/xpu/XPUStream.cpp b/c10/xpu/XPUStream.cpp index 1bd8cb862f7..65d19a3324f 100644 --- a/c10/xpu/XPUStream.cpp +++ b/c10/xpu/XPUStream.cpp @@ -37,27 +37,30 @@ thread_local std::unique_ptr current_streams = nullptr; // ~~~~~~~~~~~~~~~~~~~~~~~~~~ // How do we assign stream IDs? // -// -- 56 bits -- -- 5 bits ----- -- 3 bits -- -- 1 bits -- -// zeros StreamIdIndex StreamIdType Ext/native stream +// -- 55 bits -- -- 5 bits -- -- 3 bits -- -- 1 bit -- +// zeros StreamIdIndex StreamIdType Ext/native stream +// ignored for ext ignored for ext // // Where StreamIdType: // 000 = normal priority queue // 001 = high priority queue +// 111 = external queue // -// for external stream, StreamID is a sycl::queue* pointer -// this means that last bit will always be 0 -// so when constructing StreamId for a native stream we set last bit to 1 -// to distinguish between native and external streams +// For external stream, StreamID is a sycl::queue* pointer. This means that last +// bit will always be 0. So when constructing StreamId for a native stream we +// set last bit to 1 to distinguish between native and external streams. For +// more details, see Note [External XPU Stream]. // // StreamId is 64-bit, so we can just rely on regular promotion rules. // We rely on StreamIdIndex and StreamIdType being non-negative; using StreamIdIndex = uint8_t; enum class StreamIdType : uint8_t { - // The higher the type number, the higher the priority. - // EXT is used for external streams, which we don't know the priority of. + // The higher the type number, the higher the priority for the native stream. NORMAL = 0x0, HIGH = 0X1, + // For an external stream, the last bit of StreamId is 0, whose priority is + // queried at runtime. 
EXT = 0x7, }; @@ -76,9 +79,9 @@ inline std::ostream& operator<<(std::ostream& stream, StreamIdType q) { } inline StreamIdType streamIdType(StreamId s) { - // Externally allocated streams have their id being the sycl:queue* pointer - // so the last bit will be 0 - if ((!(s & 1) && s)) { + // Externally allocated streams have their id being the sycl::queue* pointer. + // So the last bit will be 0. + if ((!(s & 1))) { return StreamIdType(StreamIdType::EXT); } int mask_for_type = (1 << kStreamTypeBits) - 1; @@ -180,13 +183,16 @@ XPUStream XPUStreamForId(DeviceIndex device_index, StreamId stream_id) { int XPUStream::priority() const { StreamId stream_id = stream_.id(); StreamIdType st = streamIdType(stream_id); - // For an external queue which is not created in XPUStream, we can not trace - // the priority. Workaround here since sycl doesn't support get priority from - // a sycl::queue, like cudaStreamGetPriority . - // TODO: remove this workaround when sycl supports get priority from a - // sycl::queue. - if (st == StreamIdType::EXT) { + if (C10_UNLIKELY(st == StreamIdType::EXT)) { + // Query external stream priority + using namespace sycl::ext::oneapi::property; + // Default priority for SYCL queue is normal. st = StreamIdType::NORMAL; + if (queue().has_property()) { + st = StreamIdType::NORMAL; + } else if (queue().has_property()) { + st = StreamIdType::HIGH; + } } // StreamIdType and priority number are inversely related. 
return -static_cast(st); @@ -199,11 +205,12 @@ sycl::queue& XPUStream::queue() const { StreamIdType st = streamIdType(stream_id); StreamIdIndex si = streamIdIndex(stream_id); switch (st) { - case StreamIdType::EXT: - return *(reinterpret_cast(stream_id)); case StreamIdType::NORMAL: case StreamIdType::HIGH: return *streams[device_index][static_cast(st)][si]; + // See Note [External XPU Stream] + case StreamIdType::EXT: + return *(reinterpret_cast(stream_id)); default: TORCH_CHECK( false, @@ -245,13 +252,54 @@ XPUStream getStreamFromPool(const bool isHighPriority, DeviceIndex device) { return getStreamFromPool(priority, device); } +/* + * Note [External XPU Stream] + * + * An external XPUStream is a wrapper around an external SYCL queue that was not + * created by PyTorch. This design enables interoperability with other libraries + * by allowing PyTorch to work seamlessly with SYCL queues created outside of + * its control. + * + * Key design requirements include: + * 1. Allowing retrieval of its SYCL queue from the external XPUStream. + * 2. Supporting conversion between an external XPUStream and a `c10::Stream`. + * 3. Ensuring compatibility with the `get/setCurrentXPUStream` methods. + * 4. Enabling memory caching allocation through the external XPUStream. + * + * To address requirements (1) and (2), we associate the external SYCL queue + * pointer with the `stream_id`. It is the user's responsibility to ensure that + * the referenced SYCL queue remains alive while the corresponding XPUStream, or + * any c10::Stream derived from it, is in use. + * + * However, this approach introduces the following limitations: + * + * 1. Different SYCL queue pointers will result in distinct XPUStream + * instances, even if the SYCL queues they dereference are equivalent. + * 2. Memory blocks allocated by one external XPUStream CANNOT be reused by + * other non-equivalent XPUStreams, even if they originate from the same SYCL + * queue object. 
+ */ + XPUStream getStreamFromExternal( - sycl::queue* ext_stream, + sycl::queue* ext_queue, DeviceIndex device_index) { // The sycl::queue* will be the actual id - TORCH_CHECK(ext_stream, "External stream must not be a nullptr."); - return XPUStreamForId(device_index, reinterpret_cast(ext_stream)); + TORCH_CHECK(ext_queue, "External sycl::queue* must not be a nullptr."); + TORCH_CHECK( + ext_queue->is_in_order(), "External SYCL queue must be in-order."); + TORCH_CHECK( + ext_queue->get_context() == c10::xpu::get_device_context(), + "External SYCL queue must be created with the same context as the PyTorch XPU used."); + TORCH_CHECK( + ext_queue->get_device() == c10::xpu::get_raw_device(device_index), + "External SYCL queue doesn't match the given device index."); + StreamId stream_id = reinterpret_cast(ext_queue); + TORCH_CHECK( + !(stream_id & 1), + "External sycl::queue* must have the last bit set to 0. ", + "You can file an issue at https://github.com/pytorch/pytorch/issues to describe your use case."); + return XPUStreamForId(device_index, stream_id); } // Note: The stream pools will be initialized if needed, at the first invocation diff --git a/c10/xpu/XPUStream.h b/c10/xpu/XPUStream.h index 987800b1299..05603640263 100644 --- a/c10/xpu/XPUStream.h +++ b/c10/xpu/XPUStream.h @@ -158,14 +158,21 @@ C10_XPU_API XPUStream getStreamFromPool(const int priority, DeviceIndex device = -1); /** - * Get a XPUStream from a externally allocated one. + * Get an XPUStream from an external SYCL queue. * - * This is mainly for interoperability with different libraries where we - * want to operate on a non-torch allocated stream for data exchange or similar - * purposes + * This function allows interoperability with other libraries by enabling + * the use of an external SYCL queue that was not created by PyTorch. This + * can be useful for data exchange or other operations where integration + * with non-PyTorch queues is required. 
+ * + * NOTE: It is the user's responsibility to ensure that the referenced SYCL + * queue remains alive while the corresponding XPUStream, or any c10::Stream + * derived from it, is in use. The different SYCL queue pointers will result in + * distinct XPUStream instances, even if the SYCL queues they dereference are + * equivalent. */ -C10_API XPUStream -getStreamFromExternal(sycl::queue* ext_stream, DeviceIndex device_index); +C10_XPU_API XPUStream +getStreamFromExternal(sycl::queue* ext_queue, DeviceIndex device_index); /** * Get the current XPU stream, for the passed XPU device, or for the current diff --git a/c10/xpu/test/impl/XPUCachingAllocatorTest.cpp b/c10/xpu/test/impl/XPUCachingAllocatorTest.cpp index a337126f061..5875fc0ceb2 100644 --- a/c10/xpu/test/impl/XPUCachingAllocatorTest.cpp +++ b/c10/xpu/test/impl/XPUCachingAllocatorTest.cpp @@ -2,6 +2,7 @@ #include #include +#include bool has_xpu() { return c10::xpu::device_count() > 0; @@ -75,6 +76,46 @@ TEST(XPUCachingAllocatorTest, AllocateMemory) { for (const auto i : c10::irange(numel)) { EXPECT_EQ(hostData[i], i); } + c10::xpu::XPUCachingAllocator::emptyCache(); +} + +TEST(XPUCachingAllocatorTest, DeviceCachingAllocateByExternalStream) { + c10::xpu::XPUCachingAllocator::emptyCache(); + auto* allocator = c10::xpu::XPUCachingAllocator::get(); + sycl::queue* ext_queue = new sycl::queue( + c10::xpu::get_device_context(), + c10::xpu::get_raw_device(0), + c10::xpu::asyncHandler, + {sycl::property::queue::in_order()}); + // 500M memory is reserved, can be reused later. + { + c10::xpu::XPUStream ext_stream = + c10::xpu::getStreamFromExternal(ext_queue, 0); + c10::xpu::setCurrentXPUStream(ext_stream); + auto _500mb = 500 * 1024 * 1024; + auto cache = allocator->allocate(_500mb); + } + auto _10mb = 10 * 1024 * 1024; + auto buffer = allocator->allocate(_10mb); + void* ptr0 = buffer.get(); + // tmp is not allocated via device caching allocator. 
+ void* tmp = sycl::aligned_alloc_device( + 512, _10mb, c10::xpu::get_raw_device(0), c10::xpu::get_device_context()); + void* ptr1 = c10::xpu::XPUCachingAllocator::raw_alloc(_10mb); + // We have reserved 500M of memory for reuse. When allocating `ptr0` and + // `ptr1` through the device caching allocator, they should be allocated from + // the same block. Specifically, `ptr1` should follow immediately after `ptr0` + // in the block, forming a sequence like [ptr0, ptr1]. This behavior occurs + // because the `tmp` pointer is not allocated through the device caching + // allocator, meaning it cannot reuse the reserved memory. As a result, the + // offset between `ptr0` and `ptr1` should match the size of `ptr0` (10M in + // this case). + auto diff = static_cast(ptr1) - static_cast(ptr0); + EXPECT_EQ(diff, _10mb); + c10::xpu::XPUCachingAllocator::raw_delete(ptr1); + sycl::free(tmp, c10::xpu::get_device_context()); + delete ext_queue; + c10::xpu::XPUCachingAllocator::emptyCache(); } int main(int argc, char* argv[]) { diff --git a/c10/xpu/test/impl/XPUStreamTest.cpp b/c10/xpu/test/impl/XPUStreamTest.cpp index c1f2c884955..54ae4f5d8ab 100644 --- a/c10/xpu/test/impl/XPUStreamTest.cpp +++ b/c10/xpu/test/impl/XPUStreamTest.cpp @@ -202,6 +202,7 @@ TEST(XPUStreamTest, ExternalTest) { at::xpu::setCurrentXPUStream(myStream); at::xpu::XPUStream curStream = at::xpu::getCurrentXPUStream(); + EXPECT_EQ(myStream.priority(), 0); ASSERT_TRUE(curStream == myStream); ASSERT_TRUE(&(curStream.queue()) == stream); @@ -230,7 +231,7 @@ TEST(XPUStreamTest, ExternalMultiDeviceTest) { } { c10::DeviceGuard device_guard(c10::Device(c10::DeviceType::XPU, 1)); - stream_0 = new sycl::queue( + stream_1 = new sycl::queue( c10::xpu::get_device_context(), c10::xpu::get_raw_device(1), c10::xpu::asyncHandler, @@ -247,4 +248,28 @@ TEST(XPUStreamTest, ExternalMultiDeviceTest) { delete stream_0; delete stream_1; -} \ No newline at end of file +} + +TEST(XPUStreamTest, ExternalStreamDifferentPointersTest) { + 
+ if (!has_xpu()) { + return; + } + + using namespace sycl::ext::oneapi::property; + sycl::queue ext_queue = sycl::queue( + c10::xpu::get_device_context(), + c10::xpu::get_raw_device(0), + c10::xpu::asyncHandler, + {sycl::property::queue::in_order(), queue::priority_normal()}); + + // Pointers to queue and its copies will lead to distinct external XPUStreams. + auto queue_ptr1 = std::make_unique(ext_queue); + auto queue_ptr2 = std::make_unique(ext_queue); + + at::xpu::XPUStream myStream1 = + at::xpu::getStreamFromExternal(queue_ptr1.get(), 0); + at::xpu::XPUStream myStream2 = + at::xpu::getStreamFromExternal(queue_ptr2.get(), 0); + + EXPECT_NE(myStream1, myStream2); +}