/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 prod_head;
	u32 prod_tail;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * rings, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer is updated. If this barrier were missing, the
 * consumer could observe the producer pointer being set and load the
 * data before the producer has written the new value, and would thus
 * read stale data.
 *
 * (C) prevents the consumer from speculatively loading the data before
 * the producer pointer has actually been read. Without this barrier,
 * some architectures could return old data, since speculative loads
 * are not discarded when the CPU has no knowledge of a dependency
 * between ->producer and the data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data: if ->consumer indicates there is no room
 * in the buffer, $data is not stored, so no explicit barrier is needed.
 *
 * (D) ensures that the load of the data is completed before the store
 * of the consumer pointer becomes visible. Without this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss the old
 * entry and very likely read the new entry twice, once now and again
 * after wrapping around the ring.
 */
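/* For illustration only: a user-space consumer of the Rx ring pairs with
 * the kernel-side producer in this file roughly as sketched below. The
 * names (ring, mask, cached_cons) and the barriers are hypothetical
 * stand-ins for what the application (or a library such as libbpf)
 * provides; they are not defined in this header.
 *
 *	u32 prod = READ_ONCE(ring->ptrs.producer);	LOAD ->producer
 *	smp_rmb();					(C), pairs with (B)
 *	while (cached_cons != prod) {
 *		struct xdp_desc d = ring->desc[cached_cons++ & mask];
 *		... process d ...			LOAD $data
 *	}
 *	smp_mb();					(D), pairs with (A)
 *	WRITE_ONCE(ring->ptrs.consumer, cached_cons);	STORE ->consumer
 */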
/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->prod_tail = READ_ONCE(q->ring->producer);
		entries = q->prod_tail - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
	u32 free_entries = q->nentries - (producer - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (producer - q->cons_tail);
}

static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries >= cnt)
		return true;

	/* Refresh the local pointer. */
	q->prod_tail = READ_ONCE(q->ring->producer);
	entries = q->prod_tail - q->cons_tail;

	return entries >= cnt;
}

/* UMEM queue */

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
		if (xskq_is_valid_addr(q, *addr))
			return addr;

		q->cons_tail++;
	}

	return NULL;
}

static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_addr(q, addr);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_tail++ & q->ring_mask] = addr;

	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}

static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_head++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
					     u32 nb_entries)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail += nb_entries;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->prod_head++;
	return 0;
}

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
{
	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}
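/* Worked example of the chunk-boundary check above (the numbers are
 * illustrative and assume aligned 2 KiB chunks, i.e. chunk_mask == ~0x7ffULL):
 * a descriptor with addr == 0x7f0 and len == 0x20 ends at 0x810, so
 * (addr + len) & chunk_mask == 0x800 while addr & chunk_mask == 0x0. The two
 * differ, the descriptor straddles a chunk boundary, and it is counted as
 * invalid.
 */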
static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_batch_desc(struct xsk_queue *q,
					  u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	unsigned int idx;

	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = (q->prod_head++) & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail = q->prod_head;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */