/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 prod_head;
	u32 prod_tail;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer. If this barrier was missing, the consumer could
 * observe the producer pointer being set and thus load the data before
 * the producer has written the new data. The consumer would in this
 * case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. If ->consumer indicates that there is no
 * room in the buffer to store $data, we do not store it, so no barrier
 * is needed.
 *
 * (D) ensures that the load of the data cannot be observed to happen
 * after the store of the consumer pointer. If we did not have this
 * memory barrier, the producer could observe the consumer pointer
 * being set and overwrite the data with a new value before the
 * consumer got the chance to read the old value. The consumer would
 * thus miss reading the old entry and very likely read the new entry
 * twice, once right now and again after circling through the ring.
 */
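
/* The pairing above, written out as code: a minimal illustrative sketch
 * (not part of this header), assuming a hypothetical one-entry ring 'r'
 * with free-running u32 ->producer/->consumer indices and a single
 * ->data slot.
 *
 * Producer side:
 *
 *	if (READ_ONCE(r->consumer) == r->producer) {		(A)
 *		r->data = val;
 *		smp_wmb();					(B)
 *		WRITE_ONCE(r->producer, r->producer + 1);
 *	}
 *
 * Consumer side:
 *
 *	if (READ_ONCE(r->producer) != r->consumer) {
 *		smp_rmb();					(C)
 *		val = r->data;
 *		smp_mb();					(D)
 *		WRITE_ONCE(r->consumer, r->consumer + 1);
 *	}
 *
 * The real queues below follow the same scheme, but additionally cache
 * ->producer and ->consumer locally (prod_head/prod_tail and
 * cons_head/cons_tail) so that the shared cache lines are only touched
 * once per batch.
 */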

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->prod_tail = READ_ONCE(q->ring->producer);
		entries = q->prod_tail - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
	u32 free_entries = q->nentries - (producer - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (producer - q->cons_tail);
}

/* UMEM queue */

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
		if (xskq_is_valid_addr(q, *addr))
			return addr;

		q->cons_tail++;
	}

	return NULL;
}

static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_addr(q, addr);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_tail++ & q->ring_mask] = addr;

	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}

static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_head++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
					     u32 nb_entries)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail += nb_entries;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->prod_head++;
	return 0;
}
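
/* Illustrative sketch (not part of this header): how a Tx completion
 * path might feed a completion queue with the lazy variants above.
 * 'cq', 'addrs' and 'n' are hypothetical; 'addrs' holds the umem
 * addresses of the frames that have just been transmitted.
 *
 *	u32 i, done = 0;
 *
 *	for (i = 0; i < n; i++) {
 *		if (xskq_produce_addr_lazy(cq, addrs[i]))
 *			break;	completion ring full, retry later
 *		done++;
 *	}
 *	if (done)
 *		xskq_produce_flush_addr_n(cq, done);
 *
 * xskq_produce_addr_lazy() only advances the local prod_head, so user
 * space does not observe the new entries until the flush publishes
 * them by writing ->producer (barrier B above).
 */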

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_batch_desc(struct xsk_queue *q,
					  u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	unsigned int idx;

	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = (q->prod_head++) & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail = q->prod_head;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */