/*
 * AF_XDP network backend.
 *
 * Copyright (c) 2023 Red Hat, Inc.
 *
 * Authors:
 *  Ilya Maximets <i.maximets@ovn.org>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */


#include "qemu/osdep.h"
#include <bpf/bpf.h>
#include <inttypes.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <net/if.h>
#include <xdp/xsk.h>

#include "clients.h"
#include "monitor/monitor.h"
#include "net/net.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/iov.h"
#include "qemu/main-loop.h"
#include "qemu/memalign.h"


typedef struct AFXDPState {
    NetClientState nc;

    struct xsk_socket *xsk;
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_ring_cons cq;
    struct xsk_ring_prod fq;

    char ifname[IFNAMSIZ];
    int ifindex;
    bool read_poll;
    bool write_poll;
    uint32_t outstanding_tx;

    uint64_t *pool;
    uint32_t n_pool;
    char *buffer;
    struct xsk_umem *umem;

    uint32_t n_queues;
    uint32_t xdp_flags;
    bool inhibit;
} AFXDPState;

#define AF_XDP_BATCH_SIZE 64

static void af_xdp_send(void *opaque);
static void af_xdp_writable(void *opaque);

/* Set the event-loop handlers for the af-xdp backend. */
static void af_xdp_update_fd_handler(AFXDPState *s)
{
    qemu_set_fd_handler(xsk_socket__fd(s->xsk),
                        s->read_poll ? af_xdp_send : NULL,
                        s->write_poll ? af_xdp_writable : NULL,
                        s);
}

/* Update the read handler. */
static void af_xdp_read_poll(AFXDPState *s, bool enable)
{
    if (s->read_poll != enable) {
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

/* Update the write handler. */
static void af_xdp_write_poll(AFXDPState *s, bool enable)
{
    if (s->write_poll != enable) {
        s->write_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_poll(NetClientState *nc, bool enable)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    if (s->read_poll != enable || s->write_poll != enable) {
        s->write_poll = enable;
        s->read_poll = enable;
        af_xdp_update_fd_handler(s);
    }
}

static void af_xdp_complete_tx(AFXDPState *s)
{
    uint32_t idx = 0;
    uint32_t done, i;
    uint64_t *addr;

    done = xsk_ring_cons__peek(&s->cq, XSK_RING_CONS__DEFAULT_NUM_DESCS, &idx);

    for (i = 0; i < done; i++) {
        addr = (void *) xsk_ring_cons__comp_addr(&s->cq, idx++);
        s->pool[s->n_pool++] = *addr;
        s->outstanding_tx--;
    }

    if (done) {
        xsk_ring_cons__release(&s->cq, done);
    }
}

/*
 * The fd_write() callback, invoked if the fd is marked as writable
 * after a poll.
 */
static void af_xdp_writable(void *opaque)
{
    AFXDPState *s = opaque;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    /*
     * Unregister the handler, unless we still have packets to transmit
     * and the kernel needs a wake-up.
     */
    if (!s->outstanding_tx || !xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, false);
    }

    /* Flush any buffered packets. */
    qemu_flush_queued_packets(&s->nc);
}
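/*
 * Transmit path (guest -> network): af_xdp_receive() below takes one
 * frame address from the LIFO pool, copies the packet into the UMEM
 * frame, posts a descriptor to the Tx ring and, if the kernel asked
 * for a wake-up, arms the write handler so the send is kicked from the
 * event loop.  Completed frames are returned to the pool when
 * af_xdp_complete_tx() drains the completion (cq) ring.
 */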
static ssize_t af_xdp_receive(NetClientState *nc,
                              const uint8_t *buf, size_t size)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);
    struct xdp_desc *desc;
    uint32_t idx;
    void *data;

    /* Try to recover buffers that are already sent. */
    af_xdp_complete_tx(s);

    if (size > XSK_UMEM__DEFAULT_FRAME_SIZE) {
        /* We can't transmit a packet of this size... */
        return size;
    }

    if (!s->n_pool || !xsk_ring_prod__reserve(&s->tx, 1, &idx)) {
        /*
         * Out of buffers or space in the tx ring. Poll until we can write.
         * This will also kick the Tx, if it was waiting on the CQ.
         */
        af_xdp_write_poll(s, true);
        return 0;
    }

    desc = xsk_ring_prod__tx_desc(&s->tx, idx);
    desc->addr = s->pool[--s->n_pool];
    desc->len = size;

    data = xsk_umem__get_data(s->buffer, desc->addr);
    memcpy(data, buf, size);

    xsk_ring_prod__submit(&s->tx, 1);
    s->outstanding_tx++;

    if (xsk_ring_prod__needs_wakeup(&s->tx)) {
        af_xdp_write_poll(s, true);
    }

    return size;
}

/*
 * Complete a previous send (backend --> guest) and enable the
 * fd_read callback.
 */
static void af_xdp_send_completed(NetClientState *nc, ssize_t len)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    af_xdp_read_poll(s, true);
}

static void af_xdp_fq_refill(AFXDPState *s, uint32_t n)
{
    uint32_t i, idx = 0;

    /* Leave one packet for Tx, just in case. */
    if (s->n_pool < n + 1) {
        n = s->n_pool;
    }

    if (!n || !xsk_ring_prod__reserve(&s->fq, n, &idx)) {
        return;
    }

    for (i = 0; i < n; i++) {
        *xsk_ring_prod__fill_addr(&s->fq, idx++) = s->pool[--s->n_pool];
    }
    xsk_ring_prod__submit(&s->fq, n);

    if (xsk_ring_prod__needs_wakeup(&s->fq)) {
        /* Receive was blocked by not having enough buffers. Wake it up. */
        af_xdp_read_poll(s, true);
    }
}

static void af_xdp_send(void *opaque)
{
    uint32_t i, n_rx, idx = 0;
    AFXDPState *s = opaque;

    n_rx = xsk_ring_cons__peek(&s->rx, AF_XDP_BATCH_SIZE, &idx);
    if (!n_rx) {
        return;
    }

    for (i = 0; i < n_rx; i++) {
        const struct xdp_desc *desc;
        struct iovec iov;

        desc = xsk_ring_cons__rx_desc(&s->rx, idx++);

        iov.iov_base = xsk_umem__get_data(s->buffer, desc->addr);
        iov.iov_len = desc->len;

        s->pool[s->n_pool++] = desc->addr;

        if (!qemu_sendv_packet_async(&s->nc, &iov, 1,
                                     af_xdp_send_completed)) {
            /*
             * The peer does not receive anymore. The packet is queued; stop
             * reading from the backend until af_xdp_send_completed().
             */
            af_xdp_read_poll(s, false);

            /* Return unused descriptors to not break the ring cache. */
            xsk_ring_cons__cancel(&s->rx, n_rx - i - 1);
            n_rx = i + 1;
            break;
        }
    }

    /* Release actually sent descriptors and try to re-fill. */
    xsk_ring_cons__release(&s->rx, n_rx);
    af_xdp_fq_refill(s, AF_XDP_BATCH_SIZE);
}

/* Flush and close. */
static void af_xdp_cleanup(NetClientState *nc)
{
    AFXDPState *s = DO_UPCAST(AFXDPState, nc, nc);

    qemu_purge_queued_packets(nc);

    af_xdp_poll(nc, false);

    xsk_socket__delete(s->xsk);
    s->xsk = NULL;
    g_free(s->pool);
    s->pool = NULL;
    xsk_umem__delete(s->umem);
    s->umem = NULL;
    qemu_vfree(s->buffer);
    s->buffer = NULL;

    /* Remove the program if it's the last open queue. */
    if (!s->inhibit && nc->queue_index == s->n_queues - 1 && s->xdp_flags
        && bpf_xdp_detach(s->ifindex, s->xdp_flags, NULL) != 0) {
        fprintf(stderr,
                "af-xdp: unable to remove XDP program from '%s', ifindex: %d\n",
                s->ifname, s->ifindex);
    }
}
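/*
 * UMEM sizing, for reference: with the default libxdp constants
 * (XSK_RING_PROD__DEFAULT_NUM_DESCS and XSK_RING_CONS__DEFAULT_NUM_DESCS
 * are 2048 descriptors each, XSK_UMEM__DEFAULT_FRAME_SIZE is 4096 bytes)
 * the pool created below holds (2048 + 2048) * 2 = 8192 frames, i.e.
 * 32 MiB of packet buffers per queue.
 */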
static int af_xdp_umem_create(AFXDPState *s, int sock_fd, Error **errp)
{
    struct xsk_umem_config config = {
        .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
        .frame_headroom = 0,
    };
    uint64_t n_descs;
    uint64_t size;
    int64_t i;
    int ret;

    /* Number of descriptors if all 4 queues (rx, tx, cq, fq) are full. */
    n_descs = (XSK_RING_PROD__DEFAULT_NUM_DESCS
               + XSK_RING_CONS__DEFAULT_NUM_DESCS) * 2;
    size = n_descs * XSK_UMEM__DEFAULT_FRAME_SIZE;

    s->buffer = qemu_memalign(qemu_real_host_page_size(), size);
    memset(s->buffer, 0, size);

    if (sock_fd < 0) {
        ret = xsk_umem__create(&s->umem, s->buffer, size,
                               &s->fq, &s->cq, &config);
    } else {
        ret = xsk_umem__create_with_fd(&s->umem, sock_fd, s->buffer, size,
                                       &s->fq, &s->cq, &config);
    }

    if (ret) {
        qemu_vfree(s->buffer);
        s->buffer = NULL; /* Avoid a double free in af_xdp_cleanup(). */
        error_setg_errno(errp, errno,
                         "failed to create umem for %s queue_index: %d",
                         s->ifname, s->nc.queue_index);
        return -1;
    }

    s->pool = g_new(uint64_t, n_descs);
    /* Fill the pool in the opposite order, because it's a LIFO queue. */
    for (i = n_descs - 1; i >= 0; i--) {
        s->pool[i] = i * XSK_UMEM__DEFAULT_FRAME_SIZE;
    }
    s->n_pool = n_descs;

    af_xdp_fq_refill(s, XSK_RING_PROD__DEFAULT_NUM_DESCS);

    return 0;
}

static int af_xdp_socket_create(AFXDPState *s,
                                const NetdevAFXDPOptions *opts, Error **errp)
{
    struct xsk_socket_config cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .bind_flags = XDP_USE_NEED_WAKEUP,
        .xdp_flags = XDP_FLAGS_UPDATE_IF_NOEXIST,
    };
    int queue_id, error = 0;

    s->inhibit = opts->has_inhibit && opts->inhibit;
    if (s->inhibit) {
        cfg.libxdp_flags |= XSK_LIBXDP_FLAGS__INHIBIT_PROG_LOAD;
    }

    if (opts->has_force_copy && opts->force_copy) {
        cfg.bind_flags |= XDP_COPY;
    }

    queue_id = s->nc.queue_index;
    if (opts->has_start_queue && opts->start_queue > 0) {
        queue_id += opts->start_queue;
    }

    if (opts->has_mode) {
        /* Specific mode requested. */
        cfg.xdp_flags |= (opts->mode == AFXDP_MODE_NATIVE)
                         ? XDP_FLAGS_DRV_MODE : XDP_FLAGS_SKB_MODE;
        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            error = errno;
        }
    } else {
        /* No mode requested, try native first. */
        cfg.xdp_flags |= XDP_FLAGS_DRV_MODE;

        if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                               s->umem, &s->rx, &s->tx, &cfg)) {
            /* Can't use native mode, try skb. */
            cfg.xdp_flags &= ~XDP_FLAGS_DRV_MODE;
            cfg.xdp_flags |= XDP_FLAGS_SKB_MODE;

            if (xsk_socket__create(&s->xsk, s->ifname, queue_id,
                                   s->umem, &s->rx, &s->tx, &cfg)) {
                error = errno;
            }
        }
    }

    if (error) {
        error_setg_errno(errp, error,
                         "failed to create AF_XDP socket for %s queue_id: %d",
                         s->ifname, queue_id);
        return -1;
    }

    s->xdp_flags = cfg.xdp_flags;

    return 0;
}

/* NetClientInfo methods. */
static NetClientInfo net_af_xdp_info = {
    .type = NET_CLIENT_DRIVER_AF_XDP,
    .size = sizeof(AFXDPState),
    .receive = af_xdp_receive,
    .poll = af_xdp_poll,
    .cleanup = af_xdp_cleanup,
};
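/*
 * Parse the colon-separated 'sock-fds' option into an array of
 * pre-created AF_XDP socket file descriptors, one per queue.  Each
 * element may be a numeric fd or the name of an fd set known to the
 * monitor; for example (hypothetical values), "sock-fds=25:26:27"
 * for a three-queue device.
 */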
static int *parse_socket_fds(const char *sock_fds_str,
                             int64_t n_expected, Error **errp)
{
    gchar **substrings = g_strsplit(sock_fds_str, ":", -1);
    int64_t i, n_sock_fds = g_strv_length(substrings);
    int *sock_fds = NULL;

    if (n_sock_fds != n_expected) {
        error_setg(errp, "expected %"PRIi64" socket fds, got %"PRIi64,
                   n_expected, n_sock_fds);
        goto exit;
    }

    sock_fds = g_new(int, n_sock_fds);

    for (i = 0; i < n_sock_fds; i++) {
        sock_fds[i] = monitor_fd_param(monitor_cur(), substrings[i], errp);
        if (sock_fds[i] < 0) {
            g_free(sock_fds);
            sock_fds = NULL;
            goto exit;
        }
    }

exit:
    g_strfreev(substrings);
    return sock_fds;
}

/*
 * The exported init function.
 *
 * ... -netdev af-xdp,ifname="..."
 */
int net_init_af_xdp(const Netdev *netdev,
                    const char *name, NetClientState *peer, Error **errp)
{
    const NetdevAFXDPOptions *opts = &netdev->u.af_xdp;
    NetClientState *nc, *nc0 = NULL;
    unsigned int ifindex;
    uint32_t prog_id = 0;
    int *sock_fds = NULL;
    int64_t i, queues;
    AFXDPState *s;

    ifindex = if_nametoindex(opts->ifname);
    if (!ifindex) {
        error_setg_errno(errp, errno, "failed to get ifindex for '%s'",
                         opts->ifname);
        return -1;
    }

    queues = opts->has_queues ? opts->queues : 1;
    if (queues < 1) {
        error_setg(errp, "invalid number of queues (%" PRIi64 ") for '%s'",
                   queues, opts->ifname);
        return -1;
    }

    if ((opts->has_inhibit && opts->inhibit) != !!opts->sock_fds) {
        error_setg(errp, "'inhibit=on' requires 'sock-fds' and vice versa");
        return -1;
    }

    if (opts->sock_fds) {
        sock_fds = parse_socket_fds(opts->sock_fds, queues, errp);
        if (!sock_fds) {
            return -1;
        }
    }

    for (i = 0; i < queues; i++) {
        nc = qemu_new_net_client(&net_af_xdp_info, peer, "af-xdp", name);
        qemu_set_info_str(nc, "af-xdp%"PRIi64" to %s", i, opts->ifname);
        nc->queue_index = i;

        if (!nc0) {
            nc0 = nc;
        }

        s = DO_UPCAST(AFXDPState, nc, nc);

        pstrcpy(s->ifname, sizeof(s->ifname), opts->ifname);
        s->ifindex = ifindex;
        s->n_queues = queues;

        if (af_xdp_umem_create(s, sock_fds ? sock_fds[i] : -1, errp)
            || af_xdp_socket_create(s, opts, errp)) {
            /* Make sure the XDP program will be removed. */
            s->n_queues = i;
            goto err;
        }
    }

    if (nc0) {
        s = DO_UPCAST(AFXDPState, nc, nc0);
        if (bpf_xdp_query_id(s->ifindex, s->xdp_flags, &prog_id) || !prog_id) {
            error_setg_errno(errp, errno,
                             "no XDP program loaded on '%s', ifindex: %d",
                             s->ifname, s->ifindex);
            goto err;
        }
    }

    af_xdp_read_poll(s, true); /* Initially only poll for reads. */

    g_free(sock_fds);

    return 0;

err:
    g_free(sock_fds);
    if (nc0) {
        qemu_del_net_client(nc0);
    }

    return -1;
}
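/*
 * Example invocations (hypothetical interface and fd values), matching
 * the options handled above:
 *
 *   Privileged, XDP program loaded by QEMU, two queues:
 *     -netdev af-xdp,id=net0,ifname=eth0,queues=2
 *
 *   Unprivileged, sockets pre-created by a privileged helper and
 *   passed in, program load inhibited:
 *     -netdev af-xdp,id=net0,ifname=eth0,queues=2,inhibit=on,sock-fds=15:16
 */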