xref: /openbmc/qemu/net/af-xdp.c (revision 504632dcc63145e6c5297fc1b7f1d76450dd845a)
1 /*
2  * AF_XDP network backend.
3  *
4  * Copyright (c) 2023 Red Hat, Inc.
5  *
6  * Authors:
7  *  Ilya Maximets <i.maximets@ovn.org>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 
14 #include "qemu/osdep.h"
15 #include <bpf/bpf.h>
16 #include <linux/if_link.h>
17 #include <linux/if_xdp.h>
18 #include <net/if.h>
19 #include <xdp/xsk.h>
20 
21 #include "clients.h"
22 #include "monitor/monitor.h"
23 #include "net/net.h"
24 #include "qapi/error.h"
25 #include "qemu/cutils.h"
26 #include "qemu/error-report.h"
27 #include "qemu/iov.h"
28 #include "qemu/main-loop.h"
29 #include "qemu/memalign.h"
30 
31 
/*
 * State of one AF_XDP backend queue.  net_init_af_xdp() creates one
 * instance per queue, each with its own socket, rings and umem.
 */
typedef struct AFXDPState {
    /* Generic net client; DO_UPCAST() converts it back to AFXDPState. */
    NetClientState       nc;

    /* AF_XDP socket and its rings. */
    struct xsk_socket    *xsk;
    struct xsk_ring_cons rx;    /* Received packets (read by af_xdp_send). */
    struct xsk_ring_prod tx;    /* Packets to transmit (af_xdp_receive). */
    struct xsk_ring_cons cq;    /* Completed transmissions. */
    struct xsk_ring_prod fq;    /* Free buffers handed over for receive. */

    char                 ifname[IFNAMSIZ];
    int                  ifindex;
    bool                 read_poll;      /* af_xdp_send is registered. */
    bool                 write_poll;     /* af_xdp_writable is registered. */
    uint32_t             outstanding_tx; /* Submitted to tx, not yet in cq. */

    /* LIFO stack of free umem frame addresses and the number of entries. */
    uint64_t             *pool;
    uint32_t             n_pool;
    char                 *buffer;        /* Memory area backing the umem. */
    struct xsk_umem      *umem;

    uint32_t             xdp_flags;      /* Flags the socket was created with. */
    bool                 inhibit;        /* Do not let libxdp load a program. */

    /* Optional map that the socket is inserted into ('map-path'). */
    char                 *map_path;
    int                  map_fd;         /* -1 while the map is not open. */
    uint32_t             map_start_index;
} AFXDPState;
59 
/*
 * Upper bound on the number of Rx descriptors af_xdp_send() peeks per call;
 * also the number of buffers it asks af_xdp_fq_refill() to put back.
 */
#define AF_XDP_BATCH_SIZE 64

