1 /* SPDX-License-Identifier: GPL-2.0 */ 2 /* XDP user-space ring structure 3 * Copyright(c) 2018 Intel Corporation. 4 */ 5 6 #ifndef _LINUX_XSK_QUEUE_H 7 #define _LINUX_XSK_QUEUE_H 8 9 #include <linux/types.h> 10 #include <linux/if_xdp.h> 11 #include <net/xdp_sock.h> 12 13 #define RX_BATCH_SIZE 16 14 #define LAZY_UPDATE_THRESHOLD 128 15 16 struct xdp_ring { 17 u32 producer ____cacheline_aligned_in_smp; 18 u32 consumer ____cacheline_aligned_in_smp; 19 }; 20 21 /* Used for the RX and TX queues for packets */ 22 struct xdp_rxtx_ring { 23 struct xdp_ring ptrs; 24 struct xdp_desc desc[0] ____cacheline_aligned_in_smp; 25 }; 26 27 /* Used for the fill and completion queues for buffers */ 28 struct xdp_umem_ring { 29 struct xdp_ring ptrs; 30 u64 desc[0] ____cacheline_aligned_in_smp; 31 }; 32 33 struct xsk_queue { 34 struct xdp_umem_props umem_props; 35 u32 ring_mask; 36 u32 nentries; 37 u32 prod_head; 38 u32 prod_tail; 39 u32 cons_head; 40 u32 cons_tail; 41 struct xdp_ring *ring; 42 u64 invalid_descs; 43 }; 44 45 /* Common functions operating for both RXTX and umem queues */ 46 47 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) 48 { 49 return q ? q->invalid_descs : 0; 50 } 51 52 static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) 53 { 54 u32 entries = q->prod_tail - q->cons_tail; 55 56 if (entries == 0) { 57 /* Refresh the local pointer */ 58 q->prod_tail = READ_ONCE(q->ring->producer); 59 entries = q->prod_tail - q->cons_tail; 60 } 61 62 return (entries > dcnt) ? dcnt : entries; 63 } 64 65 static inline u32 xskq_nb_free_lazy(struct xsk_queue *q, u32 producer) 66 { 67 return q->nentries - (producer - q->cons_tail); 68 } 69 70 static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) 71 { 72 u32 free_entries = xskq_nb_free_lazy(q, producer); 73 74 if (free_entries >= dcnt) 75 return free_entries; 76 77 /* Refresh the local tail pointer */ 78 q->cons_tail = READ_ONCE(q->ring->consumer); 79 return q->nentries - (producer - q->cons_tail); 80 } 81 82 /* UMEM queue */ 83 84 static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) 85 { 86 if (addr >= q->umem_props.size) { 87 q->invalid_descs++; 88 return false; 89 } 90 91 return true; 92 } 93 94 static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr) 95 { 96 while (q->cons_tail != q->cons_head) { 97 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 98 unsigned int idx = q->cons_tail & q->ring_mask; 99 100 *addr = READ_ONCE(ring->desc[idx]) & q->umem_props.chunk_mask; 101 if (xskq_is_valid_addr(q, *addr)) 102 return addr; 103 104 q->cons_tail++; 105 } 106 107 return NULL; 108 } 109 110 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr) 111 { 112 if (q->cons_tail == q->cons_head) { 113 WRITE_ONCE(q->ring->consumer, q->cons_tail); 114 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 115 116 /* Order consumer and data */ 117 smp_rmb(); 118 } 119 120 return xskq_validate_addr(q, addr); 121 } 122 123 static inline void xskq_discard_addr(struct xsk_queue *q) 124 { 125 q->cons_tail++; 126 } 127 128 static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) 129 { 130 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 131 132 if (xskq_nb_free(q, q->prod_tail, LAZY_UPDATE_THRESHOLD) == 0) 133 return -ENOSPC; 134 135 ring->desc[q->prod_tail++ & q->ring_mask] = addr; 136 137 /* Order producer and data */ 138 smp_wmb(); 139 140 WRITE_ONCE(q->ring->producer, q->prod_tail); 141 return 0; 142 } 143 144 static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) 145 { 146 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 147 148 if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) 149 return -ENOSPC; 150 151 ring->desc[q->prod_head++ & q->ring_mask] = addr; 152 return 0; 153 } 154 155 static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, 156 u32 nb_entries) 157 { 158 /* Order producer and data */ 159 smp_wmb(); 160 161 q->prod_tail += nb_entries; 162 WRITE_ONCE(q->ring->producer, q->prod_tail); 163 } 164 165 static inline int xskq_reserve_addr(struct xsk_queue *q) 166 { 167 if (xskq_nb_free(q, q->prod_head, 1) == 0) 168 return -ENOSPC; 169 170 q->prod_head++; 171 return 0; 172 } 173 174 /* Rx/Tx queue */ 175 176 static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d) 177 { 178 if (!xskq_is_valid_addr(q, d->addr)) 179 return false; 180 181 if (((d->addr + d->len) & q->umem_props.chunk_mask) != 182 (d->addr & q->umem_props.chunk_mask)) { 183 q->invalid_descs++; 184 return false; 185 } 186 187 return true; 188 } 189 190 static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, 191 struct xdp_desc *desc) 192 { 193 while (q->cons_tail != q->cons_head) { 194 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 195 unsigned int idx = q->cons_tail & q->ring_mask; 196 197 *desc = READ_ONCE(ring->desc[idx]); 198 if (xskq_is_valid_desc(q, desc)) 199 return desc; 200 201 q->cons_tail++; 202 } 203 204 return NULL; 205 } 206 207 static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, 208 struct xdp_desc *desc) 209 { 210 if (q->cons_tail == q->cons_head) { 211 WRITE_ONCE(q->ring->consumer, q->cons_tail); 212 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 213 214 /* Order consumer and data */ 215 smp_rmb(); 216 } 217 218 return xskq_validate_desc(q, desc); 219 } 220 221 static inline void xskq_discard_desc(struct xsk_queue *q) 222 { 223 q->cons_tail++; 224 } 225 226 static inline int xskq_produce_batch_desc(struct xsk_queue *q, 227 u64 addr, u32 len) 228 { 229 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 230 unsigned int idx; 231 232 if (xskq_nb_free(q, q->prod_head, 1) == 0) 233 return -ENOSPC; 234 235 idx = (q->prod_head++) & q->ring_mask; 236 ring->desc[idx].addr = addr; 237 ring->desc[idx].len = len; 238 239 return 0; 240 } 241 242 static inline void xskq_produce_flush_desc(struct xsk_queue *q) 243 { 244 /* Order producer and data */ 245 smp_wmb(); 246 247 q->prod_tail = q->prod_head, 248 WRITE_ONCE(q->ring->producer, q->prod_tail); 249 } 250 251 static inline bool xskq_full_desc(struct xsk_queue *q) 252 { 253 return xskq_nb_avail(q, q->nentries) == q->nentries; 254 } 255 256 static inline bool xskq_empty_desc(struct xsk_queue *q) 257 { 258 return xskq_nb_free(q, q->prod_tail, 1) == q->nentries; 259 } 260 261 void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props); 262 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue); 263 void xskq_destroy(struct xsk_queue *q_ops); 264 265 #endif /* _LINUX_XSK_QUEUE_H */ 266