/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */


#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"

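/*
 * Per-queue state of the af-xdp backend; one instance is created for
 * each device queue in net_init_af_xdp().
 */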
typedef struct AFXDPState {
    NetClientState       nc;

    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;   /* Rx ring: packets received from the NIC. */
    struct xsk_ring_prod tx;   /* Tx ring: packets to be sent to the NIC. */
    struct xsk_ring_cons cq;   /* Completion ring for finished transmits. */
    struct xsk_ring_prod fq;   /* Fill ring: free frames for the kernel. */

    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    bool                 read_poll;       /* Read handler registered. */
    bool                 write_poll;      /* Write handler registered. */
    uint32_t             outstanding_tx;  /* Submitted, not yet completed. */

    uint64_t             *pool;     /* LIFO stack of free umem frame addresses. */
    uint32_t             n_pool;    /* Number of free frames in the pool. */
    char                 *buffer;   /* The umem memory region itself. */
    struct xsk_umem      *umem;

    uint32_t             xdp_flags;
    bool                 inhibit;   /* Don't load an XDP program ourselves. */

    char                 *map_path; /* Path to a pinned XSK map, if any. */
    int                  map_fd;
    uint32_t             map_start_index;
} AFXDPState;

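/* Maximum number of packets handled per Rx batch; also the refill size. */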
#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

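/*
 * Reclaim frames for transmits the kernel has completed: pop completed
 * addresses from the completion ring and push them back onto the free pool.
 */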
static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and the kernel needs a wake-up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}

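/*
 * Transmit path (peer --> network): copy the packet into a free umem frame
 * and post a single descriptor on the Tx ring. Returns 0 so that the net
 * core queues the packet whenever no frame or ring slot is available.
 */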
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet of this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or out of space in the Tx ring. Poll until we
         * can write. This will also kick the Tx, if it was waiting on
         * the completion queue.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

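/*
 * Post up to 'n' free frames on the fill ring so the kernel has buffers
 * to receive packets into.
 */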
static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

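/*
 * The fd_read() callback: drain up to AF_XDP_BATCH_SIZE packets from the
 * Rx ring and hand them to the peer, returning each frame to the free
 * pool and refilling the fill ring afterwards.
 */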
static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer is not receiving any more packets. This one is
             * queued; stop reading from the backend until
             * af_xdp_send_completed() is called.
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors so as not to break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release the descriptors actually consumed and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    int idx;

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    if (s->map_fd >= 0) {
        idx = nc->queue_index + s->map_start_index;
        if (bpf_map_delete_elem(s->map_fd, &idx)) {
            fprintf(stderr, "af-xdp: unable to remove AF_XDP socket from map"
                    " %s\n", s->map_path);
        }
        close(s->map_fd);
        s->map_fd = -1;
    }
    g_free(s->map_path);
    s->map_path = NULL;
}

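/*
 * Allocate the umem region and register it with the kernel. The region is
 * sized to hold one frame per descriptor across all four rings, so every
 * in-flight descriptor can own a distinct frame.
 */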
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's used as a LIFO stack. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

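/*
 * Create the AF_XDP socket bound to the requested device queue. If no mode
 * is specified, native (driver) mode is tried first, with a fallback to the
 * generic skb mode.
 */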
static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

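/*
 * With an externally loaded XDP program ('inhibit=on' and 'map-path'),
 * insert this socket's fd into the pinned XSK map at the index derived
 * from the queue index, so the external program can redirect packets here.
 */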
static int af_xdp_update_xsk_map(AFXDPState *s, Error **errp)
{
    int xsk_fd, idx, error = 0;

    if (!s->map_path) {
        return 0;
    }

    s->map_fd = bpf_obj_get(s->map_path);
    if (s->map_fd < 0) {
        error = errno;
    } else {
        xsk_fd = xsk_socket__fd(s->xsk);
        idx = s->nc.queue_index + s->map_start_index;
        if (bpf_map_update_elem(s->map_fd, &idx, &xsk_fd, 0)) {
            error = errno;
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to insert AF_XDP socket into map %s",
                         s->map_path);
        return -1;
    }

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};

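/*
 * Parse a colon-separated list of pre-opened socket fds, one per queue,
 * as passed via the 'sock-fds' option.
 */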
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
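/*
 * For illustration only (hypothetical interface name and id), a two-queue
 * instance might be started as:
 *
 *   -netdev af-xdp,id=net0,ifname=eth0,queues=2
 */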
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    int32_t map_start_index;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    g_autofree int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;
    bool inhibit;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    inhibit = opts->has_inhibit && opts->inhibit;
    if (inhibit && !opts->sock_fds && !opts->map_path) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' or 'map-path'");
        return -1;
    }
    if (!inhibit && (opts->sock_fds || opts->map_path)) {
        error_setg(errp, "'sock-fds' and 'map-path' require 'inhibit=on'");
        return -1;
    }
    if (opts->sock_fds && opts->map_path) {
        error_setg(errp, "'sock-fds' and 'map-path' are mutually exclusive");
        return -1;
    }
    if (!opts->map_path && opts->has_map_start_index) {
        error_setg(errp, "'map-start-index' requires 'map-path'");
        return -1;
    }

    map_start_index = opts->has_map_start_index ? opts->map_start_index : 0;
    if (map_start_index < 0) {
        error_setg(errp, "'map-start-index' cannot be negative (%d)",
                   map_start_index);
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->inhibit = inhibit;

        s->map_path = g_strdup(opts->map_path);
        s->map_start_index = map_start_index;
        s->map_fd = -1;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, &err) ||
            af_xdp_socket_create(s, opts, &err) ||
            af_xdp_update_xsk_map(s, &err)) {
            goto err;
        }
    }

    if (nc0 && !inhibit) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(&err, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    return 0;

err:
    if (nc0) {
        qemu_del_net_client(nc0);
        error_propagate(errp, err);
    }

    return -1;
}