/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
#include <net/xsk_buff_pool.h>

#include "xsk.h"

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
	u32 flags;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u32 ring_mask;
	u32 nentries;
	u32 cached_prod;
	u32 cached_cons;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it protects the data from being written after
 * the producer pointer. If this barrier was missing, the consumer
 * could observe the producer pointer being set and thus load the data
 * before the producer has written the new data. The consumer would in
 * this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data, we do not store it. So no barrier
 * is needed.
 *
 * (D) protects the load of the data from being observed to happen after
 * the store of the consumer pointer. If we did not have this memory
 * barrier, the producer could observe the consumer pointer being set
 * and overwrite the data with a new value before the consumer got the
 * chance to read the old value. The consumer would thus miss reading
 * the old entry and very likely read the new entry twice, once right
 * now and again after circling through the ring.
 */
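
/* The sketch below is illustrative only and is not used anywhere in the
 * kernel: it spells out the (A)-(D) scheme above directly against
 * struct xdp_ring and a bare u64 descriptor array. The xskq_example_*
 * names and parameters are hypothetical. The real helpers further down
 * implement the same pattern, but operate on cached copies of the
 * producer/consumer pointers instead of touching the shared ring state
 * on every access.
 */
static inline bool xskq_example_produce(struct xdp_ring *r, u64 *descs,
					u32 mask, u64 val)
{
	u32 prod = r->producer;

	/* (A): control dependency on the ->consumer load orders the
	 * descriptor store below after it.
	 */
	if (prod - READ_ONCE(r->consumer) == mask + 1)
		return false;			/* ring full */

	descs[prod & mask] = val;		/* STORE $data */
	smp_wmb();				/* (B), matches (C) */
	WRITE_ONCE(r->producer, prod + 1);	/* STORE ->producer */
	return true;
}

static inline bool xskq_example_consume(struct xdp_ring *r, u64 *descs,
					u32 mask, u64 *val)
{
	u32 cons = r->consumer;

	if (READ_ONCE(r->producer) == cons)	/* LOAD ->producer */
		return false;			/* ring empty */

	smp_rmb();				/* (C), matches (B) */
	*val = descs[cons & mask];		/* LOAD $data */
	smp_mb();				/* (D), matches (A) */
	WRITE_ONCE(r->consumer, cons + 1);	/* STORE ->consumer */
	return true;
}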

/* The operations on the rings are the following:
 *
 * producer                           consumer
 *
 * RESERVE entries                    PEEK in the ring for entries
 * WRITE data into the ring           READ data from the ring
 * SUBMIT entries                     RELEASE entries
 *
 * The producer reserves one or more entries in the ring. It can then
 * fill in these entries and finally submit them so that they can be
 * seen and read by the consumer.
 *
 * The consumer peeks into the ring to see if the producer has written
 * any new entries. If so, the consumer can then read these entries
 * and when it is done reading them release them back to the producer
 * so that the producer can use these slots to fill in new entries.
 *
 * The function names below reflect these operations.
 */

/* Functions that read and validate content from consumer rings. */

static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (q->cached_cons != q->cached_prod) {
		u32 idx = q->cached_cons & q->ring_mask;

		*addr = ring->desc[idx];
		return true;
	}

	return false;
}

static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
					    struct xdp_desc *desc)
{
	u64 chunk, chunk_end;

	chunk = xp_aligned_extract_addr(pool, desc->addr);
	chunk_end = xp_aligned_extract_addr(pool, desc->addr + desc->len);
	if (chunk != chunk_end)
		return false;

	if (chunk >= pool->addrs_cnt)
		return false;

	if (desc->options)
		return false;
	return true;
}

static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
					      struct xdp_desc *desc)
{
	u64 addr, base_addr;

	base_addr = xp_unaligned_extract_addr(desc->addr);
	addr = xp_unaligned_add_offset_to_addr(desc->addr);

	if (desc->len > pool->chunk_size)
		return false;

	if (base_addr >= pool->addrs_cnt || addr >= pool->addrs_cnt ||
	    xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
		return false;

	if (desc->options)
		return false;
	return true;
}

static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
				    struct xdp_desc *desc)
{
	return pool->unaligned ? xp_unaligned_validate_desc(pool, desc) :
		xp_aligned_validate_desc(pool, desc);
}

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
					   struct xdp_desc *d,
					   struct xdp_umem *umem)
{
	if (!xp_validate_desc(umem->pool, d)) {
		q->invalid_descs++;
		return false;
	}
	return true;
}

static inline bool xskq_cons_read_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xdp_umem *umem)
{
	while (q->cached_cons != q->cached_prod) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		u32 idx = q->cached_cons & q->ring_mask;

		*desc = ring->desc[idx];
		if (xskq_cons_is_valid_desc(q, desc, umem))
			return true;

		q->cached_cons++;
	}

	return false;
}

/* Functions for consumers */

static inline void __xskq_cons_release(struct xsk_queue *q)
{
	smp_mb(); /* D, matches A */
	WRITE_ONCE(q->ring->consumer, q->cached_cons);
}

static inline void __xskq_cons_peek(struct xsk_queue *q)
{
	/* Refresh the local pointer */
	q->cached_prod = READ_ONCE(q->ring->producer);
	smp_rmb(); /* C, matches B */
}

static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
	__xskq_cons_release(q);
	__xskq_cons_peek(q);
}

static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->cached_prod - q->cached_cons;

	if (entries >= cnt)
		return true;

	__xskq_cons_peek(q);
	entries = q->cached_prod - q->cached_cons;

	return entries >= cnt;
}

static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_addr_unchecked(q, addr);
}

static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xdp_umem *umem)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_desc(q, desc, umem);
}

static inline void xskq_cons_release(struct xsk_queue *q)
{
	/* To improve performance, only update local state here.
	 * Reflect this to global state when we get new entries
	 * from the ring in xskq_cons_get_entries() and whenever
	 * Rx or Tx processing is completed in the NAPI loop.
	 */
	q->cached_cons++;
}

static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
		q->nentries;
}

/* Functions for producers */

static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	if (free_entries)
		return false;

	/* Refresh the local tail pointer */
	q->cached_cons = READ_ONCE(q->ring->consumer);
	free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	return !free_entries;
}

static inline int xskq_prod_reserve(struct xsk_queue *q)
{
	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	q->cached_prod++;
	return 0;
}

static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
	return 0;
}

static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
					 u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	u32 idx;

	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	idx = q->cached_prod++ & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, idx);
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
	__xskq_prod_submit(q, q->cached_prod);
}

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = q->ring->producer;

	ring->desc[idx++ & q->ring_mask] = addr;

	__xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
	__xskq_prod_submit(q, q->ring->producer + nb_entries);
}

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}
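
/* Another illustrative sketch, not used by the kernel: how the helpers
 * above combine into the RESERVE/WRITE/SUBMIT and PEEK/READ/RELEASE
 * flows described in the comment near the top of this file. The
 * xskq_example_* names and the calling convention are hypothetical;
 * the real call sites live in net/xdp/xsk.c.
 */
static inline int xskq_example_rx_produce(struct xsk_queue *rx,
					  u64 addr, u32 len)
{
	int err;

	/* RESERVE + WRITE one Rx descriptor */
	err = xskq_prod_reserve_desc(rx, addr, len);
	if (err)
		return err;	/* -ENOSPC: ring full */

	/* SUBMIT: make it visible to the user-space consumer */
	xskq_prod_submit(rx);
	return 0;
}

static inline bool xskq_example_tx_consume(struct xsk_queue *tx,
					   struct xdp_umem *umem,
					   struct xdp_desc *desc)
{
	/* PEEK + READ one Tx descriptor written by user space */
	if (!xskq_cons_peek_desc(tx, desc, umem))
		return false;	/* nothing to send */

	/* ... the descriptor would be handed to the driver here ... */

	/* RELEASE the slot locally; __xskq_cons_release() (or the next
	 * xskq_cons_get_entries()) propagates it to the shared ring.
	 */
	xskq_cons_release(tx);
	return true;
}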

/* For both producers and consumers */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

#endif /* _LINUX_XSK_QUEUE_H */