summaryrefslogtreecommitdiff
path: root/net/xdp/xsk_queue.h
diff options
context:
space:
mode:
authorMagnus Karlsson <magnus.karlsson@intel.com>2019-12-19 13:39:26 +0100
committerAlexei Starovoitov <ast@kernel.org>2019-12-20 16:00:09 -0800
commitc5ed924b54c892ee637d2e6889ef83341835a560 (patch)
tree344a4394b00e50605ff67be098e8ac856ac5647b /net/xdp/xsk_queue.h
parentdf0ae6f78a45e5696427779fc3379c5d75f5d4a5 (diff)
xsk: Simplify the consumer ring access functions
Simplify and refactor consumer ring functions. The consumer first "peeks" to find descriptors or addresses that are available to read from the ring, then reads them and finally "releases" these descriptors once it is done. The two local variables cons_tail and cons_head are turned into one single variable called cached_cons. cached_tail referred to the cached value of the global consumer pointer and will be stored in cached_cons. For cached_head, we just use cached_prod instead as it was not used for a consumer queue before. It also better reflects what it really is now: a cached copy of the producer pointer. The names of the functions are also renamed in the same manner as the producer functions. The new functions are called xskq_cons_ followed by what it does. Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com> Signed-off-by: Alexei Starovoitov <ast@kernel.org> Link: https://lore.kernel.org/bpf/1576759171-28550-8-git-send-email-magnus.karlsson@intel.com
Diffstat (limited to 'net/xdp/xsk_queue.h')
-rw-r--r--net/xdp/xsk_queue.h102
1 file changed, 46 insertions, 56 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 8bfa2ee6864c..1436116767ea 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -34,8 +34,7 @@ struct xsk_queue {
u32 ring_mask;
u32 nentries;
u32 cached_prod;
- u32 cons_head;
- u32 cons_tail;
+ u32 cached_cons;
struct xdp_ring *ring;
u64 invalid_descs;
};
@@ -89,43 +88,48 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
return q ? q->invalid_descs : 0;
}
-static inline u32 xskq_nb_avail(struct xsk_queue *q)
+static inline void __xskq_cons_release(struct xsk_queue *q)
{
- u32 entries = q->cached_prod - q->cons_tail;
+ smp_mb(); /* D, matches A */
+ WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
- if (entries == 0) {
- /* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- entries = q->cached_prod - q->cons_tail;
- }
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+ /* Refresh the local pointer */
+ q->cached_prod = READ_ONCE(q->ring->producer);
+ smp_rmb(); /* C, matches B */
+}
- return entries;
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+ __xskq_cons_release(q);
+ __xskq_cons_peek(q);
}
static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
- u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
+ u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
if (free_entries)
return false;
/* Refresh the local tail pointer */
- q->cons_tail = READ_ONCE(q->ring->consumer);
- free_entries = q->nentries - (q->cached_prod - q->cons_tail);
+ q->cached_cons = READ_ONCE(q->ring->consumer);
+ free_entries = q->nentries - (q->cached_prod - q->cached_cons);
return !free_entries;
}
-static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
- u32 entries = q->cached_prod - q->cons_tail;
+ u32 entries = q->cached_prod - q->cached_cons;
if (entries >= cnt)
return true;
- /* Refresh the local pointer. */
- q->cached_prod = READ_ONCE(q->ring->producer);
- entries = q->cached_prod - q->cons_tail;
+ __xskq_cons_peek(q);
+ entries = q->cached_prod - q->cached_cons;
return entries >= cnt;
}
@@ -172,9 +176,10 @@ static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+ while (q->cached_cons != q->cached_prod) {
+ u32 idx = q->cached_cons & q->ring_mask;
*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
@@ -190,30 +195,27 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
return addr;
out:
- q->cons_tail++;
+ q->cached_cons++;
}
return NULL;
}
-static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
- struct xdp_umem *umem)
+static inline u64 *xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q);
-
- /* Order consumer and data */
- smp_rmb();
- }
-
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
return xskq_validate_addr(q, addr, umem);
}
-static inline void xskq_discard_addr(struct xsk_queue *q)
+static inline void xskq_cons_release(struct xsk_queue *q)
{
- q->cons_tail++;
+ /* To improve performance, only update local state here.
+ * Do the actual release operation when we get new entries
+ * from the ring in xskq_cons_get_entries() instead.
+ */
+ q->cached_cons++;
}
static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -299,41 +301,29 @@ static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
+ while (q->cached_cons != q->cached_prod) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ u32 idx = q->cached_cons & q->ring_mask;
*desc = READ_ONCE(ring->desc[idx]);
if (xskq_is_valid_desc(q, desc, umem))
return desc;
- q->cons_tail++;
+ q->cached_cons++;
}
return NULL;
}
-static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
- struct xdp_desc *desc,
- struct xdp_umem *umem)
+static inline struct xdp_desc *xskq_cons_peek_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q);
-
- /* Order consumer and data */
- smp_rmb(); /* C, matches B */
- }
-
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
return xskq_validate_desc(q, desc, umem);
}
-static inline void xskq_discard_desc(struct xsk_queue *q)
-{
- q->cons_tail++;
-}
-
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
u64 addr, u32 len)
{
@@ -351,7 +341,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
return 0;
}
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
/* No barriers needed since data is not accessed */
return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==