/* fd handlers, declared early because af_xdp_update_fd_handler() uses them. */
static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);
64 
65 /* Set the event-loop handlers for the af-xdp backend. */
66 static void af_xdp_update_fd_handler(AFXDPState *s)
67 {
68     qemu_set_fd_handler(xsk_socket__fd(s->xsk),
69                         s->read_poll ? af_xdp_send : NULL,
70                         s->write_poll ? af_xdp_writable : NULL,
71                         s);
72 }
73 
74 /* Update the read handler. */
75 static void af_xdp_read_poll(AFXDPState *s, bool enable)
76 {
77     if (s->read_poll != enable) {
78         s->read_poll = enable;
79         af_xdp_update_fd_handler(s);
80     }
81 }
82 
83 /* Update the write handler. */
84 static void af_xdp_write_poll(AFXDPState *s, bool enable)
85 {
86     if (s->write_poll != enable) {
87         s->write_poll = enable;
88         af_xdp_update_fd_handler(s);
89     }
90 }
91 
92 static void af_xdp_poll(NetClientState *nc, bool enable)
93 {
94     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
95 
96     if (s->read_poll != enable || s->write_poll != enable) {
97         s->write_poll = enable;
98         s->read_poll  = enable;
99         af_xdp_update_fd_handler(s);
100     }
101 }
102 
103 static void af_xdp_complete_tx(AFXDPState *s)
104 {
105     uint32_t idx = 0;
106     uint32_t done, i;
107     uint64_t *addr;
108 
109     done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);
110 
111     for (i = 0; i < done; i++) {
112         addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
113         s->pool[s->n_pool++] = *addr;
114         s->outstanding_tx--;
115     }
116 
117     if (done) {
118         xsk_ring_cons__release(&s->cq, done);
119     }
120 }
121 
122 /*
123  * The fd_write() callback, invoked if the fd is marked as writable
124  * after a poll.
125  */
126 static void af_xdp_writable(void *opaque)
127 {
128     AFXDPState *s = opaque;
129 
130     /* Try to recover buffers that are already sent. */
131     af_xdp_complete_tx(s);
132 
133     /*
134      * Unregister the handler, unless we still have packets to transmit
135      * and kernel needs a wake up.
136      */
137     if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
138         af_xdp_write_poll(s, false);
139     }
140 
141     /* Flush any buffered packets. */
142     qemu_flush_queued_packets(&s->nc);
143 }
144 
145 static ssize_t af_xdp_receive(NetClientState *nc,
146                               const uint8_t *buf, size_t size)
147 {
148     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
149     struct xdp_desc *desc;
150     uint32_t idx;
151     void *data;
152 
153     /* Try to recover buffers that are already sent. */
154     af_xdp_complete_tx(s);
155 
156     if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
157         /* We can't transmit packet this size... */
158         return size;
159     }
160 
161     if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
162         /*
163          * Out of buffers or space in tx ring.  Poll until we can write.
164          * This will also kick the Tx, if it was waiting on CQ.
165          */
166         af_xdp_write_poll(s, true);
167         return 0;
168     }
169 
170     desc = xsk_ring_prod__tx_desc(&s->tx, idx);
171     desc->addr = s->pool[--s->n_pool];
172     desc->len = size;
173 
174     data = xsk_umem__get_data(s->buffer, desc->addr);
175     memcpy(data, buf, size);
176 
177     xsk_ring_prod__submit(&s->tx, 1);
178     s->outstanding_tx++;
179 
180     if (xsk_ring_prod__needs_wakeup(&s->tx)) {
181         af_xdp_write_poll(s, true);
182     }
183 
184     return size;
185 }
186 
187 /*
188  * Complete a previous send (backend --> guest) and enable the
189  * fd_read callback.
190  */
191 static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
192 {
193     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
194 
195     af_xdp_read_poll(s, true);
196 }
197 
198 static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
199 {
200     uint32_t i, idx = 0;
201 
202     /* Leave one packet for Tx, just in case. */
203     if (s->n_pool < n + 1) {
204         n = s->n_pool;
205     }
206 
207     if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
208         return;
209     }
210 
211     for (i = 0; i < n; i++) {
212         *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
213     }
214     xsk_ring_prod__submit(&s->fq, n);
215 
216     if (xsk_ring_prod__needs_wakeup(&s->fq)) {
217         /* Receive was blocked by not having enough buffers.  Wake it up. */
218         af_xdp_read_poll(s, true);
219     }
220 }
221 
222 static void af_xdp_send(void *opaque)
223 {
224     uint32_t i, n_rx, idx = 0;
225     AFXDPState *s = opaque;
226 
227     n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
228     if (!n_rx) {
229         return;
230     }
231 
232     for (i = 0; i < n_rx; i++) {
233         const struct xdp_desc *desc;
234         struct iovec iov;
235 
236         desc = xsk_ring_cons__rx_desc(&s->rx, idx++);
237 
238         iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
239         iov.iov_len = desc->len;
240 
241         s->pool[s->n_pool++] = desc->addr;
242 
243         if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
244                                      af_xdp_send_completed)) {
245             /*
246              * The peer does not receive anymore.  Packet is queued, stop
247              * reading from the backend until af_xdp_send_completed().
248              */
249             af_xdp_read_poll(s, false);
250 
251             /* Return unused descriptors to not break the ring cache. */
252             xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
253             n_rx = i + 1;
254             break;
255         }
256     }
257 
258     /* Release actually sent descriptors and try to re-fill. */
259     xsk_ring_cons__release(&s->rx, n_rx);
260     af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
261 }
262 
263 /* Flush and close. */
264 static void af_xdp_cleanup(NetClientState *nc)
265 {
266     AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
267     int idx;
268 
269     qemu_purge_queued_packets(nc);
270 
271     af_xdp_poll(nc, false);
272 
273     xsk_socket__delete(s->xsk);
274     s->xsk = NULL;
275     g_free(s->pool);
276     s->pool = NULL;
277     xsk_umem__delete(s->umem);
278     s->umem = NULL;
279     qemu_vfree(s->buffer);
280     s->buffer = NULL;
281 
282     if (s->map_fd >= 0) {
283         idx = nc->queue_index + s->map_start_index;
284         if (bpf_map_delete_elem(s->map_fd, &idx)) {
285             fprintf(stderr, "af-xdp: unable to remove AF_XDP socket from map"
286                     " %s\n", s->map_path);
287         }
288         close(s->map_fd);
289         s->map_fd = -1;
290     }
291     g_free(s->map_path);
292     s->map_path = NULL;
293 }
294 
295 static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
296 {
297     struct xsk_umem_config config = {
298         .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
299         .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
300         .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
301         .frame_headroom = 0,
302     };
303     uint64_t n_descs;
304     uint64_t size;
305     int64_t i;
306     int ret;
307 
308     /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
309     n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
310                + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
311     size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;
312 
313     s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
314     memset(s->buffer, 0, size);
315 
316     if (sock_fd < 0) {
317         ret = xsk_umem__create(&s->umem, s->buffer, size,
318                                &s->fq, &s->cq, &config);
319     } else {
320         ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
321                                        &s->fq, &s->cq, &config);
322     }
323 
324     if (ret) {
325         qemu_vfree(s->buffer);
326         error_setg_errno(errp, errno,
327                          "failed to create umem for %s queue_index: %d",
328                          s->ifname, s->nc.queue_index);
329         return -1;
330     }
331 
332     s->pool = g_new(uint64_t, n_descs);
333     /* Fill the pool in the opposite order, because it's a LIFO queue. */
334     for (i = n_descs - 1; i >= 0; i--) {
335         s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
336     }
337     s->n_pool = n_descs;
338 
339     af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);
340 
341     return 0;
342 }
343 
344 static int af_xdp_socket_create(AFXDPState *s,
345                                 const NetdevAFXDPOptions *opts, Error **errp)
346 {
347     struct xsk_socket_config cfg = {
348         .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
349         .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
350         .libxdp_flags = 0,
351         .bind_flags = XDP_USE_NEED_WAKEUP,
352         .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
353     };
354     int queue_id, error = 0;
355 
356     if (s->inhibit) {
357         cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
358     }
359 
360     if (opts->has_force_copy && opts->force_copy) {
361         cfg.bind_flags |= XDP_COPY;
362     }
363 
364     queue_id = s->nc.queue_index;
365     if (opts->has_start_queue && opts->start_queue > 0) {
366         queue_id += opts->start_queue;
367     }
368 
369     if (opts->has_mode) {
370         /* Specific mode requested. */
371         cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
372                          ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
373         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
374                                s->umem, &s->rx, &s->tx, &cfg)) {
375             error = errno;
376         }
377     } else {
378         /* No mode requested, try native first. */
379         cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;
380 
381         if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
382                                s->umem, &s->rx, &s->tx, &cfg)) {
383             /* Can't use native mode, try skb. */
384             cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
385             cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;
386 
387             if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
388                                    s->umem, &s->rx, &s->tx, &cfg)) {
389                 error = errno;
390             }
391         }
392     }
393 
394     if (error) {
395         error_setg_errno(errp, error,
396                          "failed to create AF_XDP socket for %s queue_id: %d",
397                          s->ifname, queue_id);
398         return -1;
399     }
400 
401     s->xdp_flags = cfg.xdp_flags;
402 
403     return 0;
404 }
405 
406 static int af_xdp_update_xsk_map(AFXDPState *s, Error **errp)
407 {
408     int xsk_fd, idx, error = 0;
409 
410     if (!s->map_path) {
411         return 0;
412     }
413 
414     s->map_fd = bpf_obj_get(s->map_path);
415     if (s->map_fd < 0) {
416         error = errno;
417     } else {
418         xsk_fd = xsk_socket__fd(s->xsk);
419         idx = s->nc.queue_index + s->map_start_index;
420         if (bpf_map_update_elem(s->map_fd, &idx, &xsk_fd, 0)) {
421             error = errno;
422         }
423     }
424 
425     if (error) {
426         error_setg_errno(errp, error,
427                          "failed to insert AF_XDP socket into map %s",
428                          s->map_path);
429         return -1;
430     }
431 
432     return 0;
433 }
434 
/*
 * NetClientInfo methods.
 *
 * Passed to qemu_new_net_client() for every queue; .size makes each client
 * large enough to hold a whole AFXDPState.
 */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,   /* Peer --> backend (transmit). */
    .poll = af_xdp_poll,         /* Enable/disable both fd handlers. */
    .cleanup = af_xdp_cleanup,   /* Release all per-queue resources. */
};
443 
444 static int *parse_socket_fds(const char *sock_fds_str,
445                              int64_t n_expected, Error **errp)
446 {
447     gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
448     int64_t i, n_sock_fds = g_strv_length(substrings);
449     int *sock_fds = NULL;
450 
451     if (n_sock_fds != n_expected) {
452         error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
453                    n_expected, n_sock_fds);
454         goto exit;
455     }
456 
457     sock_fds = g_new(int, n_sock_fds);
458 
459     for (i = 0; i < n_sock_fds; i++) {
460         sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
461         if (sock_fds[i] < 0) {
462             g_free(sock_fds);
463             sock_fds = NULL;
464             goto exit;
465         }
466     }
467 
468 exit:
469     g_strfreev(substrings);
470     return sock_fds;
471 }
472 
473 /*
474  * The exported init function.
475  *
476  * ... -netdev af-xdp,ifname="..."
477  */
478 int net_init_af_xdp(const Netdev *netdev,
479                     const char *name, NetClientState *peer, Error **errp)
480 {
481     const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
482     NetClientState *nc, *nc0 = NULL;
483     int32_t map_start_index;
484     unsigned int ifindex;
485     uint32_t prog_id = 0;
486     g_autofree int *sock_fds = NULL;
487     int64_t i, queues;
488     Error *err = NULL;
489     AFXDPState *s;
490     bool inhibit;
491 
492     ifindex = if_nametoindex(opts->ifname);
493     if (!ifindex) {
494         error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
495                          opts->ifname);
496         return -1;
497     }
498 
499     queues = opts->has_queues ? opts->queues : 1;
500     if (queues < 1) {
501         error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
502                    queues, opts->ifname);
503         return -1;
504     }
505 
506     inhibit = opts->has_inhibit && opts->inhibit;
507     if (inhibit && !opts->sock_fds && !opts->map_path) {
508         error_setg(errp, "'inhibit=on' requires 'sock-fds' or 'map-path'");
509         return -1;
510     }
511     if (!inhibit && (opts->sock_fds || opts->map_path)) {
512         error_setg(errp, "'sock-fds' and 'map-path' require 'inhibit=on'");
513         return -1;
514     }
515     if (opts->sock_fds && opts->map_path) {
516         error_setg(errp, "'sock-fds' and 'map-path' are mutually exclusive");
517         return -1;
518     }
519     if (!opts->map_path && opts->has_map_start_index) {
520         error_setg(errp, "'map-start-index' requires 'map-path'");
521         return -1;
522     }
523 
524     map_start_index = opts->has_map_start_index ? opts->map_start_index : 0;
525     if (map_start_index < 0) {
526         error_setg(errp, "'map-start-index' cannot be negative (%d)",
527                    map_start_index);
528         return -1;
529     }
530 
531     if (opts->sock_fds) {
532         sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
533         if (!sock_fds) {
534             return -1;
535         }
536     }
537 
538     for (i = 0; i < queues; i++) {
539         nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
540         qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
541         nc->queue_index = i;
542 
543         if (!nc0) {
544             nc0 = nc;
545         }
546 
547         s = DO_UPCAST(AFXDPState, nc, nc);
548 
549         pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
550         s->ifindex = ifindex;
551         s->inhibit = inhibit;
552 
553         s->map_path = g_strdup(opts->map_path);
554         s->map_start_index = map_start_index;
555         s->map_fd = -1;
556 
557         if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, &err) ||
558             af_xdp_socket_create(s, opts, &err) ||
559             af_xdp_update_xsk_map(s, &err)) {
560             goto err;
561         }
562     }
563 
564     if (nc0 && !inhibit) {
565         s = DO_UPCAST(AFXDPState, nc, nc0);
566         if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
567             error_setg_errno(&err, errno,
568                              "no XDP program loaded on '%s', ifindex: %d",
569                              s->ifname, s->ifindex);
570             goto err;
571         }
572     }
573 
574     af_xdp_read_poll(s, true); /* Initially only poll for reads. */
575 
576     return 0;
577 
578 err:
579     if (nc0) {
580         qemu_del_net_client(nc0);
581         error_propagate(errp, err);
582     }
583 
584     return -1;
585 }
586