diff --git a/sycl/plugins/cuda/pi_cuda.cpp b/sycl/plugins/cuda/pi_cuda.cpp index 777e97c0f2570..0c962048262b1 100644 --- a/sycl/plugins/cuda/pi_cuda.cpp +++ b/sycl/plugins/cuda/pi_cuda.cpp @@ -96,20 +96,20 @@ pi_result forLatestEvents(const pi_event *event_wait_list, std::sort(events.begin(), events.end(), [](pi_event e0, pi_event e1) { // Tiered sort creating sublists of streams (smallest value first) in which // the corresponding events are sorted into a sequence of newest first. - return e0->get_queue()->stream_ < e1->get_queue()->stream_ || - (e0->get_queue()->stream_ == e1->get_queue()->stream_ && + return e0->get_stream() < e1->get_stream() || + (e0->get_stream() == e1->get_stream() && e0->get_event_id() > e1->get_event_id()); }); bool first = true; CUstream lastSeenStream = 0; for (pi_event event : events) { - if (!event || (!first && event->get_queue()->stream_ == lastSeenStream)) { + if (!event || (!first && event->get_stream() == lastSeenStream)) { continue; } first = false; - lastSeenStream = event->get_queue()->stream_; + lastSeenStream = event->get_stream(); auto result = f(event); if (result != PI_SUCCESS) { @@ -288,6 +288,32 @@ void guessLocalWorkSize(size_t *threadsPerBlock, const size_t *global_work_size, } } +pi_result enqueueEventsWait(pi_queue command_queue, CUstream stream, + pi_uint32 num_events_in_wait_list, + const pi_event *event_wait_list) { + if (!event_wait_list) { + return PI_SUCCESS; + } + try { + ScopedContext active(command_queue->get_context()); + + auto result = forLatestEvents( + event_wait_list, num_events_in_wait_list, + [stream](pi_event event) -> pi_result { + return PI_CHECK_ERROR(cuStreamWaitEvent(stream, event->get(), 0)); + }); + + if (result != PI_SUCCESS) { + return result; + } + return PI_SUCCESS; + } catch (pi_result err) { + return err; + } catch (...) { + return PI_ERROR_UNKNOWN; + } +} + } // anonymous namespace /// ------ Error handling, matching OpenCL plugin semantics. @@ -341,10 +367,42 @@ pi_result cuda_piEventRetain(pi_event event); /// \endcond -_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue) +CUstream _pi_queue::get_next_compute_stream() { + if (num_compute_streams_ < compute_streams_.size()) { + // the check above is for performance - so as not to lock mutex every time + std::lock_guard guard(compute_stream_mutex_); + // The second check is done after mutex is locked so other threads can not + // change num_compute_streams_ after that + if (num_compute_streams_ < compute_streams_.size()) { + PI_CHECK_ERROR( + cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_)); + } + } + return compute_streams_[compute_stream_idx_++ % compute_streams_.size()]; +} + +CUstream _pi_queue::get_next_transfer_stream() { + if (transfer_streams_.empty()) { // for example in in-order queue + return get_next_compute_stream(); + } + if (num_transfer_streams_ < transfer_streams_.size()) { + // the check above is for performance - so as not to lock mutex every time + std::lock_guard guard(transfer_stream_mutex_); + // The second check is done after mutex is locked so other threads can not + // change num_transfer_streams_ after that + if (num_transfer_streams_ < transfer_streams_.size()) { + PI_CHECK_ERROR( + cuStreamCreate(&transfer_streams_[num_transfer_streams_++], flags_)); + } + } + return transfer_streams_[transfer_stream_idx_++ % transfer_streams_.size()]; +} + +_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue, + CUstream stream) : commandType_{type}, refCount_{1}, hasBeenWaitedOn_{false}, isRecorded_{false}, isStarted_{false}, evEnd_{nullptr}, evStart_{nullptr}, - evQueued_{nullptr}, queue_{queue}, context_{context} { + evQueued_{nullptr}, queue_{queue}, stream_{stream}, context_{context} { bool profilingEnabled = queue_->properties_ & PI_QUEUE_PROFILING_ENABLE; @@ -377,7 +435,7 @@ pi_result _pi_event::start() { if (queue_->properties_ & PI_QUEUE_PROFILING_ENABLE) { // NOTE: This relies on the default stream to be unused. result = PI_CHECK_ERROR(cuEventRecord(evQueued_, 0)); - result = PI_CHECK_ERROR(cuEventRecord(evStart_, queue_->get())); + result = PI_CHECK_ERROR(cuEventRecord(evStart_, stream_)); } } catch (pi_result error) { result = error; @@ -441,15 +499,13 @@ pi_result _pi_event::record() { return PI_INVALID_QUEUE; } - CUstream cuStream = queue_->get(); - try { eventId_ = queue_->get_next_event_id(); if (eventId_ == 0) { cl::sycl::detail::pi::die( "Unrecoverable program state reached in event identifier overflow"); } - result = PI_CHECK_ERROR(cuEventRecord(evEnd_, cuStream)); + result = PI_CHECK_ERROR(cuEventRecord(evEnd_, stream_)); } catch (pi_result error) { result = error; } @@ -490,9 +546,9 @@ pi_result enqueueEventWait(pi_queue queue, pi_event event) { // for native events, the cuStreamWaitEvent call is used. // This makes all future work submitted to stream wait for all // work captured in event. - if (queue->get() != event->get_queue()->get()) { - return PI_CHECK_ERROR(cuStreamWaitEvent(queue->get(), event->get(), 0)); - } + queue->for_each_stream([e = event->get()](CUstream s) { + PI_CHECK_ERROR(cuStreamWaitEvent(s, e, 0)); + }); return PI_SUCCESS; } @@ -2217,8 +2273,6 @@ pi_result cuda_piextMemCreateWithNativeHandle(pi_native_handle nativeHandle, pi_result cuda_piQueueCreate(pi_context context, pi_device device, pi_queue_properties properties, pi_queue *queue) { try { - pi_result err = PI_SUCCESS; - std::unique_ptr<_pi_queue> queueImpl{nullptr}; if (context->get_device() != device) { @@ -2226,11 +2280,7 @@ pi_result cuda_piQueueCreate(pi_context context, pi_device device, return PI_INVALID_DEVICE; } - ScopedContext active(context); - - CUstream cuStream; unsigned int flags = 0; - if (properties == __SYCL_PI_CUDA_USE_DEFAULT_STREAM) { flags = CU_STREAM_DEFAULT; } else if (properties == __SYCL_PI_CUDA_SYNC_WITH_DEFAULT) { @@ -2239,13 +2289,17 @@ pi_result cuda_piQueueCreate(pi_context context, pi_device device, flags = CU_STREAM_NON_BLOCKING; } - err = PI_CHECK_ERROR(cuStreamCreate(&cuStream, flags)); - if (err != PI_SUCCESS) { - return err; - } + const bool is_out_of_order = + properties & PI_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE; + + std::vector computeCuStreams( + is_out_of_order ? _pi_queue::default_num_compute_streams : 1); + std::vector transferCuStreams( + is_out_of_order ? _pi_queue::default_num_transfer_streams : 0); queueImpl = std::unique_ptr<_pi_queue>( - new _pi_queue{cuStream, context, device, properties}); + new _pi_queue{std::move(computeCuStreams), std::move(transferCuStreams), + context, device, properties, flags}); *queue = queueImpl.release(); @@ -2305,9 +2359,10 @@ pi_result cuda_piQueueRelease(pi_queue command_queue) { ScopedContext active(command_queue->get_context()); - auto stream = queueImpl->stream_; - PI_CHECK_ERROR(cuStreamSynchronize(stream)); - PI_CHECK_ERROR(cuStreamDestroy(stream)); + command_queue->for_each_stream([](CUstream s) { + PI_CHECK_ERROR(cuStreamSynchronize(s)); + PI_CHECK_ERROR(cuStreamDestroy(s)); + }); return PI_SUCCESS; } catch (pi_result err) { @@ -2318,16 +2373,17 @@ pi_result cuda_piQueueRelease(pi_queue command_queue) { } pi_result cuda_piQueueFinish(pi_queue command_queue) { - - // set default result to a negative result (avoid false-positve tests) - pi_result result = PI_OUT_OF_HOST_MEMORY; + pi_result result = PI_SUCCESS; try { assert(command_queue != nullptr); // need PI_ERROR_INVALID_EXTERNAL_HANDLE error code ScopedContext active(command_queue->get_context()); - result = PI_CHECK_ERROR(cuStreamSynchronize(command_queue->stream_)); + + command_queue->for_each_stream([&result](CUstream s) { + result = PI_CHECK_ERROR(cuStreamSynchronize(s)); + }); } catch (pi_result err) { @@ -2357,7 +2413,9 @@ pi_result cuda_piQueueFlush(pi_queue command_queue) { /// \return PI_SUCCESS pi_result cuda_piextQueueGetNativeHandle(pi_queue queue, pi_native_handle *nativeHandle) { - *nativeHandle = reinterpret_cast(queue->get()); + ScopedContext active(queue->get_context()); + *nativeHandle = + reinterpret_cast(queue->get_next_compute_stream()); return PI_SUCCESS; } @@ -2391,19 +2449,19 @@ pi_result cuda_piEnqueueMemBufferWrite(pi_queue command_queue, pi_mem buffer, assert(buffer != nullptr); assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); CUdeviceptr devPtr = buffer->mem_.buffer_mem_.get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); + CUstream cuStream = command_queue->get_next_transfer_stream(); - retErr = cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_WRITE, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_WRITE, command_queue, cuStream)); retImplEv->start(); } @@ -2437,19 +2495,19 @@ pi_result cuda_piEnqueueMemBufferRead(pi_queue command_queue, pi_mem buffer, assert(buffer != nullptr); assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); CUdeviceptr devPtr = buffer->mem_.buffer_mem_.get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); + CUstream cuStream = command_queue->get_next_transfer_stream(); - retErr = cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_READ, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_READ, command_queue, cuStream)); retImplEv->start(); } @@ -2767,13 +2825,11 @@ pi_result cuda_piEnqueueKernelLaunch( std::unique_ptr<_pi_event> retImplEv{nullptr}; - CUstream cuStream = command_queue->get(); + CUstream cuStream = command_queue->get_next_compute_stream(); CUfunction cuFunc = kernel->get(); - if (event_wait_list) { - retError = cuda_piEnqueueEventsWait( - command_queue, num_events_in_wait_list, event_wait_list, nullptr); - } + retError = enqueueEventsWait(command_queue, cuStream, + num_events_in_wait_list, event_wait_list); // Set the implicit global offset parameter if kernel has offset variant if (kernel->get_with_offset_parameter()) { @@ -2795,7 +2851,7 @@ pi_result cuda_piEnqueueKernelLaunch( if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue)); + PI_COMMAND_TYPE_NDRANGE_KERNEL, command_queue, cuStream)); retImplEv->start(); } @@ -3608,7 +3664,8 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue, } if (event) { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue); + *event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue, + command_queue->get_next_compute_stream()); (*event)->start(); (*event)->record(); } @@ -3858,19 +3915,19 @@ pi_result cuda_piEnqueueMemBufferReadRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); CUdeviceptr devPtr = buffer->mem_.buffer_mem_.get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); + CUstream cuStream = command_queue->get_next_transfer_stream(); - retErr = cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_READ_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_READ_RECT, command_queue, cuStream)); retImplEv->start(); } @@ -3909,19 +3966,18 @@ pi_result cuda_piEnqueueMemBufferWriteRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); CUdeviceptr devPtr = buffer->mem_.buffer_mem_.get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + CUstream cuStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_WRITE_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_WRITE_RECT, command_queue, cuStream)); retImplEv->start(); } @@ -3962,21 +4018,18 @@ pi_result cuda_piEnqueueMemBufferCopy(pi_queue command_queue, pi_mem src_buffer, try { ScopedContext active(command_queue->get_context()); - - if (event_wait_list) { - cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); - } - pi_result result; + auto stream = command_queue->get_next_transfer_stream(); + result = enqueueEventsWait(command_queue, stream, num_events_in_wait_list, + event_wait_list); + if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_COPY, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_COPY, command_queue, stream)); result = retImplEv->start(); } - auto stream = command_queue->get(); auto src = src_buffer->mem_.buffer_mem_.get() + src_offset; auto dst = dst_buffer->mem_.buffer_mem_.get() + dst_offset; @@ -4008,20 +4061,19 @@ pi_result cuda_piEnqueueMemBufferCopyRect( assert(command_queue != nullptr); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); CUdeviceptr srcPtr = src_buffer->mem_.buffer_mem_.get(); CUdeviceptr dstPtr = dst_buffer->mem_.buffer_mem_.get(); std::unique_ptr<_pi_event> retImplEv{nullptr}; try { ScopedContext active(command_queue->get_context()); - - retErr = cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); + CUstream cuStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_COPY_RECT, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_COPY_RECT, command_queue, cuStream)); retImplEv->start(); } @@ -4069,21 +4121,18 @@ pi_result cuda_piEnqueueMemBufferFill(pi_queue command_queue, pi_mem buffer, try { ScopedContext active(command_queue->get_context()); - if (event_wait_list) { - cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); - } - + auto stream = command_queue->get_next_transfer_stream(); pi_result result; + result = enqueueEventsWait(command_queue, stream, num_events_in_wait_list, + event_wait_list); if (event) { retImplEv = std::unique_ptr<_pi_event>(_pi_event::make_native( - PI_COMMAND_TYPE_MEM_BUFFER_FILL, command_queue)); + PI_COMMAND_TYPE_MEM_BUFFER_FILL, command_queue, stream)); result = retImplEv->start(); } auto dstDevice = buffer->mem_.buffer_mem_.get() + offset; - auto stream = command_queue->get(); auto N = size / pattern_size; // pattern size in bytes @@ -4244,15 +4293,12 @@ pi_result cuda_piEnqueueMemImageRead( assert(image->mem_type_ == _pi_mem::mem_type::surface); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); - - if (event_wait_list) { - cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); - } + CUstream cuStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); CUarray array = image->mem_.surface_mem_.get_array(); @@ -4282,8 +4328,8 @@ pi_result cuda_piEnqueueMemImageRead( } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_READ, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_READ, + command_queue, cuStream); new_event->record(); *event = new_event; } @@ -4317,15 +4363,12 @@ cuda_piEnqueueMemImageWrite(pi_queue command_queue, pi_mem image, assert(image->mem_type_ == _pi_mem::mem_type::surface); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); - - if (event_wait_list) { - cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); - } + CUstream cuStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); CUarray array = image->mem_.surface_mem_.get_array(); @@ -4355,8 +4398,8 @@ cuda_piEnqueueMemImageWrite(pi_queue command_queue, pi_mem image, } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_WRITE, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_WRITE, + command_queue, cuStream); new_event->record(); *event = new_event; } @@ -4382,15 +4425,12 @@ pi_result cuda_piEnqueueMemImageCopy(pi_queue command_queue, pi_mem src_image, dst_image->mem_.surface_mem_.get_image_type()); pi_result retErr = PI_SUCCESS; - CUstream cuStream = command_queue->get(); try { ScopedContext active(command_queue->get_context()); - - if (event_wait_list) { - cuda_piEnqueueEventsWait(command_queue, num_events_in_wait_list, - event_wait_list, nullptr); - } + CUstream cuStream = command_queue->get_next_transfer_stream(); + retErr = enqueueEventsWait(command_queue, cuStream, num_events_in_wait_list, + event_wait_list); CUarray srcArray = src_image->mem_.surface_mem_.get_array(); CUarray dstArray = dst_image->mem_.surface_mem_.get_array(); @@ -4430,8 +4470,8 @@ pi_result cuda_piEnqueueMemImageCopy(pi_queue command_queue, pi_mem src_image, } if (event) { - auto new_event = - _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_COPY, command_queue); + auto new_event = _pi_event::make_native(PI_COMMAND_TYPE_IMAGE_COPY, + command_queue, cuStream); new_event->record(); *event = new_event; } @@ -4501,8 +4541,9 @@ pi_result cuda_piEnqueueMemBufferMap(pi_queue command_queue, pi_mem buffer, if (event) { try { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_MAP, - command_queue); + *event = _pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_MAP, command_queue, + command_queue->get_next_transfer_stream()); (*event)->start(); (*event)->record(); } catch (pi_result error) { @@ -4555,8 +4596,9 @@ pi_result cuda_piEnqueueMemUnmap(pi_queue command_queue, pi_mem memobj, if (event) { try { - *event = _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_UNMAP, - command_queue); + *event = _pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_UNMAP, command_queue, + command_queue->get_next_transfer_stream()); (*event)->start(); (*event)->record(); } catch (pi_result error) { @@ -4676,17 +4718,17 @@ pi_result cuda_piextUSMEnqueueMemset(pi_queue queue, void *ptr, pi_int32 value, pi_event *event) { assert(queue != nullptr); assert(ptr != nullptr); - CUstream cuStream = queue->get(); pi_result result = PI_SUCCESS; std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = cuda_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + CUstream cuStream = queue->get_next_compute_stream(); + result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_FILL, queue, cuStream)); event_ptr->start(); } result = PI_CHECK_ERROR(cuMemsetD8Async( @@ -4710,17 +4752,18 @@ pi_result cuda_piextUSMEnqueueMemcpy(pi_queue queue, pi_bool blocking, assert(queue != nullptr); assert(dst_ptr != nullptr); assert(src_ptr != nullptr); - CUstream cuStream = queue->get(); pi_result result = PI_SUCCESS; + std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = cuda_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + CUstream cuStream = queue->get_next_transfer_stream(); + result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue, cuStream)); event_ptr->start(); } result = PI_CHECK_ERROR(cuMemcpyAsync( @@ -4767,17 +4810,17 @@ pi_result cuda_piextUSMEnqueuePrefetch(pi_queue queue, const void *ptr, return PI_INVALID_VALUE; assert(queue != nullptr); assert(ptr != nullptr); - CUstream cuStream = queue->get(); pi_result result = PI_SUCCESS; std::unique_ptr<_pi_event> event_ptr{nullptr}; try { ScopedContext active(queue->get_context()); - result = cuda_piEnqueueEventsWait(queue, num_events_in_waitlist, - events_waitlist, nullptr); + CUstream cuStream = queue->get_next_transfer_stream(); + result = enqueueEventsWait(queue, cuStream, num_events_in_waitlist, + events_waitlist); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_MEM_BUFFER_COPY, queue, cuStream)); event_ptr->start(); } result = PI_CHECK_ERROR(cuMemPrefetchAsync( @@ -4807,8 +4850,8 @@ pi_result cuda_piextUSMEnqueueMemAdvise(pi_queue queue, const void *ptr, ScopedContext active(queue->get_context()); if (event) { - event_ptr = std::unique_ptr<_pi_event>( - _pi_event::make_native(PI_COMMAND_TYPE_USER, queue)); + event_ptr = std::unique_ptr<_pi_event>(_pi_event::make_native( + PI_COMMAND_TYPE_USER, queue, queue->get_next_transfer_stream())); event_ptr->start(); } diff --git a/sycl/plugins/cuda/pi_cuda.hpp b/sycl/plugins/cuda/pi_cuda.hpp index 0d10142caf073..108852ea5c08d 100644 --- a/sycl/plugins/cuda/pi_cuda.hpp +++ b/sycl/plugins/cuda/pi_cuda.hpp @@ -36,12 +36,12 @@ extern "C" { /// \cond IGNORE_BLOCK_IN_DOXYGEN -pi_result cuda_piContextRetain(pi_context ); -pi_result cuda_piContextRelease(pi_context ); -pi_result cuda_piDeviceRelease(pi_device ); -pi_result cuda_piDeviceRetain(pi_device ); -pi_result cuda_piProgramRetain(pi_program ); -pi_result cuda_piProgramRelease(pi_program ); +pi_result cuda_piContextRetain(pi_context); +pi_result cuda_piContextRelease(pi_context); +pi_result cuda_piDeviceRelease(pi_device); +pi_result cuda_piDeviceRetain(pi_device); +pi_result cuda_piProgramRetain(pi_program); +pi_result cuda_piProgramRelease(pi_program); pi_result cuda_piQueueRelease(pi_queue); pi_result cuda_piQueueRetain(pi_queue); pi_result cuda_piMemRetain(pi_mem); @@ -379,18 +379,33 @@ struct _pi_mem { /// struct _pi_queue { using native_type = CUstream; + static constexpr int default_num_compute_streams = 128; + static constexpr int default_num_transfer_streams = 64; - native_type stream_; + std::vector compute_streams_; + std::vector transfer_streams_; _pi_context *context_; _pi_device *device_; pi_queue_properties properties_; std::atomic_uint32_t refCount_; std::atomic_uint32_t eventCount_; - - _pi_queue(CUstream stream, _pi_context *context, _pi_device *device, - pi_queue_properties properties) - : stream_{stream}, context_{context}, device_{device}, - properties_{properties}, refCount_{1}, eventCount_{0} { + std::atomic_uint32_t compute_stream_idx_; + std::atomic_uint32_t transfer_stream_idx_; + unsigned int num_compute_streams_; + unsigned int num_transfer_streams_; + unsigned int flags_; + std::mutex compute_stream_mutex_; + std::mutex transfer_stream_mutex_; + + _pi_queue(std::vector &&compute_streams, + std::vector &&transfer_streams, _pi_context *context, + _pi_device *device, pi_queue_properties properties, + unsigned int flags) + : compute_streams_{std::move(compute_streams)}, + transfer_streams_{std::move(transfer_streams)}, context_{context}, + device_{device}, properties_{properties}, refCount_{1}, eventCount_{0}, + compute_stream_idx_{0}, transfer_stream_idx_{0}, + num_compute_streams_{0}, num_transfer_streams_{0}, flags_(flags) { cuda_piContextRetain(context_); cuda_piDeviceRetain(device_); } @@ -400,7 +415,32 @@ struct _pi_queue { cuda_piDeviceRelease(device_); } - native_type get() const noexcept { return stream_; }; + // get_next_compute/transfer_stream() functions return streams from + // appropriate pools in round-robin fashion + native_type get_next_compute_stream(); + native_type get_next_transfer_stream(); + native_type get() { return get_next_compute_stream(); }; + + template void for_each_stream(T &&f) { + { + std::lock_guard compute_guard(compute_stream_mutex_); + unsigned int end = + std::min(static_cast(compute_streams_.size()), + num_compute_streams_); + for (unsigned int i = 0; i < end; i++) { + f(compute_streams_[i]); + } + } + { + std::lock_guard transfer_guard(transfer_stream_mutex_); + unsigned int end = + std::min(static_cast(transfer_streams_.size()), + num_transfer_streams_); + for (unsigned int i = 0; i < end; i++) { + f(transfer_streams_[i]); + } + } + } _pi_context *get_context() const { return context_; }; @@ -431,6 +471,8 @@ struct _pi_event { pi_queue get_queue() const noexcept { return queue_; } + CUstream get_stream() const noexcept { return stream_; } + pi_command_type get_command_type() const noexcept { return commandType_; } pi_uint32 get_reference_count() const noexcept { return refCount_; } @@ -474,8 +516,9 @@ struct _pi_event { pi_uint64 get_end_time() const; // construct a native CUDA. This maps closely to the underlying CUDA event. - static pi_event make_native(pi_command_type type, pi_queue queue) { - return new _pi_event(type, queue->get_context(), queue); + static pi_event make_native(pi_command_type type, pi_queue queue, + CUstream stream) { + return new _pi_event(type, queue->get_context(), queue, stream); } pi_result release(); @@ -485,7 +528,8 @@ struct _pi_event { private: // This constructor is private to force programmers to use the make_native / // make_user static members in order to create a pi_event for CUDA. - _pi_event(pi_command_type type, pi_context context, pi_queue queue); + _pi_event(pi_command_type type, pi_context context, pi_queue queue, + CUstream stream); pi_command_type commandType_; // The type of command associated with event. @@ -514,6 +558,9 @@ struct _pi_event { pi_queue queue_; // pi_queue associated with the event. If this is a user // event, this will be nullptr. + CUstream stream_; // CUstream associated with the event. If this is a user + // event, this will be uninitialized. + pi_context context_; // pi_context associated with the event. If this is a // native event, this will be the same context associated // with the queue_ member. @@ -547,7 +594,7 @@ struct _pi_program { pi_result set_binary(const char *binary, size_t binarySizeInBytes); - pi_result build_program(const char* build_options); + pi_result build_program(const char *build_options); pi_context get_context() const { return context_; }; @@ -688,8 +735,7 @@ struct _pi_kernel { assert(retError == PI_SUCCESS); } - ~_pi_kernel() - { + ~_pi_kernel() { cuda_piProgramRelease(program_); cuda_piContextRelease(context_); } diff --git a/sycl/unittests/pi/cuda/CMakeLists.txt b/sycl/unittests/pi/cuda/CMakeLists.txt index 6afd689af1305..df1679c8a5eb2 100644 --- a/sycl/unittests/pi/cuda/CMakeLists.txt +++ b/sycl/unittests/pi/cuda/CMakeLists.txt @@ -7,7 +7,6 @@ add_sycl_unittest(PiCudaTests OBJECT test_kernels.cpp test_mem_obj.cpp test_primary_context.cpp - test_queue.cpp test_sampler_properties.cpp ) diff --git a/sycl/unittests/pi/cuda/test_queue.cpp b/sycl/unittests/pi/cuda/test_queue.cpp deleted file mode 100644 index f5c3a7b22173d..0000000000000 --- a/sycl/unittests/pi/cuda/test_queue.cpp +++ /dev/null @@ -1,172 +0,0 @@ -//==---- test_queue.cpp --- PI unit tests ----------------------------------==// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#include "gtest/gtest.h" - -#include - -#include "TestGetPlatforms.hpp" -#include "TestGetPlugin.hpp" -#include -#include -#include -#include -#include - -using namespace sycl; - -struct CudaTestQueue : public ::testing::TestWithParam { - -protected: - std::optional plugin = - pi::initializeAndGet(backend::ext_oneapi_cuda); - - pi_platform platform_; - pi_device device_; - pi_context context_; - - void SetUp() override { - // skip the tests if the CUDA backend is not available - if (!plugin.has_value()) { - GTEST_SKIP(); - } - - pi_uint32 numPlatforms = 0; - ASSERT_EQ(plugin->getBackend(), backend::ext_oneapi_cuda); - - ASSERT_EQ((plugin->call_nocheck( - 0, nullptr, &numPlatforms)), - PI_SUCCESS) - << "piPlatformsGet failed.\n"; - - ASSERT_EQ((plugin->call_nocheck( - numPlatforms, &platform_, nullptr)), - PI_SUCCESS) - << "piPlatformsGet failed.\n"; - - ASSERT_EQ((plugin->call_nocheck( - platform_, PI_DEVICE_TYPE_GPU, 1, &device_, nullptr)), - PI_SUCCESS); - ASSERT_EQ((plugin->call_nocheck( - nullptr, 1, &device_, nullptr, nullptr, &context_)), - PI_SUCCESS); - EXPECT_NE(context_, nullptr); - } - - void TearDown() override { - if (plugin.has_value()) { - plugin->call(device_); - plugin->call(context_); - } - } - - CudaTestQueue() = default; - - ~CudaTestQueue() = default; -}; - -TEST_F(CudaTestQueue, PICreateQueueSimple) { - pi_queue queue; - ASSERT_EQ((plugin->call_nocheck( - context_, device_, 0, &queue)), - PI_SUCCESS); - ASSERT_NE(queue, nullptr); - EXPECT_EQ(queue->get_context(), context_); - - unsigned int flags = 0; - CUstream stream = queue->get(); - cuStreamGetFlags(stream, &flags); - ASSERT_EQ(flags, CU_STREAM_NON_BLOCKING); - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); -} - -TEST_F(CudaTestQueue, PIQueueFinishSimple) { - pi_queue queue; - ASSERT_EQ((plugin->call_nocheck( - context_, device_, 0, &queue)), - PI_SUCCESS); - ASSERT_NE(queue, nullptr); - - // todo: post work on queue, ensure the results are valid and the work is - // complete after piQueueFinish? - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); - - ASSERT_EQ(cuStreamQuery(queue->get()), CUDA_SUCCESS); - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); -} - -TEST_F(CudaTestQueue, PICreateQueueSimpleDefault) { - pi_queue queue; - ASSERT_EQ((plugin->call_nocheck( - context_, device_, __SYCL_PI_CUDA_USE_DEFAULT_STREAM, &queue)), - PI_SUCCESS); - ASSERT_NE(queue, nullptr); - EXPECT_EQ(queue->get_context(), context_); - - unsigned int flags = 0; - CUstream stream = queue->get(); - cuStreamGetFlags(stream, &flags); - ASSERT_EQ(flags, CU_STREAM_DEFAULT); - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); -} - -TEST_F(CudaTestQueue, PICreateQueueSyncWithDefault) { - pi_queue queue; - ASSERT_EQ((plugin->call_nocheck( - context_, device_, __SYCL_PI_CUDA_SYNC_WITH_DEFAULT, &queue)), - PI_SUCCESS); - ASSERT_NE(queue, nullptr); - EXPECT_EQ(queue->get_context(), context_); - - unsigned int flags = 0; - CUstream stream = queue->get(); - cuStreamGetFlags(stream, &flags); - ASSERT_NE(flags, CU_STREAM_NON_BLOCKING); - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); -} - -TEST_F(CudaTestQueue, PICreateQueueInterop) { - pi_queue queue; - ASSERT_EQ((plugin->call_nocheck( - context_, device_, 0, &queue)), - PI_SUCCESS); - ASSERT_NE(queue, nullptr); - EXPECT_EQ(queue->get_context(), context_); - - CUstream cuStream = queue->get(); - - CUcontext cuCtx; - CUresult res = cuStreamGetCtx(cuStream, &cuCtx); - ASSERT_EQ(res, CUDA_SUCCESS); - EXPECT_EQ(cuCtx, context_->get()); - - ASSERT_EQ((plugin->call_nocheck(queue)), - PI_SUCCESS); -} - -TEST_P(CudaTestQueue, SYCLQueueDefaultStream) { - std::vector CudaDevices = GetParam().get_devices(); - auto deviceA_ = CudaDevices[0]; - queue Queue(deviceA_, async_handler{}, - {ext::oneapi::cuda::property::queue::use_default_stream{}}); - - CUstream CudaStream = get_native(Queue); - unsigned int flags; - cuStreamGetFlags(CudaStream, &flags); - ASSERT_EQ(flags, CU_STREAM_DEFAULT); -}