/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */


#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"


typedef struct AFXDPState {
    NetClientState nc;

    struct xsk_socket *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char ifname[IFNAMSIZ];
    int ifindex;
    bool read_poll;
    bool write_poll;
    uint32_t outstanding_tx;

    uint64_t *pool;
    uint32_t n_pool;
    char *buffer;
    struct xsk_umem *umem;

    uint32_t xdp_flags;
    bool inhibit;

    char *map_path;
    int map_fd;
    uint32_t map_start_index;
} AFXDPState;

#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and kernel needs a wake up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}
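
/*
 * Transmit path (peer/guest -> network): take a frame address from the
 * LIFO pool, copy the packet into the umem buffer and post a descriptor
 * on the Tx ring.  Completed frames are returned to the pool via the
 * completion queue in af_xdp_complete_tx().
 */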

static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit packet this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in tx ring. Poll until we can write.
         * This will also kick the Tx, if it was waiting on CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer does not receive anymore. Packet is queued, stop
             * reading from the backend until af_xdp_send_completed().
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    int idx;

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    if (s->map_fd >= 0) {
        idx = nc->queue_index + s->map_start_index;
        if (bpf_map_delete_elem(s->map_fd, &idx)) {
            fprintf(stderr, "af-xdp: unable to remove AF_XDP socket from map"
                    " %s\n", s->map_path);
        }
        close(s->map_fd);
        s->map_fd = -1;
    }
    g_free(s->map_path);
    s->map_path = NULL;
}

static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}
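
/*
 * Create the AF_XDP socket bound to the requested queue of the interface.
 * If no mode was specified by the user, native (driver) mode is tried
 * first with a fallback to generic (skb) mode.
 */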

static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

static int af_xdp_update_xsk_map(AFXDPState *s, Error **errp)
{
    int xsk_fd, idx, error = 0;

    if (!s->map_path) {
        return 0;
    }

    s->map_fd = bpf_obj_get(s->map_path);
    if (s->map_fd < 0) {
        error = errno;
    } else {
        xsk_fd = xsk_socket__fd(s->xsk);
        idx = s->nc.queue_index + s->map_start_index;
        if (bpf_map_update_elem(s->map_fd, &idx, &xsk_fd, 0)) {
            error = errno;
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to insert AF_XDP socket into map %s",
                         s->map_path);
        return -1;
    }

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};

static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}
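
/*
 * Example invocation (illustrative only; the interface name, mode and
 * queue count depend on the host setup):
 *
 *   -netdev af-xdp,id=net0,ifname=eth0,mode=native,queues=1
 *   -device virtio-net-pci,netdev=net0
 */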

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    int32_t map_start_index;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    g_autofree int *sock_fds = NULL;
    int64_t i, queues;
    Error *err = NULL;
    AFXDPState *s;
    bool inhibit;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    inhibit = opts->has_inhibit && opts->inhibit;
    if (inhibit && !opts->sock_fds && !opts->map_path) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' or 'map-path'");
        return -1;
    }
    if (!inhibit && (opts->sock_fds || opts->map_path)) {
        error_setg(errp, "'sock-fds' and 'map-path' require 'inhibit=on'");
        return -1;
    }
    if (opts->sock_fds && opts->map_path) {
        error_setg(errp, "'sock-fds' and 'map-path' are mutually exclusive");
        return -1;
    }
    if (!opts->map_path && opts->has_map_start_index) {
        error_setg(errp, "'map-start-index' requires 'map-path'");
        return -1;
    }

    map_start_index = opts->has_map_start_index ? opts->map_start_index : 0;
    if (map_start_index < 0) {
        error_setg(errp, "'map-start-index' cannot be negative (%d)",
                   map_start_index);
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->inhibit = inhibit;

        s->map_path = g_strdup(opts->map_path);
        s->map_start_index = map_start_index;
        s->map_fd = -1;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, &err) ||
            af_xdp_socket_create(s, opts, &err) ||
            af_xdp_update_xsk_map(s, &err)) {
            goto err;
        }
    }

    if (nc0 && !inhibit) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(&err, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    return 0;

err:
    if (nc0) {
        qemu_del_net_client(nc0);
        error_propagate(errp, err);
    }

    return -1;
}