/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>

#define RX_BATCH_SIZE 16
#define LAZY_UPDATE_THRESHOLD 128

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[0] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[0] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u64 chunk_mask;
	u64 size;
	u32 ring_mask;
	u32 nentries;
	u32 prod_head;
	u32 prod_tail;
	u32 cons_head;
	u32 cons_tail;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer is updated. If this barrier was missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. If ->consumer indicates there is no room
 * in the buffer to store $data, we do not store it, so no explicit
 * barrier is needed.
 *
 * (D) protects the load of the data from being observed after the
 * store of the consumer pointer. If we did not have this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * now and again after circling through the ring.
 */
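
/* Illustrative sketch only, not part of this header and compiled out:
 * the same A/B/C/D pairing expressed for a hypothetical user-space
 * single-producer/single-consumer ring using C11 acquire/release
 * atomics instead of the kernel's explicit barriers and control
 * dependency. All names below (example_ring, example_produce,
 * example_consume, EXAMPLE_RING_SIZE) are made up for illustration;
 * real AF_XDP user-space code would normally use the libbpf helpers.
 */
#if 0 /* example only, never built as part of the kernel */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define EXAMPLE_RING_SIZE 64u	/* must be a power of two */

struct example_ring {
	_Atomic uint32_t producer;
	_Atomic uint32_t consumer;
	uint64_t desc[EXAMPLE_RING_SIZE];
};

static bool example_produce(struct example_ring *r, uint64_t val)
{
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);
	/* Pairs with the release store in example_consume() (the A/D pair):
	 * once we observe the new ->consumer, the consumer's data load is done.
	 */
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_acquire);

	if (prod - cons == EXAMPLE_RING_SIZE)	/* ring full */
		return false;

	r->desc[prod & (EXAMPLE_RING_SIZE - 1)] = val;	/* STORE $data */
	/* (B): publish the data before the producer pointer. */
	atomic_store_explicit(&r->producer, prod + 1, memory_order_release);
	return true;
}

static bool example_consume(struct example_ring *r, uint64_t *val)
{
	uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);
	/* (C): pairs with the release store of ->producer in (B). */
	uint32_t prod = atomic_load_explicit(&r->producer, memory_order_acquire);

	if (cons == prod)			/* ring empty */
		return false;

	*val = r->desc[cons & (EXAMPLE_RING_SIZE - 1)];	/* LOAD $data */
	/* (D): the data load is ordered before this release store. */
	atomic_store_explicit(&r->consumer, cons + 1, memory_order_release);
	return true;
}
#endif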

/* Common functions operating for both RXTX and umem queues */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries == 0) {
		/* Refresh the local pointer */
		q->prod_tail = READ_ONCE(q->ring->producer);
		entries = q->prod_tail - q->cons_tail;
	}

	return (entries > dcnt) ? dcnt : entries;
}

static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
{
	u32 free_entries = q->nentries - (producer - q->cons_tail);

	if (free_entries >= dcnt)
		return free_entries;

	/* Refresh the local tail pointer */
	q->cons_tail = READ_ONCE(q->ring->consumer);
	return q->nentries - (producer - q->cons_tail);
}

static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->prod_tail - q->cons_tail;

	if (entries >= cnt)
		return true;

	/* Refresh the local pointer. */
	q->prod_tail = READ_ONCE(q->ring->producer);
	entries = q->prod_tail - q->cons_tail;

	return entries >= cnt;
}

/* UMEM queue */

static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr,
					      u64 length)
{
	bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE;
	bool next_pg_contig =
		(unsigned long)umem->pages[(addr >> PAGE_SHIFT)].addr &
			XSK_NEXT_PG_CONTIG_MASK;

	return cross_pg && !next_pg_contig;
}

static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr)
{
	if (addr >= q->size) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr,
						u64 length,
						struct xdp_umem *umem)
{
	u64 base_addr = xsk_umem_extract_addr(addr);

	addr = xsk_umem_add_offset_to_addr(addr);
	if (base_addr >= q->size || addr >= q->size ||
	    xskq_crosses_non_contig_pg(umem, addr, length)) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
				      struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;

		if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
			if (xskq_is_valid_addr_unaligned(q, *addr,
							 umem->chunk_size_nohr,
							 umem))
				return addr;
			goto out;
		}

		if (xskq_is_valid_addr(q, *addr))
			return addr;

out:
		q->cons_tail++;
	}

	return NULL;
}

static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
				  struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_addr(q, addr, umem);
}

static inline void xskq_discard_addr(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_tail++ & q->ring_mask] = addr;

	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, q->prod_tail);
	return 0;
}

static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->prod_head++ & q->ring_mask] = addr;
	return 0;
}

static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
					     u32 nb_entries)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail += nb_entries;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline int xskq_reserve_addr(struct xsk_queue *q)
{
	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	q->prod_head++;
	return 0;
}
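
/* Illustrative sketch only, not part of this header and compiled out:
 * how a caller might combine xskq_produce_addr_lazy() and
 * xskq_produce_flush_addr_n() so that the smp_wmb() and the store to
 * ->producer are paid once per batch rather than once per entry. The
 * function name and the addrs/cnt parameters below are hypothetical;
 * the real callers live in net/xdp/xsk.c.
 */
#if 0 /* example only, never built as part of the kernel */
static inline u32 example_produce_addrs_batched(struct xsk_queue *cq,
						const u64 *addrs, u32 cnt)
{
	u32 i;

	for (i = 0; i < cnt; i++) {
		/* Writes the entry and advances the cached prod_head only;
		 * the shared ->producer pointer is untouched, so the
		 * consumer cannot see these entries until the flush below.
		 */
		if (xskq_produce_addr_lazy(cq, addrs[i]))
			break;
	}

	if (i) {
		/* One smp_wmb() plus one ->producer update publishes
		 * all i entries at once.
		 */
		xskq_produce_flush_addr_n(cq, i);
	}

	return i;	/* number of addresses actually queued */
}
#endif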

/* Rx/Tx queue */

static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d,
				      struct xdp_umem *umem)
{
	if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
		if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem))
			return false;

		if (d->len > umem->chunk_size_nohr || d->options) {
			q->invalid_descs++;
			return false;
		}

		return true;
	}

	if (!xskq_is_valid_addr(q, d->addr))
		return false;

	if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) ||
	    d->options) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
						  struct xdp_desc *desc,
						  struct xdp_umem *umem)
{
	while (q->cons_tail != q->cons_head) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		unsigned int idx = q->cons_tail & q->ring_mask;

		*desc = READ_ONCE(ring->desc[idx]);
		if (xskq_is_valid_desc(q, desc, umem))
			return desc;

		q->cons_tail++;
	}

	return NULL;
}

static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
					      struct xdp_desc *desc,
					      struct xdp_umem *umem)
{
	if (q->cons_tail == q->cons_head) {
		smp_mb(); /* D, matches A */
		WRITE_ONCE(q->ring->consumer, q->cons_tail);
		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

		/* Order consumer and data */
		smp_rmb(); /* C, matches B */
	}

	return xskq_validate_desc(q, desc, umem);
}

static inline void xskq_discard_desc(struct xsk_queue *q)
{
	q->cons_tail++;
}

static inline int xskq_produce_batch_desc(struct xsk_queue *q,
					  u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	unsigned int idx;

	if (xskq_nb_free(q, q->prod_head, 1) == 0)
		return -ENOSPC;

	/* A, matches D */
	idx = (q->prod_head++) & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
	/* Order producer and data */
	smp_wmb(); /* B, matches C */

	q->prod_tail = q->prod_head;
	WRITE_ONCE(q->ring->producer, q->prod_tail);
}

static inline bool xskq_full_desc(struct xsk_queue *q)
{
	return xskq_nb_avail(q, q->nentries) == q->nentries;
}

static inline bool xskq_empty_desc(struct xsk_queue *q)
{
	return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
}

void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

/* Executed by the core when the entire UMEM gets freed */
void xsk_reuseq_destroy(struct xdp_umem *umem);

#endif /* _LINUX_XSK_QUEUE_H */