From: Konstantin Taranov <kotaranov@microsoft.com>
Date: Tue, 11 Mar 2025 20:49:21 +0100
Subject: providers/mana: improve synchronization on the shadow queue
Use release/acquire semantics for the shadow queue indices.
This ensures correct synchronization between the sender and poller threads.
Signed-off-by: Konstantin Taranov <kotaranov@microsoft.com>
Origin: upstream, https://github.com/linux-rdma/rdma-core/pull/1584
---
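Note: the essential pattern is the release/acquire pairing between
shadow_queue_advance_producer() and shadow_queue_get_next_to_complete().
A minimal sketch of how the two threads are expected to use it is shown
below; process_completion() is a hypothetical caller, not part of this
patch:

    /* Sender thread: fill the shadow WQE, then publish it. The
     * release store in shadow_queue_advance_producer() guarantees
     * that the WQE contents are visible to any thread that later
     * reads prod_idx with an acquire load. */
    struct shadow_wqe_header *wqe = shadow_queue_producer_entry(queue);
    wqe->flags = MANA_NO_SIGNAL_WC;        /* plain writes before the store */
    shadow_queue_advance_producer(queue);  /* release: publish prod_idx + 1 */

    /* Poller thread: the acquire load of prod_idx inside
     * shadow_queue_get_next_to_complete() pairs with the release
     * store above, so dereferencing the returned WQE is safe. */
    struct shadow_wqe_header *done = shadow_queue_get_next_to_complete(queue);
    if (done)
        process_completion(done);          /* hypothetical consumer */
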
providers/mana/shadow_queue.h | 58 ++++++++++++++++++++++++++-----------------
1 file changed, 35 insertions(+), 23 deletions(-)
diff --git a/providers/mana/shadow_queue.h b/providers/mana/shadow_queue.h
index 1073f7c..9343fec 100644
--- a/providers/mana/shadow_queue.h
+++ b/providers/mana/shadow_queue.h
@@ -12,6 +12,9 @@
#include <infiniband/verbs.h>
#include <sys/mman.h>
#include <util/util.h>
+#include <stdatomic.h>
+
+typedef _Atomic(uint64_t) _atomic_t;
#define MANA_NO_SIGNAL_WC (0xff)
@@ -83,8 +86,18 @@ static inline void destroy_shadow_queue(struct shadow_queue *queue)
}
}
+static inline _atomic_t *producer(struct shadow_queue *queue)
+{
+ return (_atomic_t *)&queue->prod_idx;
+}
+
+static inline _atomic_t *consumer(struct shadow_queue *queue)
+{
+ return (_atomic_t *)&queue->cons_idx;
+}
+
static inline struct shadow_wqe_header *
-shadow_queue_get_element(const struct shadow_queue *queue, uint64_t unmasked_index)
+shadow_queue_get_element(struct shadow_queue *queue, uint64_t unmasked_index)
{
uint32_t index = unmasked_index & (queue->length - 1);
@@ -93,53 +106,51 @@ shadow_queue_get_element(const struct shadow_queue *queue, uint64_t unmasked_ind
static inline bool shadow_queue_full(struct shadow_queue *queue)
{
- return (queue->prod_idx - queue->cons_idx) >= queue->length;
+ uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
+ uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_acquire);
+
+ return (prod_idx - cons_idx) >= queue->length;
}
static inline struct shadow_wqe_header *
shadow_queue_producer_entry(struct shadow_queue *queue)
{
- return shadow_queue_get_element(queue, queue->prod_idx);
+ uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
+
+ return shadow_queue_get_element(queue, prod_idx);
}
static inline void shadow_queue_advance_producer(struct shadow_queue *queue)
{
- queue->prod_idx++;
-}
+ uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
-static inline void shadow_queue_retreat_producer(struct shadow_queue *queue)
-{
- queue->prod_idx--;
+ atomic_store_explicit(producer(queue), prod_idx + 1, memory_order_release);
}
static inline void shadow_queue_advance_consumer(struct shadow_queue *queue)
{
- queue->cons_idx++;
-}
+ uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_relaxed);
-static inline bool shadow_queue_empty(struct shadow_queue *queue)
-{
- return queue->prod_idx == queue->cons_idx;
-}
-
-static inline uint32_t shadow_queue_get_pending_wqe_count(struct shadow_queue *queue)
-{
- return (uint32_t)(queue->prod_idx - queue->next_to_complete_idx);
+ atomic_store_explicit(consumer(queue), cons_idx + 1, memory_order_release);
}
static inline struct shadow_wqe_header *
-shadow_queue_get_next_to_consume(const struct shadow_queue *queue)
+shadow_queue_get_next_to_consume(struct shadow_queue *queue)
{
- if (queue->cons_idx == queue->next_to_complete_idx)
+ uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_relaxed);
+
+ if (cons_idx == queue->next_to_complete_idx)
return NULL;
- return shadow_queue_get_element(queue, queue->cons_idx);
+ return shadow_queue_get_element(queue, cons_idx);
}
static inline struct shadow_wqe_header *
shadow_queue_get_next_to_complete(struct shadow_queue *queue)
{
- if (queue->next_to_complete_idx == queue->prod_idx)
+ uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_acquire);
+
+ if (queue->next_to_complete_idx == prod_idx)
return NULL;
return shadow_queue_get_element(queue, queue->next_to_complete_idx);
@@ -153,10 +164,11 @@ static inline void shadow_queue_advance_next_to_complete(struct shadow_queue *qu
static inline struct shadow_wqe_header *
shadow_queue_get_next_to_signal(struct shadow_queue *queue)
{
+ uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_acquire);
struct shadow_wqe_header *wqe = NULL;
queue->next_to_signal_idx = max(queue->next_to_signal_idx, queue->next_to_complete_idx);
- while (queue->next_to_signal_idx < queue->prod_idx) {
+ while (queue->next_to_signal_idx < prod_idx) {
wqe = shadow_queue_get_element(queue, queue->next_to_signal_idx);
queue->next_to_signal_idx++;
if (wqe->flags != MANA_NO_SIGNAL_WC)