1 /* SPDX-License-Identifier: GPL-2.0 */ 2 /* XDP user-space ring structure 3 * Copyright(c) 2018 Intel Corporation. 4 */ 5 6 #ifndef _LINUX_XSK_QUEUE_H 7 #define _LINUX_XSK_QUEUE_H 8 9 #include <linux/types.h> 10 #include <linux/if_xdp.h> 11 #include <net/xdp_sock.h> 12 13 #define RX_BATCH_SIZE 16 14 #define LAZY_UPDATE_THRESHOLD 128 15 16 struct xdp_ring { 17 u32 producer ____cacheline_aligned_in_smp; 18 u32 consumer ____cacheline_aligned_in_smp; 19 }; 20 21 /* Used for the RX and TX queues for packets */ 22 struct xdp_rxtx_ring { 23 struct xdp_ring ptrs; 24 struct xdp_desc desc[0] ____cacheline_aligned_in_smp; 25 }; 26 27 /* Used for the fill and completion queues for buffers */ 28 struct xdp_umem_ring { 29 struct xdp_ring ptrs; 30 u64 desc[0] ____cacheline_aligned_in_smp; 31 }; 32 33 struct xsk_queue { 34 u64 chunk_mask; 35 u64 size; 36 u32 ring_mask; 37 u32 nentries; 38 u32 prod_head; 39 u32 prod_tail; 40 u32 cons_head; 41 u32 cons_tail; 42 struct xdp_ring *ring; 43 u64 invalid_descs; 44 }; 45 46 /* Common functions operating for both RXTX and umem queues */ 47 48 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) 49 { 50 return q ? q->invalid_descs : 0; 51 } 52 53 static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) 54 { 55 u32 entries = q->prod_tail - q->cons_tail; 56 57 if (entries == 0) { 58 /* Refresh the local pointer */ 59 q->prod_tail = READ_ONCE(q->ring->producer); 60 entries = q->prod_tail - q->cons_tail; 61 } 62 63 return (entries > dcnt) ? dcnt : entries; 64 } 65 66 static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) 67 { 68 u32 free_entries = q->nentries - (producer - q->cons_tail); 69 70 if (free_entries >= dcnt) 71 return free_entries; 72 73 /* Refresh the local tail pointer */ 74 q->cons_tail = READ_ONCE(q->ring->consumer); 75 return q->nentries - (producer - q->cons_tail); 76 } 77 78 /* UMEM queue */ 79 80 static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) 81 { 82 if (addr >= q->size) { 83 q->invalid_descs++; 84 return false; 85 } 86 87 return true; 88 } 89 90 static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr) 91 { 92 while (q->cons_tail != q->cons_head) { 93 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 94 unsigned int idx = q->cons_tail & q->ring_mask; 95 96 *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask; 97 if (xskq_is_valid_addr(q, *addr)) 98 return addr; 99 100 q->cons_tail++; 101 } 102 103 return NULL; 104 } 105 106 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr) 107 { 108 if (q->cons_tail == q->cons_head) { 109 WRITE_ONCE(q->ring->consumer, q->cons_tail); 110 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 111 112 /* Order consumer and data */ 113 smp_rmb(); 114 } 115 116 return xskq_validate_addr(q, addr); 117 } 118 119 static inline void xskq_discard_addr(struct xsk_queue *q) 120 { 121 q->cons_tail++; 122 } 123 124 static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) 125 { 126 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 127 128 if (xskq_nb_free(q, q->prod_tail, 1) == 0) 129 return -ENOSPC; 130 131 ring->desc[q->prod_tail++ & q->ring_mask] = addr; 132 133 /* Order producer and data */ 134 smp_wmb(); 135 136 WRITE_ONCE(q->ring->producer, q->prod_tail); 137 return 0; 138 } 139 140 static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) 141 { 142 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 143 144 if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) 145 return -ENOSPC; 146 147 ring->desc[q->prod_head++ & q->ring_mask] = addr; 148 return 0; 149 } 150 151 static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, 152 u32 nb_entries) 153 { 154 /* Order producer and data */ 155 smp_wmb(); 156 157 q->prod_tail += nb_entries; 158 WRITE_ONCE(q->ring->producer, q->prod_tail); 159 } 160 161 static inline int xskq_reserve_addr(struct xsk_queue *q) 162 { 163 if (xskq_nb_free(q, q->prod_head, 1) == 0) 164 return -ENOSPC; 165 166 q->prod_head++; 167 return 0; 168 } 169 170 /* Rx/Tx queue */ 171 172 static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d) 173 { 174 if (!xskq_is_valid_addr(q, d->addr)) 175 return false; 176 177 if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) || 178 d->options) { 179 q->invalid_descs++; 180 return false; 181 } 182 183 return true; 184 } 185 186 static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, 187 struct xdp_desc *desc) 188 { 189 while (q->cons_tail != q->cons_head) { 190 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 191 unsigned int idx = q->cons_tail & q->ring_mask; 192 193 *desc = READ_ONCE(ring->desc[idx]); 194 if (xskq_is_valid_desc(q, desc)) 195 return desc; 196 197 q->cons_tail++; 198 } 199 200 return NULL; 201 } 202 203 static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, 204 struct xdp_desc *desc) 205 { 206 if (q->cons_tail == q->cons_head) { 207 WRITE_ONCE(q->ring->consumer, q->cons_tail); 208 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 209 210 /* Order consumer and data */ 211 smp_rmb(); 212 } 213 214 return xskq_validate_desc(q, desc); 215 } 216 217 static inline void xskq_discard_desc(struct xsk_queue *q) 218 { 219 q->cons_tail++; 220 } 221 222 static inline int xskq_produce_batch_desc(struct xsk_queue *q, 223 u64 addr, u32 len) 224 { 225 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 226 unsigned int idx; 227 228 if (xskq_nb_free(q, q->prod_head, 1) == 0) 229 return -ENOSPC; 230 231 idx = (q->prod_head++) & q->ring_mask; 232 ring->desc[idx].addr = addr; 233 ring->desc[idx].len = len; 234 235 return 0; 236 } 237 238 static inline void xskq_produce_flush_desc(struct xsk_queue *q) 239 { 240 /* Order producer and data */ 241 smp_wmb(); 242 243 q->prod_tail = q->prod_head, 244 WRITE_ONCE(q->ring->producer, q->prod_tail); 245 } 246 247 static inline bool xskq_full_desc(struct xsk_queue *q) 248 { 249 return xskq_nb_avail(q, q->nentries) == q->nentries; 250 } 251 252 static inline bool xskq_empty_desc(struct xsk_queue *q) 253 { 254 return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; 255 } 256 257 void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask); 258 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue); 259 void xskq_destroy(struct xsk_queue *q_ops); 260 261 /* Executed by the core when the entire UMEM gets freed */ 262 void xsk_reuseq_destroy(struct xdp_umem *umem); 263 264 #endif /* _LINUX_XSK_QUEUE_H */ 265