/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */


#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"


typedef struct AFXDPState {
    NetClientState nc;

    struct xsk_socket *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char ifname[IFNAMSIZ];
    int ifindex;
    bool read_poll;
    bool write_poll;
    uint32_t outstanding_tx;

    uint64_t *pool;
    uint32_t n_pool;
    char *buffer;
    struct xsk_umem *umem;

    uint32_t n_queues;
    uint32_t xdp_flags;
    bool inhibit;
} AFXDPState;

#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

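/* Enable or disable both read and write polling (NetClientInfo .poll). */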
static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

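/*
 * Reclaim descriptors of fully transmitted frames from the completion
 * queue and return them to the free buffer pool.
 */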
static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and the kernel needs a wake-up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}

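/*
 * Transmit a packet from the peer: copy it into a free umem frame and
 * place a descriptor on the Tx ring.
 */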
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet of this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in tx ring. Poll until we can write.
         * This will also kick the Tx, if it was waiting on CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

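/* Move up to 'n' free buffers from the pool onto the Rx fill queue. */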
static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

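/*
 * The fd_read() callback: drain a batch of received frames from the Rx
 * ring and forward them to the peer.
 */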
static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer is not receiving any more. The packet is queued; stop
             * reading from the backend until af_xdp_send_completed().
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors so we don't break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}

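/*
 * Allocate the umem area, register it for this queue and populate the
 * free buffer pool and the Rx fill queue.
 */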
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

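/*
 * Create the AF_XDP socket for one device queue. If no mode is requested
 * explicitly, native (driver) mode is tried first with a fallback to
 * generic (skb) mode.
 */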
static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};

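/* Parse a colon-separated list of pre-opened socket fds, one per queue. */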
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    g_autofree int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            error_propagate(errp, err);
            goto err;
        }
    }

    if (nc0) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(errp, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    return 0;

err:
    if (nc0) {
        qemu_del_net_client(nc0);
    }

    return -1;
}