1 /* SPDX-License-Identifier: GPL-2.0 */ 2 /* XDP user-space ring structure 3 * Copyright(c) 2018 Intel Corporation. 4 */ 5 6 #ifndef _LINUX_XSK_QUEUE_H 7 #define _LINUX_XSK_QUEUE_H 8 9 #include <linux/types.h> 10 #include <linux/if_xdp.h> 11 #include <net/xdp_sock.h> 12 13 #define RX_BATCH_SIZE 16 14 #define LAZY_UPDATE_THRESHOLD 128 15 16 struct xdp_ring { 17 u32 producer ____cacheline_aligned_in_smp; 18 u32 consumer ____cacheline_aligned_in_smp; 19 }; 20 21 /* Used for the RX and TX queues for packets */ 22 struct xdp_rxtx_ring { 23 struct xdp_ring ptrs; 24 struct xdp_desc desc[0] ____cacheline_aligned_in_smp; 25 }; 26 27 /* Used for the fill and completion queues for buffers */ 28 struct xdp_umem_ring { 29 struct xdp_ring ptrs; 30 u64 desc[0] ____cacheline_aligned_in_smp; 31 }; 32 33 struct xsk_queue { 34 struct xdp_umem_props umem_props; 35 u32 ring_mask; 36 u32 nentries; 37 u32 prod_head; 38 u32 prod_tail; 39 u32 cons_head; 40 u32 cons_tail; 41 struct xdp_ring *ring; 42 u64 invalid_descs; 43 }; 44 45 /* Common functions operating for both RXTX and umem queues */ 46 47 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) 48 { 49 return q ? q->invalid_descs : 0; 50 } 51 52 static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) 53 { 54 u32 entries = q->prod_tail - q->cons_tail; 55 56 if (entries == 0) { 57 /* Refresh the local pointer */ 58 q->prod_tail = READ_ONCE(q->ring->producer); 59 entries = q->prod_tail - q->cons_tail; 60 } 61 62 return (entries > dcnt) ? dcnt : entries; 63 } 64 65 static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) 66 { 67 u32 free_entries = q->nentries - (producer - q->cons_tail); 68 69 if (free_entries >= dcnt) 70 return free_entries; 71 72 /* Refresh the local tail pointer */ 73 q->cons_tail = READ_ONCE(q->ring->consumer); 74 return q->nentries - (producer - q->cons_tail); 75 } 76 77 /* UMEM queue */ 78 79 static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) 80 { 81 if (addr >= q->umem_props.size) { 82 q->invalid_descs++; 83 return false; 84 } 85 86 return true; 87 } 88 89 static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr) 90 { 91 while (q->cons_tail != q->cons_head) { 92 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 93 unsigned int idx = q->cons_tail & q->ring_mask; 94 95 *addr = READ_ONCE(ring->desc[idx]) & q->umem_props.chunk_mask; 96 if (xskq_is_valid_addr(q, *addr)) 97 return addr; 98 99 q->cons_tail++; 100 } 101 102 return NULL; 103 } 104 105 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr) 106 { 107 if (q->cons_tail == q->cons_head) { 108 WRITE_ONCE(q->ring->consumer, q->cons_tail); 109 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 110 111 /* Order consumer and data */ 112 smp_rmb(); 113 } 114 115 return xskq_validate_addr(q, addr); 116 } 117 118 static inline void xskq_discard_addr(struct xsk_queue *q) 119 { 120 q->cons_tail++; 121 } 122 123 static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) 124 { 125 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 126 127 if (xskq_nb_free(q, q->prod_tail, 1) == 0) 128 return -ENOSPC; 129 130 ring->desc[q->prod_tail++ & q->ring_mask] = addr; 131 132 /* Order producer and data */ 133 smp_wmb(); 134 135 WRITE_ONCE(q->ring->producer, q->prod_tail); 136 return 0; 137 } 138 139 static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) 140 { 141 struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; 142 143 if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) 144 return -ENOSPC; 145 146 ring->desc[q->prod_head++ & q->ring_mask] = addr; 147 return 0; 148 } 149 150 static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, 151 u32 nb_entries) 152 { 153 /* Order producer and data */ 154 smp_wmb(); 155 156 q->prod_tail += nb_entries; 157 WRITE_ONCE(q->ring->producer, q->prod_tail); 158 } 159 160 static inline int xskq_reserve_addr(struct xsk_queue *q) 161 { 162 if (xskq_nb_free(q, q->prod_head, 1) == 0) 163 return -ENOSPC; 164 165 q->prod_head++; 166 return 0; 167 } 168 169 /* Rx/Tx queue */ 170 171 static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d) 172 { 173 if (!xskq_is_valid_addr(q, d->addr)) 174 return false; 175 176 if (((d->addr + d->len) & q->umem_props.chunk_mask) != 177 (d->addr & q->umem_props.chunk_mask)) { 178 q->invalid_descs++; 179 return false; 180 } 181 182 return true; 183 } 184 185 static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, 186 struct xdp_desc *desc) 187 { 188 while (q->cons_tail != q->cons_head) { 189 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 190 unsigned int idx = q->cons_tail & q->ring_mask; 191 192 *desc = READ_ONCE(ring->desc[idx]); 193 if (xskq_is_valid_desc(q, desc)) 194 return desc; 195 196 q->cons_tail++; 197 } 198 199 return NULL; 200 } 201 202 static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, 203 struct xdp_desc *desc) 204 { 205 if (q->cons_tail == q->cons_head) { 206 WRITE_ONCE(q->ring->consumer, q->cons_tail); 207 q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); 208 209 /* Order consumer and data */ 210 smp_rmb(); 211 } 212 213 return xskq_validate_desc(q, desc); 214 } 215 216 static inline void xskq_discard_desc(struct xsk_queue *q) 217 { 218 q->cons_tail++; 219 } 220 221 static inline int xskq_produce_batch_desc(struct xsk_queue *q, 222 u64 addr, u32 len) 223 { 224 struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; 225 unsigned int idx; 226 227 if (xskq_nb_free(q, q->prod_head, 1) == 0) 228 return -ENOSPC; 229 230 idx = (q->prod_head++) & q->ring_mask; 231 ring->desc[idx].addr = addr; 232 ring->desc[idx].len = len; 233 234 return 0; 235 } 236 237 static inline void xskq_produce_flush_desc(struct xsk_queue *q) 238 { 239 /* Order producer and data */ 240 smp_wmb(); 241 242 q->prod_tail = q->prod_head, 243 WRITE_ONCE(q->ring->producer, q->prod_tail); 244 } 245 246 static inline bool xskq_full_desc(struct xsk_queue *q) 247 { 248 return xskq_nb_avail(q, q->nentries) == q->nentries; 249 } 250 251 static inline bool xskq_empty_desc(struct xsk_queue *q) 252 { 253 return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; 254 } 255 256 void xskq_set_umem(struct xsk_queue *q, struct xdp_umem_props *umem_props); 257 struct xsk_queue *xskq_create(u32 nentries, bool umem_queue); 258 void xskq_destroy(struct xsk_queue *q_ops); 259 260 #endif /* _LINUX_XSK_QUEUE_H */ 261