/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */


#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"


typedef struct AFXDPState {
    NetClientState nc;

    struct xsk_socket *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char ifname[IFNAMSIZ];
    int ifindex;
    bool read_poll;
    bool write_poll;
    uint32_t outstanding_tx;

    uint64_t *pool;
    uint32_t n_pool;
    char *buffer;
    struct xsk_umem *umem;

    uint32_t n_queues;
    uint32_t xdp_flags;
    bool inhibit;
} AFXDPState;

#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and the kernel needs a wake-up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}
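
/*
 * Transmit path (peer --> backend --> wire): recover completed frames,
 * take a free frame address from the LIFO pool, copy the packet into
 * that UMEM frame, post a descriptor on the Tx ring and, if the kernel
 * requests it, arm the write handler so the socket gets kicked.
 */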
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet of this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or out of space in the Tx ring. Poll until we
         * can write. This will also kick the Tx, if it was waiting on
         * the CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer is not receiving any more packets. This one is
             * queued; stop reading from the backend until
             * af_xdp_send_completed() fires.
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}
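
/*
 * UMEM setup. The buffer holds one fixed-size frame per descriptor,
 * sized so that the rx, tx, cq and fq rings can all be full at the
 * same time. With the usual libxdp defaults (2048-entry rings, 4 KiB
 * frames) that works out to (2048 + 2048) * 2 = 8192 frames, i.e. a
 * 32 MiB buffer. Free frame addresses are handed out from 'pool',
 * which is used as a LIFO stack.
 */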
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* A specific mode was requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};
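
/*
 * Parse the 'sock-fds' option: a colon-separated list of pre-created
 * AF_XDP socket file descriptors (or fd names resolvable through the
 * monitor), one per queue. For example, with queues=4 something like
 * sock-fds=15:16:17:18 would be expected.
 */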
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            error_propagate(errp, err);
            goto err;
        }
    }

    if (nc0) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(errp, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    g_free(sock_fds);
    return 0;

err:
    g_free(sock_fds);
    if (nc0) {
        qemu_del_net_client(nc0);
    }

    return -1;
}
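
/*
 * Example invocation (illustrative only; the authoritative option list
 * lives in qapi/net.json, and the interface name and queue count below
 * are placeholders):
 *
 *   qemu-system-x86_64 ... \
 *       -netdev af-xdp,id=net0,ifname=eth0,queues=2,mode=native \
 *       -device virtio-net-pci,netdev=net0
 */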