
Conversation

@Artemy-Mellanox
Contributor

No description provided.

@greptile-apps greptile-apps bot left a comment

Greptile Overview

Greptile Summary

This PR implements a "Collapsed CQ" (Completion Queue) architecture for the UCT GDA (GPU Direct Async) transport layer. The refactoring consolidates completion queue management by moving from a per-endpoint, thread-based CQE consumption model to a direct, on-demand CQE polling approach. Key changes:

1. Reduce the CQ length from tx_qp_len * UCT_IB_MLX5_MAX_BB to 1 and mark it with UCT_IB_MLX5_CQ_IGNORE_OVERRUN.
2. Remove per-endpoint state tracking fields (cqe_ci, sq_wqe_pi, producer_index, avail_count) from device endpoint structures.
3. Replace the dedicated progress thread with inline CQE reading functions (uct_rc_mlx5_gda_read_cqe, uct_rc_mlx5_gda_calc_pi).
4. Introduce an optimistic WQE reservation mechanism with atomicCAS-based rollback to handle resource contention.

The test infrastructure is updated to validate only ready_index instead of the removed fields. This architectural shift simplifies the code by eliminating separate consumer index tracking and reducing memory footprint, while enabling all threads to independently determine resource availability by directly querying the collapsed CQ.

Important Files Changed

| Filename | Score | Overview |
|---|---|---|
| src/uct/ib/mlx5/gdaki/gdaki.cuh | 2/5 | Replaces avail_count tracking with direct CQE polling and adds optimistic atomicCAS-based WQE reservation with rollback; contains potential race condition and unbounded retry loop |
| src/uct/ib/mlx5/gdaki/gdaki.c | 3/5 | Reduces CQ length to 1, sets IGNORE_OVERRUN flag, and removes per-endpoint SQ management fields from device endpoint initialization |
| src/uct/ib/mlx5/gdaki/gdaki_dev.h | 3/5 | Removes per-endpoint cqe_ci, sq_wqe_pi, and avail_count fields; adds cq_lock for shared CQ synchronization |
| test/gtest/ucp/cuda/test_kernels.cu | 4/5 | Removes tracking of producer_index and avail_count from kernel state collection logic, keeping only ready_index |
| test/gtest/ucp/test_ucp_device.cc | 4/5 | Removes assertions for producer_index and avail_count in test validation, keeping only ready_index check |
| test/gtest/ucp/cuda/test_kernels.h | 4/5 | Removes producer_index and avail_count fields from result struct, simplifying to status and ready_index only |

Confidence score: 2/5

  • This PR requires careful review due to potential race conditions and synchronization issues in the collapsed CQ implementation
  • Score reflects concerns about the non-atomic read at line 149 of gdaki.cuh (a potential race between the read and atomicAdd), the unbounded retry loop in the rollback path (lines 176-184) that could cause livelock under high contention, and the removal of explicit flow-control mechanisms without clear replacement guarantees. The IGNORE_OVERRUN flag indicates intentional relaxation of CQ overflow protection that needs thorough validation
  • Pay close attention to src/uct/ib/mlx5/gdaki/gdaki.cuh for the WQE reservation logic and synchronization correctness, and verify that the single-CQE design in src/uct/ib/mlx5/gdaki/gdaki.c handles all completion scenarios without data loss
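
For orientation, here is a minimal sketch of the optimistic reserve/rollback pattern the summary and score notes refer to. Names are simplified and hypothetical, and refresh_limit() stands in for the CQE polling that uct_rc_mlx5_gda_max_alloc_wqe_base performs in the PR; this illustrates the technique, not the PR's exact code:

```cuda
/* Stand-in for re-polling the collapsed CQ and deriving the highest
 * WQE base that still fits in the send queue. */
__device__ uint64_t refresh_limit();

__device__ int reserve_wqes(unsigned long long *sq_rsvd_index, unsigned count)
{
    /* Optimistically claim slots [wqe_base, wqe_base + count). */
    uint64_t wqe_base     = atomicAdd(sq_rsvd_index, (unsigned long long)count);
    uint64_t max_wqe_base = refresh_limit();

    while (wqe_base > max_wqe_base) {
        uint64_t wqe_next = wqe_base + count;
        /* Undo the bump, but only if no later thread reserved after us. */
        if (atomicCAS(sq_rsvd_index, wqe_next, wqe_base) == wqe_next) {
            return -1; /* out of resources */
        }
        /* A later reservation exists; re-poll the CQ and re-check. */
        max_wqe_base = refresh_limit();
    }
    return 0; /* reservation stands */
}
```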

Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Host
    participant UCT_GDAKI
    participant CUDA_Driver
    participant GPU_Kernel
    participant HW_QP as Hardware QP
    participant HW_CQ as Hardware CQ (Collapsed)

    User->>Host: Initialize GDAKI Interface
    Host->>UCT_GDAKI: uct_rc_gdaki_iface_init()
    UCT_GDAKI->>CUDA_Driver: cuDeviceGet()
    UCT_GDAKI->>CUDA_Driver: cuDevicePrimaryCtxRetain()
    UCT_GDAKI->>CUDA_Driver: cuMemAlloc() for atomic buffer
    UCT_GDAKI->>UCT_GDAKI: ibv_reg_mr() for atomic buffer
    UCT_GDAKI-->>Host: Interface ready

    User->>Host: Create Endpoint
    Host->>UCT_GDAKI: uct_rc_gdaki_ep_init()
    UCT_GDAKI->>CUDA_Driver: cuCtxPushCurrent()
    UCT_GDAKI->>CUDA_Driver: cuMemAlloc() for dev_ep (counters, CQ, WQ)
    UCT_GDAKI->>UCT_GDAKI: mlx5dv_devx_umem_reg()
    UCT_GDAKI->>UCT_GDAKI: uct_ib_mlx5_devx_create_cq_common()
    Note over UCT_GDAKI,HW_CQ: Create collapsed CQ in GPU memory
    UCT_GDAKI->>UCT_GDAKI: uct_ib_mlx5_devx_create_qp_common()
    UCT_GDAKI->>CUDA_Driver: cuMemHostRegister() for UAR
    UCT_GDAKI->>CUDA_Driver: cuMemHostGetDevicePointer() for DB
    UCT_GDAKI->>CUDA_Driver: cuMemcpyHtoD() to initialize dev_ep
    UCT_GDAKI->>CUDA_Driver: cuCtxPopCurrent()
    UCT_GDAKI-->>Host: Endpoint ready

    User->>Host: Connect Endpoint
    Host->>UCT_GDAKI: uct_rc_gdaki_ep_connect_to_ep_v2()
    UCT_GDAKI->>UCT_GDAKI: uct_rc_mlx5_iface_common_devx_connect_qp()
    UCT_GDAKI-->>Host: Connection established

    User->>Host: Launch GPU Kernel for PUT operation
    Host->>GPU_Kernel: ucp_test_kernel<level>()
    GPU_Kernel->>GPU_Kernel: uct_rc_mlx5_gda_reserv_wqe()
    Note over GPU_Kernel: Atomically reserve WQE slots
    GPU_Kernel->>GPU_Kernel: uct_rc_mlx5_gda_wqe_prepare_put_or_atomic()
    GPU_Kernel->>GPU_Kernel: doca_gpu_dev_verbs_store_wqe_seg()
    Note over GPU_Kernel: Write WQE to GPU memory
    GPU_Kernel->>GPU_Kernel: uct_rc_mlx5_gda_db()
    GPU_Kernel->>GPU_Kernel: __threadfence()
    GPU_Kernel->>GPU_Kernel: atomicCAS to update sq_ready_index
    GPU_Kernel->>HW_QP: uct_rc_mlx5_gda_ring_db()
    Note over GPU_Kernel,HW_QP: Ring doorbell via GPU-mapped UAR
    GPU_Kernel->>HW_QP: uct_rc_mlx5_gda_update_dbr()
    Note over GPU_Kernel,HW_QP: Update doorbell record

    HW_QP->>HW_QP: Process RDMA WRITE
    HW_QP->>HW_CQ: Write CQE to GPU memory
    Note over HW_CQ: CQ is in GPU-accessible memory

    GPU_Kernel->>GPU_Kernel: uct_rc_mlx5_gda_ep_check_completion()
    GPU_Kernel->>HW_CQ: uct_rc_mlx5_gda_read_cqe()
    Note over GPU_Kernel,HW_CQ: Read CQE directly from GPU
    GPU_Kernel->>GPU_Kernel: uct_rc_mlx5_gda_calc_pi()
    GPU_Kernel->>GPU_Kernel: Compare wqe_idx with completion
    GPU_Kernel-->>Host: Return completion status

    User->>Host: Cleanup Endpoint
    Host->>UCT_GDAKI: uct_rc_gdaki_ep_cleanup()
    UCT_GDAKI->>CUDA_Driver: cuMemHostUnregister()
    UCT_GDAKI->>UCT_GDAKI: uct_ib_mlx5_devx_destroy_qp_common()
    UCT_GDAKI->>UCT_GDAKI: uct_ib_mlx5_devx_destroy_cq_common()
    UCT_GDAKI->>UCT_GDAKI: mlx5dv_devx_umem_dereg()
    UCT_GDAKI->>CUDA_Driver: cuMemFree()
    UCT_GDAKI-->>Host: Cleanup complete
```

6 files reviewed, 3 comments


Comment on lines +149 to 151

```cuda
if (ep->sq_rsvd_index > max_wqe_base) {
    return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
}
```

logic: race condition: ep->sq_rsvd_index read without atomic protection could see stale value, leading to incorrect reservation checks

Contributor Author

Maybe add READ_ONCE to make sure the value is not cached?
A lock would be too expensive in this case.
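
A minimal sketch of what such a READ_ONCE could look like for device code, assuming a Linux-style volatile-cast macro (the actual definition in the UCX tree may differ):

```cuda
/* Force a fresh load on every use; the volatile cast prevents the
 * compiler from reusing a stale register copy of the field. */
#define READ_ONCE(x) (*(volatile __typeof__(x) *)&(x))

/* Applied to the check quoted above (fragment, same names as the PR): */
uint64_t rsvd = READ_ONCE(ep->sq_rsvd_index);
if (rsvd > max_wqe_base) {
    return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
}
```

Note that READ_ONCE only guarantees a fresh load; it does not order this read against the later atomicAdd, so the check stays advisory.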

Comment on lines +176 to +184

```cuda
while (wqe_base > max_wqe_base) {
    uint64_t wqe_next = wqe_base + count;
    if (atomicCAS(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
                  wqe_next, wqe_base) == wqe_next) {
        return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
    }

    max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
}
```

logic: potential livelock: if max_wqe_base keeps getting updated by concurrent operations, threads could spin indefinitely in this rollback loop without a bounded retry limit

Contributor Author

An updated max_wqe_base will eventually make the condition wqe_base > max_wqe_base false and exit the loop.

Comment on lines +137 to +139

```cuda
uct_rc_mlx5_gda_read_cqe(ep, &wqe_cnt, NULL);
pi = uct_rc_mlx5_gda_calc_pi(ep, wqe_cnt);
return pi + ep->sq_wqe_num + 1 - count;
```

style: recalculating CQE data on every iteration adds overhead; consider caching if called frequently in tight loops

Contributor Author

Each iteration should see an updated wqe_cnt; how can we cache it?

@greptile-apps greptile-apps bot left a comment

Greptile Overview

Greptile Summary

This PR implements a "collapsed completion queue" (CQ) architecture for the UCX GDA (GPU Direct Async) InfiniBand transport layer. The changes eliminate the separate progress thread and shared atomic counters (avail_count, producer_index) that previously tracked send queue state on the host side. Instead, GPU device kernels now directly poll completion queue entries (CQEs) during work queue entry (WQE) reservation to compute available slots on-demand. The endpoint device structure (uct_rc_gdaki_dev_ep_t) has been streamlined by removing redundant hardware indices (sq_wqe_pi, cqe_ci, avail_count), consolidating state into reservation indices (sq_rsvd_index, sq_ready_index) and a minimal TX CQ (size 1). Test infrastructure was updated to match the new result structure that now returns only status and ready_index. This refactor reduces host-GPU synchronization overhead and memory footprint but shifts complexity into device-side atomic reservation logic with rollback loops.

Important Files Changed

| Filename | Score | Overview |
|---|---|---|
| test/gtest/ucp/cuda/test_kernels.h | 4/5 | Removed producer_index and avail_count fields from kernel result structure, simplifying test API |
| test/gtest/ucp/test_ucp_device.cc | 4/5 | Removed validation of producer_index and avail_count, retaining only ready_index check |
| test/gtest/ucp/cuda/test_kernels.cu | 4/5 | Removed intermediate state tracking for producer_index and avail_count in kernel state capture |
| src/uct/ib/mlx5/gdaki/gdaki.c | 3/5 | Collapsed TX CQ to size 1 and removed initialization of queue tracking fields in device endpoint |
| src/uct/ib/mlx5/gdaki/gdaki_dev.h | 3/5 | Removed sq_wqe_pi, cqe_ci, and avail_count fields from device endpoint structure |
| src/uct/ib/mlx5/gdaki/gdaki.cuh | 2/5 | Replaced avail_count tracking with direct CQE polling and added atomic rollback loop for reservations |

Confidence score: 2/5

  • This PR introduces significant architectural changes to critical GPU-device path logic that could cause silent queue corruption or livelock under race conditions
  • Score reflects multiple concurrency issues: non-atomic reads of sq_rsvd_index creating race conditions, unbounded rollback loops that can livelock when threads contend, and fixed CQE address reads that may yield stale data in circular buffers
  • Pay close attention to src/uct/ib/mlx5/gdaki/gdaki.cuh where the reservation and CQE polling logic has been rewritten, and verify the atomic semantics around sq_rsvd_index access patterns across all device-side code paths


6 files reviewed, 2 comments


Comment on lines +110 to +121

```cuda
UCS_F_DEVICE void uct_rc_mlx5_gda_read_cqe(uct_rc_gdaki_dev_ep_t *ep,
                                           uint16_t *wqe_cnt, uint8_t *opcode)
{
    auto *cqe64        = reinterpret_cast<mlx5_cqe64*>(ep->cqe_daddr);
    uint32_t *data_ptr = (uint32_t*)&cqe64->wqe_counter;
    uint32_t data      = READ_ONCE(*data_ptr);

    *wqe_cnt = uct_rc_mlx5_gda_bswap16(data);
    if (opcode != NULL) {
        *opcode = data >> 28;
    }
}
```

logic: CQE always read from fixed location ep->cqe_daddr; if this is a circular buffer (typical for CQs), the code must index by cqe_ci % cqe_num, otherwise stale or incorrect CQEs will be read. Does ep->cqe_daddr always point to the current CQE (updated elsewhere), or is it a base address requiring indexing?

Contributor Author

In the collapsed CQ case the queue consists of a single CQE element, so it should be OK.
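
In other words, the hardware overwrites the same CQE slot on every completion, and the reader recovers progress from the 16-bit wqe_counter alone. A sketch of the usual way such a wrapping counter is extended to 64 bits, purely illustrative and not the PR's uct_rc_mlx5_gda_calc_pi:

```cuda
/* Extend a 16-bit hardware counter to a monotonic 64-bit index,
 * assuming the caller tracks the last extended value and the counter
 * cannot advance by more than 2^16 between reads. */
__device__ uint64_t extend_counter(uint64_t last_pi, uint16_t wqe_cnt)
{
    uint64_t pi = (last_pi & ~(uint64_t)0xffff) | wqe_cnt;
    if (pi < last_pi) {
        pi += 0x10000; /* the 16-bit counter wrapped since the last read */
    }
    return pi;
}
```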

```diff
 uct_rc_mlx5_gda_fc(const uct_rc_gdaki_dev_ep_t *ep, uint16_t wqe_idx)
 {
-    return (wqe_idx & ep->sq_fc_mask) == 1;
+    return !(wqe_idx & ep->sq_fc_mask);
```

logic: flow-control condition inverted from (wqe_idx & mask) == 1 to !(wqe_idx & mask); these produce different true sets unless mask is crafted so that & mask yields only 0 or 1; confirm mask semantics. What is the intended range of ep->sq_fc_mask? If it's a multi-bit mask, the new logic will trigger FC on any zero result, not just when the masked value equals 1.

Contributor Author

Flow control requests a completion at least once per half of the work queue size: each time wqe_idx & sq_fc_mask (= wqe_size/2 - 1) equals the same number. Comparing to 0 is supposed to save an explicit comparison instruction.
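
A worked example with illustrative values (not taken from the PR): with a 256-entry work queue, sq_fc_mask = 256/2 - 1 = 127, so the masked index hits zero once every 128 WQEs:

```cuda
/* Illustrative flow-control predicate: true once per 128 WQEs. */
__device__ bool fc_example(uint16_t wqe_idx)
{
    const uint16_t sq_fc_mask = 127; /* assumed wqe_size/2 - 1, wqe_size = 256 */
    return !(wqe_idx & sq_fc_mask);  /* true for wqe_idx 0, 128, 256, ... */
}
```

With the old == 1 form the predicate fired at indices 1, 129, 257, and so on; the frequency is the same, only the phase changes.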

@iyastreb
Contributor

Maybe you can document the performance implications (just % improvement) of this change with 1, 32, and 128 threads?

```cuda
    uint32_t data = READ_ONCE(*data_ptr);

    *wqe_cnt = uct_rc_mlx5_gda_bswap16(data);
    if (opcode != NULL) {
```
Contributor

IMO better to read it unconditionally than having a branch.
The caller can just ignore the result if not needed.

Contributor Author

I expect this condition to be ruled out at compile time
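
A small sketch of why that holds, assuming the reader is force-inlined into its callers (illustrative names, not the PR's code): when a call site passes a literal NULL, constant propagation reduces the test to if (false) and the opcode store disappears.

```cuda
__device__ __forceinline__ void read_example(const uint32_t *src,
                                             uint16_t *wqe_cnt,
                                             uint8_t *opcode)
{
    uint32_t data = *src;
    *wqe_cnt      = data & 0xffff;
    if (opcode != NULL) { /* folds away when opcode is a known NULL */
        *opcode = data >> 28;
    }
}

__device__ void caller(const uint32_t *src, uint16_t *cnt)
{
    /* After inlining, only the wqe_cnt path remains in the generated code. */
    read_example(src, cnt, NULL);
}
```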

```cuda
uct_rc_mlx5_gda_reserv_wqe(uct_rc_gdaki_dev_ep_t *ep, unsigned count,
                           unsigned lane_id, uint64_t &wqe_base)
{
    wqe_base = 0;
```
Contributor

I intentionally added zero initialization to avoid a crash with syndrome 68

Contributor Author

Why does it cause this crash, and how does this initialization prevent it?
The code looks like it's just overwritten by the shuffle.

Contributor

Yes, this one was quite tricky, and I also struggled to understand it.
So I asked ChatGPT and Gemini, and both pointed out that an uninitialized wqe_base leads to UB in some cases:

The CUDA execution model might still produce the correct result most of the time because the __shfl_sync instruction will force the other lanes to wait for lane 0 to arrive. When lane 0 finally executes the shuffle, its value will be correctly broadcast.

However, relying on this implicit synchronization is dangerous and can lead to undefined behavior. The code is not robust because it makes assumptions about instruction scheduling and thread divergence that may not hold true on all GPU architectures or with future compiler versions. The most significant risk is that the compiler might perform optimizations based on the uninitialized value of wqe_base in the non-zero lanes before the shuffle call, leading to incorrect code generation.

This issue was not always reproducible on rock, but quite frequently failed in CI with syndrome 68.
So better keep this change
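
For reference, a minimal sketch of the lane-0 broadcast pattern under discussion (simplified, hypothetical names; not the PR's exact code). Without the initializer, the non-zero lanes feed an indeterminate value into __shfl_sync, which is undefined behavior the compiler may exploit, even though the broadcast overwrites it on every correct execution:

```cuda
__device__ uint64_t reserve_and_broadcast(unsigned lane_id,
                                          unsigned long long *rsvd_index,
                                          unsigned count)
{
    uint64_t wqe_base = 0; /* defined on ALL lanes before the shuffle */

    if (lane_id == 0) {
        /* only lane 0 performs the reservation */
        wqe_base = atomicAdd(rsvd_index, (unsigned long long)count);
    }
    /* broadcast lane 0's reservation to every lane in the warp */
    return __shfl_sync(0xffffffffU, wqe_base, 0);
}
```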

```cuda
UCS_F_DEVICE void uct_rc_mlx5_gda_read_cqe(uct_rc_gdaki_dev_ep_t *ep,
                                           uint16_t *wqe_cnt, uint8_t *opcode)
{
    auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(ep->cqe_daddr);
```
Contributor

We don't need to check the owner bit in the case of a collapsed CQ?

Contributor Author

In a collapsed CQ, ownership is always software.


```cuda
struct test_ucp_device_kernel_result_t {
    ucs_status_t status;
    uint64_t     producer_index;
```
Contributor

Can we keep the producer index and retrieve it from sq_rsvd_index maybe?

Contributor Author

we could call uct_rc_mlx5_gda_read_cqe/calc_pi here

```cuda
    uint64_t pi;

    uct_rc_mlx5_gda_read_cqe(ep, &wqe_cnt, NULL);
    pi = uct_rc_mlx5_gda_calc_pi(ep, wqe_cnt);
```
Contributor
@ofirfarjun7 ofirfarjun7 Oct 27, 2025

Why not unite them, if uct_rc_mlx5_gda_read_cqe is always used before uct_rc_mlx5_gda_calc_pi with common params? Do you think it would be strange if uct_rc_mlx5_gda_calc_pi returned wqe_cnt as an out param?
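
A hypothetical merged helper along those lines; the names follow the PR's functions, but this exact signature is an assumption:

```cuda
/* Combine the CQE read and producer-index calculation; callers that do
 * not need the raw counter pass NULL, mirroring read_cqe's opcode param. */
UCS_F_DEVICE uint64_t
uct_rc_mlx5_gda_calc_pi_ex(uct_rc_gdaki_dev_ep_t *ep, uint16_t *wqe_cnt_p)
{
    uint16_t wqe_cnt;

    uct_rc_mlx5_gda_read_cqe(ep, &wqe_cnt, NULL);
    if (wqe_cnt_p != NULL) {
        *wqe_cnt_p = wqe_cnt;
    }
    return uct_rc_mlx5_gda_calc_pi(ep, wqe_cnt);
}
```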
