1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 17 #include "qemu/osdep.h" 18 #include "qapi/error.h" 19 #include "qemu/cutils.h" 20 #include "exec/target_page.h" 21 #include "rdma.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "qemu-file.h" 25 #include "ram.h" 26 #include "qemu/error-report.h" 27 #include "qemu/main-loop.h" 28 #include "qemu/module.h" 29 #include "qemu/rcu.h" 30 #include "qemu/sockets.h" 31 #include "qemu/bitmap.h" 32 #include "qemu/coroutine.h" 33 #include "exec/memory.h" 34 #include <sys/socket.h> 35 #include <netdb.h> 36 #include <arpa/inet.h> 37 #include <rdma/rdma_cma.h> 38 #include "trace.h" 39 #include "qom/object.h" 40 #include "options.h" 41 #include <poll.h> 42 43 #define RDMA_RESOLVE_TIMEOUT_MS 10000 44 45 /* Do not merge data if larger than this. */ 46 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 47 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 48 49 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 50 51 /* 52 * This is only for non-live state being migrated. 53 * Instead of RDMA_WRITE messages, we use RDMA_SEND 54 * messages for that state, which requires a different 55 * delivery design than main memory. 56 */ 57 #define RDMA_SEND_INCREMENT 32768 58 59 /* 60 * Maximum size infiniband SEND message 61 */ 62 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 63 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 64 65 #define RDMA_CONTROL_VERSION_CURRENT 1 66 /* 67 * Capabilities for negotiation. 68 */ 69 #define RDMA_CAPABILITY_PIN_ALL 0x01 70 71 /* 72 * Add the other flags above to this list of known capabilities 73 * as they are introduced. 74 */ 75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 76 77 /* 78 * A work request ID is 64-bits and we split up these bits 79 * into 3 parts: 80 * 81 * bits 0-15 : type of control message, 2^16 82 * bits 16-29: ram block index, 2^14 83 * bits 30-63: ram block chunk number, 2^34 84 * 85 * The last two bit ranges are only used for RDMA writes, 86 * in order to track their completion and potentially 87 * also track unregistration status of the message. 88 */ 89 #define RDMA_WRID_TYPE_SHIFT 0UL 90 #define RDMA_WRID_BLOCK_SHIFT 16UL 91 #define RDMA_WRID_CHUNK_SHIFT 30UL 92 93 #define RDMA_WRID_TYPE_MASK \ 94 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 95 96 #define RDMA_WRID_BLOCK_MASK \ 97 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 98 99 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 100 101 /* 102 * RDMA migration protocol: 103 * 1. RDMA Writes (data messages, i.e. RAM) 104 * 2. IB Send/Recv (control channel messages) 105 */ 106 enum { 107 RDMA_WRID_NONE = 0, 108 RDMA_WRID_RDMA_WRITE = 1, 109 RDMA_WRID_SEND_CONTROL = 2000, 110 RDMA_WRID_RECV_CONTROL = 4000, 111 }; 112 113 /* 114 * Work request IDs for IB SEND messages only (not RDMA writes). 115 * This is used by the migration protocol to transmit 116 * control messages (such as device state and registration commands) 117 * 118 * We could use more WRs, but we have enough for now. 119 */ 120 enum { 121 RDMA_WRID_READY = 0, 122 RDMA_WRID_DATA, 123 RDMA_WRID_CONTROL, 124 RDMA_WRID_MAX, 125 }; 126 127 /* 128 * SEND/RECV IB Control Messages. 129 */ 130 enum { 131 RDMA_CONTROL_NONE = 0, 132 RDMA_CONTROL_ERROR, 133 RDMA_CONTROL_READY, /* ready to receive */ 134 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 135 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 136 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 137 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 138 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 139 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 140 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 141 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 142 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 143 }; 144 145 146 /* 147 * Memory and MR structures used to represent an IB Send/Recv work request. 148 * This is *not* used for RDMA writes, only IB Send/Recv. 149 */ 150 typedef struct { 151 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 152 struct ibv_mr *control_mr; /* registration metadata */ 153 size_t control_len; /* length of the message */ 154 uint8_t *control_curr; /* start of unconsumed bytes */ 155 } RDMAWorkRequestData; 156 157 /* 158 * Negotiate RDMA capabilities during connection-setup time. 159 */ 160 typedef struct { 161 uint32_t version; 162 uint32_t flags; 163 } RDMACapabilities; 164 165 static void caps_to_network(RDMACapabilities *cap) 166 { 167 cap->version = htonl(cap->version); 168 cap->flags = htonl(cap->flags); 169 } 170 171 static void network_to_caps(RDMACapabilities *cap) 172 { 173 cap->version = ntohl(cap->version); 174 cap->flags = ntohl(cap->flags); 175 } 176 177 /* 178 * Representation of a RAMBlock from an RDMA perspective. 179 * This is not transmitted, only local. 180 * This and subsequent structures cannot be linked lists 181 * because we're using a single IB message to transmit 182 * the information. It's small anyway, so a list is overkill. 183 */ 184 typedef struct RDMALocalBlock { 185 char *block_name; 186 uint8_t *local_host_addr; /* local virtual address */ 187 uint64_t remote_host_addr; /* remote virtual address */ 188 uint64_t offset; 189 uint64_t length; 190 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 191 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 192 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 193 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 194 int index; /* which block are we */ 195 unsigned int src_index; /* (Only used on dest) */ 196 bool is_ram_block; 197 int nb_chunks; 198 unsigned long *transit_bitmap; 199 unsigned long *unregister_bitmap; 200 } RDMALocalBlock; 201 202 /* 203 * Also represents a RAMblock, but only on the dest. 204 * This gets transmitted by the dest during connection-time 205 * to the source VM and then is used to populate the 206 * corresponding RDMALocalBlock with 207 * the information needed to perform the actual RDMA. 208 */ 209 typedef struct QEMU_PACKED RDMADestBlock { 210 uint64_t remote_host_addr; 211 uint64_t offset; 212 uint64_t length; 213 uint32_t remote_rkey; 214 uint32_t padding; 215 } RDMADestBlock; 216 217 static const char *control_desc(unsigned int rdma_control) 218 { 219 static const char *strs[] = { 220 [RDMA_CONTROL_NONE] = "NONE", 221 [RDMA_CONTROL_ERROR] = "ERROR", 222 [RDMA_CONTROL_READY] = "READY", 223 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 224 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 225 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 226 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 227 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 228 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 229 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 230 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 231 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 232 }; 233 234 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 235 return "??BAD CONTROL VALUE??"; 236 } 237 238 return strs[rdma_control]; 239 } 240 241 #if !defined(htonll) 242 static uint64_t htonll(uint64_t v) 243 { 244 union { uint32_t lv[2]; uint64_t llv; } u; 245 u.lv[0] = htonl(v >> 32); 246 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 247 return u.llv; 248 } 249 #endif 250 251 #if !defined(ntohll) 252 static uint64_t ntohll(uint64_t v) 253 { 254 union { uint32_t lv[2]; uint64_t llv; } u; 255 u.llv = v; 256 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 257 } 258 #endif 259 260 static void dest_block_to_network(RDMADestBlock *db) 261 { 262 db->remote_host_addr = htonll(db->remote_host_addr); 263 db->offset = htonll(db->offset); 264 db->length = htonll(db->length); 265 db->remote_rkey = htonl(db->remote_rkey); 266 } 267 268 static void network_to_dest_block(RDMADestBlock *db) 269 { 270 db->remote_host_addr = ntohll(db->remote_host_addr); 271 db->offset = ntohll(db->offset); 272 db->length = ntohll(db->length); 273 db->remote_rkey = ntohl(db->remote_rkey); 274 } 275 276 /* 277 * Virtual address of the above structures used for transmitting 278 * the RAMBlock descriptions at connection-time. 279 * This structure is *not* transmitted. 280 */ 281 typedef struct RDMALocalBlocks { 282 int nb_blocks; 283 bool init; /* main memory init complete */ 284 RDMALocalBlock *block; 285 } RDMALocalBlocks; 286 287 /* 288 * Main data structure for RDMA state. 289 * While there is only one copy of this structure being allocated right now, 290 * this is the place where one would start if you wanted to consider 291 * having more than one RDMA connection open at the same time. 292 */ 293 typedef struct RDMAContext { 294 char *host; 295 int port; 296 297 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 298 299 /* 300 * This is used by *_exchange_send() to figure out whether or not 301 * the initial "READY" message has already been received or not. 302 * This is because other functions may potentially poll() and detect 303 * the READY message before send() does, in which case we need to 304 * know if it completed. 305 */ 306 int control_ready_expected; 307 308 /* number of outstanding writes */ 309 int nb_sent; 310 311 /* store info about current buffer so that we can 312 merge it with future sends */ 313 uint64_t current_addr; 314 uint64_t current_length; 315 /* index of ram block the current buffer belongs to */ 316 int current_index; 317 /* index of the chunk in the current ram block */ 318 int current_chunk; 319 320 bool pin_all; 321 322 /* 323 * infiniband-specific variables for opening the device 324 * and maintaining connection state and so forth. 325 * 326 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 327 * cm_id->verbs, cm_id->channel, and cm_id->qp. 328 */ 329 struct rdma_cm_id *cm_id; /* connection manager ID */ 330 struct rdma_cm_id *listen_id; 331 bool connected; 332 333 struct ibv_context *verbs; 334 struct rdma_event_channel *channel; 335 struct ibv_qp *qp; /* queue pair */ 336 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */ 337 struct ibv_comp_channel *send_comp_channel; /* send completion channel */ 338 struct ibv_pd *pd; /* protection domain */ 339 struct ibv_cq *recv_cq; /* recvieve completion queue */ 340 struct ibv_cq *send_cq; /* send completion queue */ 341 342 /* 343 * If a previous write failed (perhaps because of a failed 344 * memory registration, then do not attempt any future work 345 * and remember the error state. 346 */ 347 bool errored; 348 bool error_reported; 349 bool received_error; 350 351 /* 352 * Description of ram blocks used throughout the code. 353 */ 354 RDMALocalBlocks local_ram_blocks; 355 RDMADestBlock *dest_blocks; 356 357 /* Index of the next RAMBlock received during block registration */ 358 unsigned int next_src_index; 359 360 /* 361 * Migration on *destination* started. 362 * Then use coroutine yield function. 363 * Source runs in a thread, so we don't care. 364 */ 365 int migration_started_on_destination; 366 367 int total_registrations; 368 int total_writes; 369 370 int unregister_current, unregister_next; 371 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 372 373 GHashTable *blockmap; 374 375 /* the RDMAContext for return path */ 376 struct RDMAContext *return_path; 377 bool is_return_path; 378 } RDMAContext; 379 380 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 381 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 382 383 384 385 struct QIOChannelRDMA { 386 QIOChannel parent; 387 RDMAContext *rdmain; 388 RDMAContext *rdmaout; 389 QEMUFile *file; 390 bool blocking; /* XXX we don't actually honour this yet */ 391 }; 392 393 /* 394 * Main structure for IB Send/Recv control messages. 395 * This gets prepended at the beginning of every Send/Recv. 396 */ 397 typedef struct QEMU_PACKED { 398 uint32_t len; /* Total length of data portion */ 399 uint32_t type; /* which control command to perform */ 400 uint32_t repeat; /* number of commands in data portion of same type */ 401 uint32_t padding; 402 } RDMAControlHeader; 403 404 static void control_to_network(RDMAControlHeader *control) 405 { 406 control->type = htonl(control->type); 407 control->len = htonl(control->len); 408 control->repeat = htonl(control->repeat); 409 } 410 411 static void network_to_control(RDMAControlHeader *control) 412 { 413 control->type = ntohl(control->type); 414 control->len = ntohl(control->len); 415 control->repeat = ntohl(control->repeat); 416 } 417 418 /* 419 * Register a single Chunk. 420 * Information sent by the source VM to inform the dest 421 * to register an single chunk of memory before we can perform 422 * the actual RDMA operation. 423 */ 424 typedef struct QEMU_PACKED { 425 union QEMU_PACKED { 426 uint64_t current_addr; /* offset into the ram_addr_t space */ 427 uint64_t chunk; /* chunk to lookup if unregistering */ 428 } key; 429 uint32_t current_index; /* which ramblock the chunk belongs to */ 430 uint32_t padding; 431 uint64_t chunks; /* how many sequential chunks to register */ 432 } RDMARegister; 433 434 static bool rdma_errored(RDMAContext *rdma) 435 { 436 if (rdma->errored && !rdma->error_reported) { 437 error_report("RDMA is in an error state waiting migration" 438 " to abort!"); 439 rdma->error_reported = true; 440 } 441 return rdma->errored; 442 } 443 444 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 445 { 446 RDMALocalBlock *local_block; 447 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 448 449 if (local_block->is_ram_block) { 450 /* 451 * current_addr as passed in is an address in the local ram_addr_t 452 * space, we need to translate this for the destination 453 */ 454 reg->key.current_addr -= local_block->offset; 455 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 456 } 457 reg->key.current_addr = htonll(reg->key.current_addr); 458 reg->current_index = htonl(reg->current_index); 459 reg->chunks = htonll(reg->chunks); 460 } 461 462 static void network_to_register(RDMARegister *reg) 463 { 464 reg->key.current_addr = ntohll(reg->key.current_addr); 465 reg->current_index = ntohl(reg->current_index); 466 reg->chunks = ntohll(reg->chunks); 467 } 468 469 typedef struct QEMU_PACKED { 470 uint32_t value; /* if zero, we will madvise() */ 471 uint32_t block_idx; /* which ram block index */ 472 uint64_t offset; /* Address in remote ram_addr_t space */ 473 uint64_t length; /* length of the chunk */ 474 } RDMACompress; 475 476 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 477 { 478 comp->value = htonl(comp->value); 479 /* 480 * comp->offset as passed in is an address in the local ram_addr_t 481 * space, we need to translate this for the destination 482 */ 483 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 484 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 485 comp->block_idx = htonl(comp->block_idx); 486 comp->offset = htonll(comp->offset); 487 comp->length = htonll(comp->length); 488 } 489 490 static void network_to_compress(RDMACompress *comp) 491 { 492 comp->value = ntohl(comp->value); 493 comp->block_idx = ntohl(comp->block_idx); 494 comp->offset = ntohll(comp->offset); 495 comp->length = ntohll(comp->length); 496 } 497 498 /* 499 * The result of the dest's memory registration produces an "rkey" 500 * which the source VM must reference in order to perform 501 * the RDMA operation. 502 */ 503 typedef struct QEMU_PACKED { 504 uint32_t rkey; 505 uint32_t padding; 506 uint64_t host_addr; 507 } RDMARegisterResult; 508 509 static void result_to_network(RDMARegisterResult *result) 510 { 511 result->rkey = htonl(result->rkey); 512 result->host_addr = htonll(result->host_addr); 513 }; 514 515 static void network_to_result(RDMARegisterResult *result) 516 { 517 result->rkey = ntohl(result->rkey); 518 result->host_addr = ntohll(result->host_addr); 519 }; 520 521 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 522 uint8_t *data, RDMAControlHeader *resp, 523 int *resp_idx, 524 int (*callback)(RDMAContext *rdma, 525 Error **errp), 526 Error **errp); 527 528 static inline uint64_t ram_chunk_index(const uint8_t *start, 529 const uint8_t *host) 530 { 531 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 532 } 533 534 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 535 uint64_t i) 536 { 537 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 538 (i << RDMA_REG_CHUNK_SHIFT)); 539 } 540 541 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 542 uint64_t i) 543 { 544 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 545 (1UL << RDMA_REG_CHUNK_SHIFT); 546 547 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 548 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 549 } 550 551 return result; 552 } 553 554 static void rdma_add_block(RDMAContext *rdma, const char *block_name, 555 void *host_addr, 556 ram_addr_t block_offset, uint64_t length) 557 { 558 RDMALocalBlocks *local = &rdma->local_ram_blocks; 559 RDMALocalBlock *block; 560 RDMALocalBlock *old = local->block; 561 562 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 563 564 if (local->nb_blocks) { 565 if (rdma->blockmap) { 566 for (int x = 0; x < local->nb_blocks; x++) { 567 g_hash_table_remove(rdma->blockmap, 568 (void *)(uintptr_t)old[x].offset); 569 g_hash_table_insert(rdma->blockmap, 570 (void *)(uintptr_t)old[x].offset, 571 &local->block[x]); 572 } 573 } 574 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 575 g_free(old); 576 } 577 578 block = &local->block[local->nb_blocks]; 579 580 block->block_name = g_strdup(block_name); 581 block->local_host_addr = host_addr; 582 block->offset = block_offset; 583 block->length = length; 584 block->index = local->nb_blocks; 585 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 586 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 587 block->transit_bitmap = bitmap_new(block->nb_chunks); 588 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 589 block->unregister_bitmap = bitmap_new(block->nb_chunks); 590 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 591 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 592 593 block->is_ram_block = local->init ? false : true; 594 595 if (rdma->blockmap) { 596 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 597 } 598 599 trace_rdma_add_block(block_name, local->nb_blocks, 600 (uintptr_t) block->local_host_addr, 601 block->offset, block->length, 602 (uintptr_t) (block->local_host_addr + block->length), 603 BITS_TO_LONGS(block->nb_chunks) * 604 sizeof(unsigned long) * 8, 605 block->nb_chunks); 606 607 local->nb_blocks++; 608 } 609 610 /* 611 * Memory regions need to be registered with the device and queue pairs setup 612 * in advanced before the migration starts. This tells us where the RAM blocks 613 * are so that we can register them individually. 614 */ 615 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 616 { 617 const char *block_name = qemu_ram_get_idstr(rb); 618 void *host_addr = qemu_ram_get_host_addr(rb); 619 ram_addr_t block_offset = qemu_ram_get_offset(rb); 620 ram_addr_t length = qemu_ram_get_used_length(rb); 621 rdma_add_block(opaque, block_name, host_addr, block_offset, length); 622 return 0; 623 } 624 625 /* 626 * Identify the RAMBlocks and their quantity. They will be references to 627 * identify chunk boundaries inside each RAMBlock and also be referenced 628 * during dynamic page registration. 629 */ 630 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma) 631 { 632 RDMALocalBlocks *local = &rdma->local_ram_blocks; 633 int ret; 634 635 assert(rdma->blockmap == NULL); 636 memset(local, 0, sizeof *local); 637 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 638 assert(!ret); 639 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 640 rdma->dest_blocks = g_new0(RDMADestBlock, 641 rdma->local_ram_blocks.nb_blocks); 642 local->init = true; 643 } 644 645 /* 646 * Note: If used outside of cleanup, the caller must ensure that the destination 647 * block structures are also updated 648 */ 649 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 650 { 651 RDMALocalBlocks *local = &rdma->local_ram_blocks; 652 RDMALocalBlock *old = local->block; 653 654 if (rdma->blockmap) { 655 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 656 } 657 if (block->pmr) { 658 for (int j = 0; j < block->nb_chunks; j++) { 659 if (!block->pmr[j]) { 660 continue; 661 } 662 ibv_dereg_mr(block->pmr[j]); 663 rdma->total_registrations--; 664 } 665 g_free(block->pmr); 666 block->pmr = NULL; 667 } 668 669 if (block->mr) { 670 ibv_dereg_mr(block->mr); 671 rdma->total_registrations--; 672 block->mr = NULL; 673 } 674 675 g_free(block->transit_bitmap); 676 block->transit_bitmap = NULL; 677 678 g_free(block->unregister_bitmap); 679 block->unregister_bitmap = NULL; 680 681 g_free(block->remote_keys); 682 block->remote_keys = NULL; 683 684 g_free(block->block_name); 685 block->block_name = NULL; 686 687 if (rdma->blockmap) { 688 for (int x = 0; x < local->nb_blocks; x++) { 689 g_hash_table_remove(rdma->blockmap, 690 (void *)(uintptr_t)old[x].offset); 691 } 692 } 693 694 if (local->nb_blocks > 1) { 695 696 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 697 698 if (block->index) { 699 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 700 } 701 702 if (block->index < (local->nb_blocks - 1)) { 703 memcpy(local->block + block->index, old + (block->index + 1), 704 sizeof(RDMALocalBlock) * 705 (local->nb_blocks - (block->index + 1))); 706 for (int x = block->index; x < local->nb_blocks - 1; x++) { 707 local->block[x].index--; 708 } 709 } 710 } else { 711 assert(block == local->block); 712 local->block = NULL; 713 } 714 715 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 716 block->offset, block->length, 717 (uintptr_t)(block->local_host_addr + block->length), 718 BITS_TO_LONGS(block->nb_chunks) * 719 sizeof(unsigned long) * 8, block->nb_chunks); 720 721 g_free(old); 722 723 local->nb_blocks--; 724 725 if (local->nb_blocks && rdma->blockmap) { 726 for (int x = 0; x < local->nb_blocks; x++) { 727 g_hash_table_insert(rdma->blockmap, 728 (void *)(uintptr_t)local->block[x].offset, 729 &local->block[x]); 730 } 731 } 732 } 733 734 /* 735 * Trace RDMA device open, with device details. 736 */ 737 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 738 { 739 struct ibv_port_attr port; 740 741 if (ibv_query_port(verbs, 1, &port)) { 742 trace_qemu_rdma_dump_id_failed(who); 743 return; 744 } 745 746 trace_qemu_rdma_dump_id(who, 747 verbs->device->name, 748 verbs->device->dev_name, 749 verbs->device->dev_path, 750 verbs->device->ibdev_path, 751 port.link_layer, 752 port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband" 753 : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet" 754 : "Unknown"); 755 } 756 757 /* 758 * Trace RDMA gid addressing information. 759 * Useful for understanding the RDMA device hierarchy in the kernel. 760 */ 761 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 762 { 763 char sgid[33]; 764 char dgid[33]; 765 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 766 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 767 trace_qemu_rdma_dump_gid(who, sgid, dgid); 768 } 769 770 /* 771 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 772 * We will try the next addrinfo struct, and fail if there are 773 * no other valid addresses to bind against. 774 * 775 * If user is listening on '[::]', then we will not have a opened a device 776 * yet and have no way of verifying if the device is RoCE or not. 777 * 778 * In this case, the source VM will throw an error for ALL types of 779 * connections (both IPv4 and IPv6) if the destination machine does not have 780 * a regular infiniband network available for use. 781 * 782 * The only way to guarantee that an error is thrown for broken kernels is 783 * for the management software to choose a *specific* interface at bind time 784 * and validate what time of hardware it is. 785 * 786 * Unfortunately, this puts the user in a fix: 787 * 788 * If the source VM connects with an IPv4 address without knowing that the 789 * destination has bound to '[::]' the migration will unconditionally fail 790 * unless the management software is explicitly listening on the IPv4 791 * address while using a RoCE-based device. 792 * 793 * If the source VM connects with an IPv6 address, then we're OK because we can 794 * throw an error on the source (and similarly on the destination). 795 * 796 * But in mixed environments, this will be broken for a while until it is fixed 797 * inside linux. 798 * 799 * We do provide a *tiny* bit of help in this function: We can list all of the 800 * devices in the system and check to see if all the devices are RoCE or 801 * Infiniband. 802 * 803 * If we detect that we have a *pure* RoCE environment, then we can safely 804 * thrown an error even if the management software has specified '[::]' as the 805 * bind address. 806 * 807 * However, if there is are multiple hetergeneous devices, then we cannot make 808 * this assumption and the user just has to be sure they know what they are 809 * doing. 810 * 811 * Patches are being reviewed on linux-rdma. 812 */ 813 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 814 { 815 /* This bug only exists in linux, to our knowledge. */ 816 #ifdef CONFIG_LINUX 817 struct ibv_port_attr port_attr; 818 819 /* 820 * Verbs are only NULL if management has bound to '[::]'. 821 * 822 * Let's iterate through all the devices and see if there any pure IB 823 * devices (non-ethernet). 824 * 825 * If not, then we can safely proceed with the migration. 826 * Otherwise, there are no guarantees until the bug is fixed in linux. 827 */ 828 if (!verbs) { 829 int num_devices; 830 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 831 bool roce_found = false; 832 bool ib_found = false; 833 834 for (int x = 0; x < num_devices; x++) { 835 verbs = ibv_open_device(dev_list[x]); 836 /* 837 * ibv_open_device() is not documented to set errno. If 838 * it does, it's somebody else's doc bug. If it doesn't, 839 * the use of errno below is wrong. 840 * TODO Find out whether ibv_open_device() sets errno. 841 */ 842 if (!verbs) { 843 if (errno == EPERM) { 844 continue; 845 } else { 846 error_setg_errno(errp, errno, 847 "could not open RDMA device context"); 848 return -1; 849 } 850 } 851 852 if (ibv_query_port(verbs, 1, &port_attr)) { 853 ibv_close_device(verbs); 854 error_setg(errp, 855 "RDMA ERROR: Could not query initial IB port"); 856 return -1; 857 } 858 859 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 860 ib_found = true; 861 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 862 roce_found = true; 863 } 864 865 ibv_close_device(verbs); 866 867 } 868 869 if (roce_found) { 870 if (ib_found) { 871 warn_report("migrations may fail:" 872 " IPv6 over RoCE / iWARP in linux" 873 " is broken. But since you appear to have a" 874 " mixed RoCE / IB environment, be sure to only" 875 " migrate over the IB fabric until the kernel " 876 " fixes the bug."); 877 } else { 878 error_setg(errp, "RDMA ERROR: " 879 "You only have RoCE / iWARP devices in your systems" 880 " and your management software has specified '[::]'" 881 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 882 return -1; 883 } 884 } 885 886 return 0; 887 } 888 889 /* 890 * If we have a verbs context, that means that some other than '[::]' was 891 * used by the management software for binding. In which case we can 892 * actually warn the user about a potentially broken kernel. 893 */ 894 895 /* IB ports start with 1, not 0 */ 896 if (ibv_query_port(verbs, 1, &port_attr)) { 897 error_setg(errp, "RDMA ERROR: Could not query initial IB port"); 898 return -1; 899 } 900 901 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 902 error_setg(errp, "RDMA ERROR: " 903 "Linux kernel's RoCE / iWARP does not support IPv6 " 904 "(but patches on linux-rdma in progress)"); 905 return -1; 906 } 907 908 #endif 909 910 return 0; 911 } 912 913 /* 914 * Figure out which RDMA device corresponds to the requested IP hostname 915 * Also create the initial connection manager identifiers for opening 916 * the connection. 917 */ 918 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 919 { 920 Error *err = NULL; 921 int ret; 922 struct rdma_addrinfo *res; 923 char port_str[16]; 924 struct rdma_cm_event *cm_event; 925 char ip[40] = "unknown"; 926 927 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 928 error_setg(errp, "RDMA ERROR: RDMA hostname has not been set"); 929 return -1; 930 } 931 932 /* create CM channel */ 933 rdma->channel = rdma_create_event_channel(); 934 if (!rdma->channel) { 935 error_setg(errp, "RDMA ERROR: could not create CM channel"); 936 return -1; 937 } 938 939 /* create CM id */ 940 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 941 if (ret < 0) { 942 error_setg(errp, "RDMA ERROR: could not create channel id"); 943 goto err_resolve_create_id; 944 } 945 946 snprintf(port_str, 16, "%d", rdma->port); 947 port_str[15] = '\0'; 948 949 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 950 if (ret) { 951 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s", 952 rdma->host); 953 goto err_resolve_get_addr; 954 } 955 956 /* Try all addresses, saving the first error in @err */ 957 for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) { 958 Error **local_errp = err ? NULL : &err; 959 960 inet_ntop(e->ai_family, 961 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 962 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 963 964 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 965 RDMA_RESOLVE_TIMEOUT_MS); 966 if (ret >= 0) { 967 if (e->ai_family == AF_INET6) { 968 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, 969 local_errp); 970 if (ret < 0) { 971 continue; 972 } 973 } 974 error_free(err); 975 goto route; 976 } 977 } 978 979 rdma_freeaddrinfo(res); 980 if (err) { 981 error_propagate(errp, err); 982 } else { 983 error_setg(errp, "RDMA ERROR: could not resolve address %s", 984 rdma->host); 985 } 986 goto err_resolve_get_addr; 987 988 route: 989 rdma_freeaddrinfo(res); 990 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 991 992 ret = rdma_get_cm_event(rdma->channel, &cm_event); 993 if (ret < 0) { 994 error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved"); 995 goto err_resolve_get_addr; 996 } 997 998 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 999 error_setg(errp, 1000 "RDMA ERROR: result not equal to event_addr_resolved %s", 1001 rdma_event_str(cm_event->event)); 1002 rdma_ack_cm_event(cm_event); 1003 goto err_resolve_get_addr; 1004 } 1005 rdma_ack_cm_event(cm_event); 1006 1007 /* resolve route */ 1008 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1009 if (ret < 0) { 1010 error_setg(errp, "RDMA ERROR: could not resolve rdma route"); 1011 goto err_resolve_get_addr; 1012 } 1013 1014 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1015 if (ret < 0) { 1016 error_setg(errp, "RDMA ERROR: could not perform event_route_resolved"); 1017 goto err_resolve_get_addr; 1018 } 1019 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1020 error_setg(errp, "RDMA ERROR: " 1021 "result not equal to event_route_resolved: %s", 1022 rdma_event_str(cm_event->event)); 1023 rdma_ack_cm_event(cm_event); 1024 goto err_resolve_get_addr; 1025 } 1026 rdma_ack_cm_event(cm_event); 1027 rdma->verbs = rdma->cm_id->verbs; 1028 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1029 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1030 return 0; 1031 1032 err_resolve_get_addr: 1033 rdma_destroy_id(rdma->cm_id); 1034 rdma->cm_id = NULL; 1035 err_resolve_create_id: 1036 rdma_destroy_event_channel(rdma->channel); 1037 rdma->channel = NULL; 1038 return -1; 1039 } 1040 1041 /* 1042 * Create protection domain and completion queues 1043 */ 1044 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp) 1045 { 1046 /* allocate pd */ 1047 rdma->pd = ibv_alloc_pd(rdma->verbs); 1048 if (!rdma->pd) { 1049 error_setg(errp, "failed to allocate protection domain"); 1050 return -1; 1051 } 1052 1053 /* create receive completion channel */ 1054 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs); 1055 if (!rdma->recv_comp_channel) { 1056 error_setg(errp, "failed to allocate receive completion channel"); 1057 goto err_alloc_pd_cq; 1058 } 1059 1060 /* 1061 * Completion queue can be filled by read work requests. 1062 */ 1063 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1064 NULL, rdma->recv_comp_channel, 0); 1065 if (!rdma->recv_cq) { 1066 error_setg(errp, "failed to allocate receive completion queue"); 1067 goto err_alloc_pd_cq; 1068 } 1069 1070 /* create send completion channel */ 1071 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs); 1072 if (!rdma->send_comp_channel) { 1073 error_setg(errp, "failed to allocate send completion channel"); 1074 goto err_alloc_pd_cq; 1075 } 1076 1077 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1078 NULL, rdma->send_comp_channel, 0); 1079 if (!rdma->send_cq) { 1080 error_setg(errp, "failed to allocate send completion queue"); 1081 goto err_alloc_pd_cq; 1082 } 1083 1084 return 0; 1085 1086 err_alloc_pd_cq: 1087 if (rdma->pd) { 1088 ibv_dealloc_pd(rdma->pd); 1089 } 1090 if (rdma->recv_comp_channel) { 1091 ibv_destroy_comp_channel(rdma->recv_comp_channel); 1092 } 1093 if (rdma->send_comp_channel) { 1094 ibv_destroy_comp_channel(rdma->send_comp_channel); 1095 } 1096 if (rdma->recv_cq) { 1097 ibv_destroy_cq(rdma->recv_cq); 1098 rdma->recv_cq = NULL; 1099 } 1100 rdma->pd = NULL; 1101 rdma->recv_comp_channel = NULL; 1102 rdma->send_comp_channel = NULL; 1103 return -1; 1104 1105 } 1106 1107 /* 1108 * Create queue pairs. 1109 */ 1110 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1111 { 1112 struct ibv_qp_init_attr attr = { 0 }; 1113 1114 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1115 attr.cap.max_recv_wr = 3; 1116 attr.cap.max_send_sge = 1; 1117 attr.cap.max_recv_sge = 1; 1118 attr.send_cq = rdma->send_cq; 1119 attr.recv_cq = rdma->recv_cq; 1120 attr.qp_type = IBV_QPT_RC; 1121 1122 if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) { 1123 return -1; 1124 } 1125 1126 rdma->qp = rdma->cm_id->qp; 1127 return 0; 1128 } 1129 1130 /* Check whether On-Demand Paging is supported by RDAM device */ 1131 static bool rdma_support_odp(struct ibv_context *dev) 1132 { 1133 struct ibv_device_attr_ex attr = {0}; 1134 1135 if (ibv_query_device_ex(dev, NULL, &attr)) { 1136 return false; 1137 } 1138 1139 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) { 1140 return true; 1141 } 1142 1143 return false; 1144 } 1145 1146 /* 1147 * ibv_advise_mr to avoid RNR NAK error as far as possible. 1148 * The responder mr registering with ODP will sent RNR NAK back to 1149 * the requester in the face of the page fault. 1150 */ 1151 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr, 1152 uint32_t len, uint32_t lkey, 1153 const char *name, bool wr) 1154 { 1155 #ifdef HAVE_IBV_ADVISE_MR 1156 int ret; 1157 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE : 1158 IBV_ADVISE_MR_ADVICE_PREFETCH; 1159 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len}; 1160 1161 ret = ibv_advise_mr(pd, advice, 1162 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1); 1163 /* ignore the error */ 1164 trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret)); 1165 #endif 1166 } 1167 1168 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp) 1169 { 1170 int i; 1171 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1172 1173 for (i = 0; i < local->nb_blocks; i++) { 1174 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; 1175 1176 local->block[i].mr = 1177 ibv_reg_mr(rdma->pd, 1178 local->block[i].local_host_addr, 1179 local->block[i].length, access 1180 ); 1181 /* 1182 * ibv_reg_mr() is not documented to set errno. If it does, 1183 * it's somebody else's doc bug. If it doesn't, the use of 1184 * errno below is wrong. 1185 * TODO Find out whether ibv_reg_mr() sets errno. 1186 */ 1187 if (!local->block[i].mr && 1188 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1189 access |= IBV_ACCESS_ON_DEMAND; 1190 /* register ODP mr */ 1191 local->block[i].mr = 1192 ibv_reg_mr(rdma->pd, 1193 local->block[i].local_host_addr, 1194 local->block[i].length, access); 1195 trace_qemu_rdma_register_odp_mr(local->block[i].block_name); 1196 1197 if (local->block[i].mr) { 1198 qemu_rdma_advise_prefetch_mr(rdma->pd, 1199 (uintptr_t)local->block[i].local_host_addr, 1200 local->block[i].length, 1201 local->block[i].mr->lkey, 1202 local->block[i].block_name, 1203 true); 1204 } 1205 } 1206 1207 if (!local->block[i].mr) { 1208 error_setg_errno(errp, errno, 1209 "Failed to register local dest ram block!"); 1210 goto err; 1211 } 1212 rdma->total_registrations++; 1213 } 1214 1215 return 0; 1216 1217 err: 1218 for (i--; i >= 0; i--) { 1219 ibv_dereg_mr(local->block[i].mr); 1220 local->block[i].mr = NULL; 1221 rdma->total_registrations--; 1222 } 1223 1224 return -1; 1225 1226 } 1227 1228 /* 1229 * Find the ram block that corresponds to the page requested to be 1230 * transmitted by QEMU. 1231 * 1232 * Once the block is found, also identify which 'chunk' within that 1233 * block that the page belongs to. 1234 */ 1235 static void qemu_rdma_search_ram_block(RDMAContext *rdma, 1236 uintptr_t block_offset, 1237 uint64_t offset, 1238 uint64_t length, 1239 uint64_t *block_index, 1240 uint64_t *chunk_index) 1241 { 1242 uint64_t current_addr = block_offset + offset; 1243 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1244 (void *) block_offset); 1245 assert(block); 1246 assert(current_addr >= block->offset); 1247 assert((current_addr + length) <= (block->offset + block->length)); 1248 1249 *block_index = block->index; 1250 *chunk_index = ram_chunk_index(block->local_host_addr, 1251 block->local_host_addr + (current_addr - block->offset)); 1252 } 1253 1254 /* 1255 * Register a chunk with IB. If the chunk was already registered 1256 * previously, then skip. 1257 * 1258 * Also return the keys associated with the registration needed 1259 * to perform the actual RDMA operation. 1260 */ 1261 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1262 RDMALocalBlock *block, uintptr_t host_addr, 1263 uint32_t *lkey, uint32_t *rkey, int chunk, 1264 uint8_t *chunk_start, uint8_t *chunk_end) 1265 { 1266 if (block->mr) { 1267 if (lkey) { 1268 *lkey = block->mr->lkey; 1269 } 1270 if (rkey) { 1271 *rkey = block->mr->rkey; 1272 } 1273 return 0; 1274 } 1275 1276 /* allocate memory to store chunk MRs */ 1277 if (!block->pmr) { 1278 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1279 } 1280 1281 /* 1282 * If 'rkey', then we're the destination, so grant access to the source. 1283 * 1284 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1285 */ 1286 if (!block->pmr[chunk]) { 1287 uint64_t len = chunk_end - chunk_start; 1288 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE : 1289 0; 1290 1291 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1292 1293 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1294 /* 1295 * ibv_reg_mr() is not documented to set errno. If it does, 1296 * it's somebody else's doc bug. If it doesn't, the use of 1297 * errno below is wrong. 1298 * TODO Find out whether ibv_reg_mr() sets errno. 1299 */ 1300 if (!block->pmr[chunk] && 1301 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1302 access |= IBV_ACCESS_ON_DEMAND; 1303 /* register ODP mr */ 1304 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1305 trace_qemu_rdma_register_odp_mr(block->block_name); 1306 1307 if (block->pmr[chunk]) { 1308 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start, 1309 len, block->pmr[chunk]->lkey, 1310 block->block_name, rkey); 1311 1312 } 1313 } 1314 } 1315 if (!block->pmr[chunk]) { 1316 return -1; 1317 } 1318 rdma->total_registrations++; 1319 1320 if (lkey) { 1321 *lkey = block->pmr[chunk]->lkey; 1322 } 1323 if (rkey) { 1324 *rkey = block->pmr[chunk]->rkey; 1325 } 1326 return 0; 1327 } 1328 1329 /* 1330 * Register (at connection time) the memory used for control 1331 * channel messages. 1332 */ 1333 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1334 { 1335 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1336 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1337 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1338 if (rdma->wr_data[idx].control_mr) { 1339 rdma->total_registrations++; 1340 return 0; 1341 } 1342 return -1; 1343 } 1344 1345 /* 1346 * Perform a non-optimized memory unregistration after every transfer 1347 * for demonstration purposes, only if pin-all is not requested. 1348 * 1349 * Potential optimizations: 1350 * 1. Start a new thread to run this function continuously 1351 - for bit clearing 1352 - and for receipt of unregister messages 1353 * 2. Use an LRU. 1354 * 3. Use workload hints. 1355 */ 1356 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1357 { 1358 Error *err = NULL; 1359 1360 while (rdma->unregistrations[rdma->unregister_current]) { 1361 int ret; 1362 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1363 uint64_t chunk = 1364 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1365 uint64_t index = 1366 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1367 RDMALocalBlock *block = 1368 &(rdma->local_ram_blocks.block[index]); 1369 RDMARegister reg = { .current_index = index }; 1370 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1371 }; 1372 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1373 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1374 .repeat = 1, 1375 }; 1376 1377 trace_qemu_rdma_unregister_waiting_proc(chunk, 1378 rdma->unregister_current); 1379 1380 rdma->unregistrations[rdma->unregister_current] = 0; 1381 rdma->unregister_current++; 1382 1383 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1384 rdma->unregister_current = 0; 1385 } 1386 1387 1388 /* 1389 * Unregistration is speculative (because migration is single-threaded 1390 * and we cannot break the protocol's inifinband message ordering). 1391 * Thus, if the memory is currently being used for transmission, 1392 * then abort the attempt to unregister and try again 1393 * later the next time a completion is received for this memory. 1394 */ 1395 clear_bit(chunk, block->unregister_bitmap); 1396 1397 if (test_bit(chunk, block->transit_bitmap)) { 1398 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1399 continue; 1400 } 1401 1402 trace_qemu_rdma_unregister_waiting_send(chunk); 1403 1404 ret = ibv_dereg_mr(block->pmr[chunk]); 1405 block->pmr[chunk] = NULL; 1406 block->remote_keys[chunk] = 0; 1407 1408 if (ret != 0) { 1409 error_report("unregistration chunk failed: %s", 1410 strerror(ret)); 1411 return -1; 1412 } 1413 rdma->total_registrations--; 1414 1415 reg.key.chunk = chunk; 1416 register_to_network(rdma, ®); 1417 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1418 &resp, NULL, NULL, &err); 1419 if (ret < 0) { 1420 error_report_err(err); 1421 return -1; 1422 } 1423 1424 trace_qemu_rdma_unregister_waiting_complete(chunk); 1425 } 1426 1427 return 0; 1428 } 1429 1430 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1431 uint64_t chunk) 1432 { 1433 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1434 1435 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1436 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1437 1438 return result; 1439 } 1440 1441 /* 1442 * Consult the connection manager to see a work request 1443 * (of any kind) has completed. 1444 * Return the work request ID that completed. 1445 */ 1446 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq, 1447 uint64_t *wr_id_out, uint32_t *byte_len) 1448 { 1449 int ret; 1450 struct ibv_wc wc; 1451 uint64_t wr_id; 1452 1453 ret = ibv_poll_cq(cq, 1, &wc); 1454 1455 if (!ret) { 1456 *wr_id_out = RDMA_WRID_NONE; 1457 return 0; 1458 } 1459 1460 if (ret < 0) { 1461 return -1; 1462 } 1463 1464 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1465 1466 if (wc.status != IBV_WC_SUCCESS) { 1467 return -1; 1468 } 1469 1470 if (rdma->control_ready_expected && 1471 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1472 trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id, 1473 rdma->nb_sent); 1474 rdma->control_ready_expected = 0; 1475 } 1476 1477 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1478 uint64_t chunk = 1479 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1480 uint64_t index = 1481 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1482 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1483 1484 trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent, 1485 index, chunk, block->local_host_addr, 1486 (void *)(uintptr_t)block->remote_host_addr); 1487 1488 clear_bit(chunk, block->transit_bitmap); 1489 1490 if (rdma->nb_sent > 0) { 1491 rdma->nb_sent--; 1492 } 1493 } else { 1494 trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent); 1495 } 1496 1497 *wr_id_out = wc.wr_id; 1498 if (byte_len) { 1499 *byte_len = wc.byte_len; 1500 } 1501 1502 return 0; 1503 } 1504 1505 /* Wait for activity on the completion channel. 1506 * Returns 0 on success, none-0 on error. 1507 */ 1508 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma, 1509 struct ibv_comp_channel *comp_channel) 1510 { 1511 struct rdma_cm_event *cm_event; 1512 1513 /* 1514 * Coroutine doesn't start until migration_fd_process_incoming() 1515 * so don't yield unless we know we're running inside of a coroutine. 1516 */ 1517 if (rdma->migration_started_on_destination && 1518 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1519 yield_until_fd_readable(comp_channel->fd); 1520 } else { 1521 /* This is the source side, we're in a separate thread 1522 * or destination prior to migration_fd_process_incoming() 1523 * after postcopy, the destination also in a separate thread. 1524 * we can't yield; so we have to poll the fd. 1525 * But we need to be able to handle 'cancel' or an error 1526 * without hanging forever. 1527 */ 1528 while (!rdma->errored && !rdma->received_error) { 1529 GPollFD pfds[2]; 1530 pfds[0].fd = comp_channel->fd; 1531 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1532 pfds[0].revents = 0; 1533 1534 pfds[1].fd = rdma->channel->fd; 1535 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1536 pfds[1].revents = 0; 1537 1538 /* 0.1s timeout, should be fine for a 'cancel' */ 1539 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1540 case 2: 1541 case 1: /* fd active */ 1542 if (pfds[0].revents) { 1543 return 0; 1544 } 1545 1546 if (pfds[1].revents) { 1547 if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) { 1548 return -1; 1549 } 1550 1551 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1552 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1553 rdma_ack_cm_event(cm_event); 1554 return -1; 1555 } 1556 rdma_ack_cm_event(cm_event); 1557 } 1558 break; 1559 1560 case 0: /* Timeout, go around again */ 1561 break; 1562 1563 default: /* Error of some type - 1564 * I don't trust errno from qemu_poll_ns 1565 */ 1566 return -1; 1567 } 1568 1569 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1570 /* Bail out and let the cancellation happen */ 1571 return -1; 1572 } 1573 } 1574 } 1575 1576 if (rdma->received_error) { 1577 return -1; 1578 } 1579 return -rdma->errored; 1580 } 1581 1582 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid) 1583 { 1584 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel : 1585 rdma->recv_comp_channel; 1586 } 1587 1588 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid) 1589 { 1590 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq; 1591 } 1592 1593 /* 1594 * Block until the next work request has completed. 1595 * 1596 * First poll to see if a work request has already completed, 1597 * otherwise block. 1598 * 1599 * If we encounter completed work requests for IDs other than 1600 * the one we're interested in, then that's generally an error. 1601 * 1602 * The only exception is actual RDMA Write completions. These 1603 * completions only need to be recorded, but do not actually 1604 * need further processing. 1605 */ 1606 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, 1607 uint64_t wrid_requested, 1608 uint32_t *byte_len) 1609 { 1610 int num_cq_events = 0, ret; 1611 struct ibv_cq *cq; 1612 void *cq_ctx; 1613 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1614 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested); 1615 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested); 1616 1617 if (ibv_req_notify_cq(poll_cq, 0)) { 1618 return -1; 1619 } 1620 /* poll cq first */ 1621 while (wr_id != wrid_requested) { 1622 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1623 if (ret < 0) { 1624 return -1; 1625 } 1626 1627 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1628 1629 if (wr_id == RDMA_WRID_NONE) { 1630 break; 1631 } 1632 if (wr_id != wrid_requested) { 1633 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id); 1634 } 1635 } 1636 1637 if (wr_id == wrid_requested) { 1638 return 0; 1639 } 1640 1641 while (1) { 1642 ret = qemu_rdma_wait_comp_channel(rdma, ch); 1643 if (ret < 0) { 1644 goto err_block_for_wrid; 1645 } 1646 1647 ret = ibv_get_cq_event(ch, &cq, &cq_ctx); 1648 if (ret < 0) { 1649 goto err_block_for_wrid; 1650 } 1651 1652 num_cq_events++; 1653 1654 if (ibv_req_notify_cq(cq, 0)) { 1655 goto err_block_for_wrid; 1656 } 1657 1658 while (wr_id != wrid_requested) { 1659 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1660 if (ret < 0) { 1661 goto err_block_for_wrid; 1662 } 1663 1664 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1665 1666 if (wr_id == RDMA_WRID_NONE) { 1667 break; 1668 } 1669 if (wr_id != wrid_requested) { 1670 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id); 1671 } 1672 } 1673 1674 if (wr_id == wrid_requested) { 1675 goto success_block_for_wrid; 1676 } 1677 } 1678 1679 success_block_for_wrid: 1680 if (num_cq_events) { 1681 ibv_ack_cq_events(cq, num_cq_events); 1682 } 1683 return 0; 1684 1685 err_block_for_wrid: 1686 if (num_cq_events) { 1687 ibv_ack_cq_events(cq, num_cq_events); 1688 } 1689 1690 rdma->errored = true; 1691 return -1; 1692 } 1693 1694 /* 1695 * Post a SEND message work request for the control channel 1696 * containing some data and block until the post completes. 1697 */ 1698 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1699 RDMAControlHeader *head, 1700 Error **errp) 1701 { 1702 int ret; 1703 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1704 struct ibv_send_wr *bad_wr; 1705 struct ibv_sge sge = { 1706 .addr = (uintptr_t)(wr->control), 1707 .length = head->len + sizeof(RDMAControlHeader), 1708 .lkey = wr->control_mr->lkey, 1709 }; 1710 struct ibv_send_wr send_wr = { 1711 .wr_id = RDMA_WRID_SEND_CONTROL, 1712 .opcode = IBV_WR_SEND, 1713 .send_flags = IBV_SEND_SIGNALED, 1714 .sg_list = &sge, 1715 .num_sge = 1, 1716 }; 1717 1718 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1719 1720 /* 1721 * We don't actually need to do a memcpy() in here if we used 1722 * the "sge" properly, but since we're only sending control messages 1723 * (not RAM in a performance-critical path), then its OK for now. 1724 * 1725 * The copy makes the RDMAControlHeader simpler to manipulate 1726 * for the time being. 1727 */ 1728 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1729 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1730 control_to_network((void *) wr->control); 1731 1732 if (buf) { 1733 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1734 } 1735 1736 1737 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1738 1739 if (ret > 0) { 1740 error_setg(errp, "Failed to use post IB SEND for control"); 1741 return -1; 1742 } 1743 1744 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1745 if (ret < 0) { 1746 error_setg(errp, "rdma migration: send polling control error"); 1747 return -1; 1748 } 1749 1750 return 0; 1751 } 1752 1753 /* 1754 * Post a RECV work request in anticipation of some future receipt 1755 * of data on the control channel. 1756 */ 1757 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx, 1758 Error **errp) 1759 { 1760 struct ibv_recv_wr *bad_wr; 1761 struct ibv_sge sge = { 1762 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1763 .length = RDMA_CONTROL_MAX_BUFFER, 1764 .lkey = rdma->wr_data[idx].control_mr->lkey, 1765 }; 1766 1767 struct ibv_recv_wr recv_wr = { 1768 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1769 .sg_list = &sge, 1770 .num_sge = 1, 1771 }; 1772 1773 1774 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1775 error_setg(errp, "error posting control recv"); 1776 return -1; 1777 } 1778 1779 return 0; 1780 } 1781 1782 /* 1783 * Block and wait for a RECV control channel message to arrive. 1784 */ 1785 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1786 RDMAControlHeader *head, uint32_t expecting, int idx, 1787 Error **errp) 1788 { 1789 uint32_t byte_len; 1790 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1791 &byte_len); 1792 1793 if (ret < 0) { 1794 error_setg(errp, "rdma migration: recv polling control error!"); 1795 return -1; 1796 } 1797 1798 network_to_control((void *) rdma->wr_data[idx].control); 1799 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1800 1801 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1802 1803 if (expecting == RDMA_CONTROL_NONE) { 1804 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1805 head->type); 1806 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1807 error_setg(errp, "Was expecting a %s (%d) control message" 1808 ", but got: %s (%d), length: %d", 1809 control_desc(expecting), expecting, 1810 control_desc(head->type), head->type, head->len); 1811 if (head->type == RDMA_CONTROL_ERROR) { 1812 rdma->received_error = true; 1813 } 1814 return -1; 1815 } 1816 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1817 error_setg(errp, "too long length: %d", head->len); 1818 return -1; 1819 } 1820 if (sizeof(*head) + head->len != byte_len) { 1821 error_setg(errp, "Malformed length: %d byte_len %d", 1822 head->len, byte_len); 1823 return -1; 1824 } 1825 1826 return 0; 1827 } 1828 1829 /* 1830 * When a RECV work request has completed, the work request's 1831 * buffer is pointed at the header. 1832 * 1833 * This will advance the pointer to the data portion 1834 * of the control message of the work request's buffer that 1835 * was populated after the work request finished. 1836 */ 1837 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1838 RDMAControlHeader *head) 1839 { 1840 rdma->wr_data[idx].control_len = head->len; 1841 rdma->wr_data[idx].control_curr = 1842 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1843 } 1844 1845 /* 1846 * This is an 'atomic' high-level operation to deliver a single, unified 1847 * control-channel message. 1848 * 1849 * Additionally, if the user is expecting some kind of reply to this message, 1850 * they can request a 'resp' response message be filled in by posting an 1851 * additional work request on behalf of the user and waiting for an additional 1852 * completion. 1853 * 1854 * The extra (optional) response is used during registration to us from having 1855 * to perform an *additional* exchange of message just to provide a response by 1856 * instead piggy-backing on the acknowledgement. 1857 */ 1858 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1859 uint8_t *data, RDMAControlHeader *resp, 1860 int *resp_idx, 1861 int (*callback)(RDMAContext *rdma, 1862 Error **errp), 1863 Error **errp) 1864 { 1865 int ret; 1866 1867 /* 1868 * Wait until the dest is ready before attempting to deliver the message 1869 * by waiting for a READY message. 1870 */ 1871 if (rdma->control_ready_expected) { 1872 RDMAControlHeader resp_ignored; 1873 1874 ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored, 1875 RDMA_CONTROL_READY, 1876 RDMA_WRID_READY, errp); 1877 if (ret < 0) { 1878 return -1; 1879 } 1880 } 1881 1882 /* 1883 * If the user is expecting a response, post a WR in anticipation of it. 1884 */ 1885 if (resp) { 1886 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp); 1887 if (ret < 0) { 1888 return -1; 1889 } 1890 } 1891 1892 /* 1893 * Post a WR to replace the one we just consumed for the READY message. 1894 */ 1895 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 1896 if (ret < 0) { 1897 return -1; 1898 } 1899 1900 /* 1901 * Deliver the control message that was requested. 1902 */ 1903 ret = qemu_rdma_post_send_control(rdma, data, head, errp); 1904 1905 if (ret < 0) { 1906 return -1; 1907 } 1908 1909 /* 1910 * If we're expecting a response, block and wait for it. 1911 */ 1912 if (resp) { 1913 if (callback) { 1914 trace_qemu_rdma_exchange_send_issue_callback(); 1915 ret = callback(rdma, errp); 1916 if (ret < 0) { 1917 return -1; 1918 } 1919 } 1920 1921 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1922 ret = qemu_rdma_exchange_get_response(rdma, resp, 1923 resp->type, RDMA_WRID_DATA, 1924 errp); 1925 1926 if (ret < 0) { 1927 return -1; 1928 } 1929 1930 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1931 if (resp_idx) { 1932 *resp_idx = RDMA_WRID_DATA; 1933 } 1934 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1935 } 1936 1937 rdma->control_ready_expected = 1; 1938 1939 return 0; 1940 } 1941 1942 /* 1943 * This is an 'atomic' high-level operation to receive a single, unified 1944 * control-channel message. 1945 */ 1946 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1947 uint32_t expecting, Error **errp) 1948 { 1949 RDMAControlHeader ready = { 1950 .len = 0, 1951 .type = RDMA_CONTROL_READY, 1952 .repeat = 1, 1953 }; 1954 int ret; 1955 1956 /* 1957 * Inform the source that we're ready to receive a message. 1958 */ 1959 ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp); 1960 1961 if (ret < 0) { 1962 return -1; 1963 } 1964 1965 /* 1966 * Block and wait for the message. 1967 */ 1968 ret = qemu_rdma_exchange_get_response(rdma, head, 1969 expecting, RDMA_WRID_READY, errp); 1970 1971 if (ret < 0) { 1972 return -1; 1973 } 1974 1975 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1976 1977 /* 1978 * Post a new RECV work request to replace the one we just consumed. 1979 */ 1980 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 1981 if (ret < 0) { 1982 return -1; 1983 } 1984 1985 return 0; 1986 } 1987 1988 /* 1989 * Write an actual chunk of memory using RDMA. 1990 * 1991 * If we're using dynamic registration on the dest-side, we have to 1992 * send a registration command first. 1993 */ 1994 static int qemu_rdma_write_one(RDMAContext *rdma, 1995 int current_index, uint64_t current_addr, 1996 uint64_t length, Error **errp) 1997 { 1998 struct ibv_sge sge; 1999 struct ibv_send_wr send_wr = { 0 }; 2000 struct ibv_send_wr *bad_wr; 2001 int reg_result_idx, ret, count = 0; 2002 uint64_t chunk, chunks; 2003 uint8_t *chunk_start, *chunk_end; 2004 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 2005 RDMARegister reg; 2006 RDMARegisterResult *reg_result; 2007 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 2008 RDMAControlHeader head = { .len = sizeof(RDMARegister), 2009 .type = RDMA_CONTROL_REGISTER_REQUEST, 2010 .repeat = 1, 2011 }; 2012 2013 retry: 2014 sge.addr = (uintptr_t)(block->local_host_addr + 2015 (current_addr - block->offset)); 2016 sge.length = length; 2017 2018 chunk = ram_chunk_index(block->local_host_addr, 2019 (uint8_t *)(uintptr_t)sge.addr); 2020 chunk_start = ram_chunk_start(block, chunk); 2021 2022 if (block->is_ram_block) { 2023 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2024 2025 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2026 chunks--; 2027 } 2028 } else { 2029 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2030 2031 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2032 chunks--; 2033 } 2034 } 2035 2036 trace_qemu_rdma_write_one_top(chunks + 1, 2037 (chunks + 1) * 2038 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2039 2040 chunk_end = ram_chunk_end(block, chunk + chunks); 2041 2042 2043 while (test_bit(chunk, block->transit_bitmap)) { 2044 (void)count; 2045 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2046 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2047 2048 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2049 2050 if (ret < 0) { 2051 error_setg(errp, "Failed to Wait for previous write to complete " 2052 "block %d chunk %" PRIu64 2053 " current %" PRIu64 " len %" PRIu64 " %d", 2054 current_index, chunk, sge.addr, length, rdma->nb_sent); 2055 return -1; 2056 } 2057 } 2058 2059 if (!rdma->pin_all || !block->is_ram_block) { 2060 if (!block->remote_keys[chunk]) { 2061 /* 2062 * This chunk has not yet been registered, so first check to see 2063 * if the entire chunk is zero. If so, tell the other size to 2064 * memset() + madvise() the entire chunk without RDMA. 2065 */ 2066 2067 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 2068 RDMACompress comp = { 2069 .offset = current_addr, 2070 .value = 0, 2071 .block_idx = current_index, 2072 .length = length, 2073 }; 2074 2075 head.len = sizeof(comp); 2076 head.type = RDMA_CONTROL_COMPRESS; 2077 2078 trace_qemu_rdma_write_one_zero(chunk, sge.length, 2079 current_index, current_addr); 2080 2081 compress_to_network(rdma, &comp); 2082 ret = qemu_rdma_exchange_send(rdma, &head, 2083 (uint8_t *) &comp, NULL, NULL, NULL, errp); 2084 2085 if (ret < 0) { 2086 return -1; 2087 } 2088 2089 /* 2090 * TODO: Here we are sending something, but we are not 2091 * accounting for anything transferred. The following is wrong: 2092 * 2093 * stat64_add(&mig_stats.rdma_bytes, sge.length); 2094 * 2095 * because we are using some kind of compression. I 2096 * would think that head.len would be the more similar 2097 * thing to a correct value. 2098 */ 2099 stat64_add(&mig_stats.zero_pages, 2100 sge.length / qemu_target_page_size()); 2101 return 1; 2102 } 2103 2104 /* 2105 * Otherwise, tell other side to register. 2106 */ 2107 reg.current_index = current_index; 2108 if (block->is_ram_block) { 2109 reg.key.current_addr = current_addr; 2110 } else { 2111 reg.key.chunk = chunk; 2112 } 2113 reg.chunks = chunks; 2114 2115 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 2116 current_addr); 2117 2118 register_to_network(rdma, ®); 2119 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 2120 &resp, ®_result_idx, NULL, errp); 2121 if (ret < 0) { 2122 return -1; 2123 } 2124 2125 /* try to overlap this single registration with the one we sent. */ 2126 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2127 &sge.lkey, NULL, chunk, 2128 chunk_start, chunk_end)) { 2129 error_setg(errp, "cannot get lkey"); 2130 return -1; 2131 } 2132 2133 reg_result = (RDMARegisterResult *) 2134 rdma->wr_data[reg_result_idx].control_curr; 2135 2136 network_to_result(reg_result); 2137 2138 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2139 reg_result->rkey, chunk); 2140 2141 block->remote_keys[chunk] = reg_result->rkey; 2142 block->remote_host_addr = reg_result->host_addr; 2143 } else { 2144 /* already registered before */ 2145 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2146 &sge.lkey, NULL, chunk, 2147 chunk_start, chunk_end)) { 2148 error_setg(errp, "cannot get lkey!"); 2149 return -1; 2150 } 2151 } 2152 2153 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2154 } else { 2155 send_wr.wr.rdma.rkey = block->remote_rkey; 2156 2157 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2158 &sge.lkey, NULL, chunk, 2159 chunk_start, chunk_end)) { 2160 error_setg(errp, "cannot get lkey!"); 2161 return -1; 2162 } 2163 } 2164 2165 /* 2166 * Encode the ram block index and chunk within this wrid. 2167 * We will use this information at the time of completion 2168 * to figure out which bitmap to check against and then which 2169 * chunk in the bitmap to look for. 2170 */ 2171 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2172 current_index, chunk); 2173 2174 send_wr.opcode = IBV_WR_RDMA_WRITE; 2175 send_wr.send_flags = IBV_SEND_SIGNALED; 2176 send_wr.sg_list = &sge; 2177 send_wr.num_sge = 1; 2178 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2179 (current_addr - block->offset); 2180 2181 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2182 sge.length); 2183 2184 /* 2185 * ibv_post_send() does not return negative error numbers, 2186 * per the specification they are positive - no idea why. 2187 */ 2188 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2189 2190 if (ret == ENOMEM) { 2191 trace_qemu_rdma_write_one_queue_full(); 2192 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2193 if (ret < 0) { 2194 error_setg(errp, "rdma migration: failed to make " 2195 "room in full send queue!"); 2196 return -1; 2197 } 2198 2199 goto retry; 2200 2201 } else if (ret > 0) { 2202 error_setg_errno(errp, ret, 2203 "rdma migration: post rdma write failed"); 2204 return -1; 2205 } 2206 2207 set_bit(chunk, block->transit_bitmap); 2208 stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size()); 2209 /* 2210 * We are adding to transferred the amount of data written, but no 2211 * overhead at all. I will assume that RDMA is magicaly and don't 2212 * need to transfer (at least) the addresses where it wants to 2213 * write the pages. Here it looks like it should be something 2214 * like: 2215 * sizeof(send_wr) + sge.length 2216 * but this being RDMA, who knows. 2217 */ 2218 stat64_add(&mig_stats.rdma_bytes, sge.length); 2219 ram_transferred_add(sge.length); 2220 rdma->total_writes++; 2221 2222 return 0; 2223 } 2224 2225 /* 2226 * Push out any unwritten RDMA operations. 2227 * 2228 * We support sending out multiple chunks at the same time. 2229 * Not all of them need to get signaled in the completion queue. 2230 */ 2231 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp) 2232 { 2233 int ret; 2234 2235 if (!rdma->current_length) { 2236 return 0; 2237 } 2238 2239 ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr, 2240 rdma->current_length, errp); 2241 2242 if (ret < 0) { 2243 return -1; 2244 } 2245 2246 if (ret == 0) { 2247 rdma->nb_sent++; 2248 trace_qemu_rdma_write_flush(rdma->nb_sent); 2249 } 2250 2251 rdma->current_length = 0; 2252 rdma->current_addr = 0; 2253 2254 return 0; 2255 } 2256 2257 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma, 2258 uint64_t offset, uint64_t len) 2259 { 2260 RDMALocalBlock *block; 2261 uint8_t *host_addr; 2262 uint8_t *chunk_end; 2263 2264 if (rdma->current_index < 0) { 2265 return false; 2266 } 2267 2268 if (rdma->current_chunk < 0) { 2269 return false; 2270 } 2271 2272 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2273 host_addr = block->local_host_addr + (offset - block->offset); 2274 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2275 2276 if (rdma->current_length == 0) { 2277 return false; 2278 } 2279 2280 /* 2281 * Only merge into chunk sequentially. 2282 */ 2283 if (offset != (rdma->current_addr + rdma->current_length)) { 2284 return false; 2285 } 2286 2287 if (offset < block->offset) { 2288 return false; 2289 } 2290 2291 if ((offset + len) > (block->offset + block->length)) { 2292 return false; 2293 } 2294 2295 if ((host_addr + len) > chunk_end) { 2296 return false; 2297 } 2298 2299 return true; 2300 } 2301 2302 /* 2303 * We're not actually writing here, but doing three things: 2304 * 2305 * 1. Identify the chunk the buffer belongs to. 2306 * 2. If the chunk is full or the buffer doesn't belong to the current 2307 * chunk, then start a new chunk and flush() the old chunk. 2308 * 3. To keep the hardware busy, we also group chunks into batches 2309 * and only require that a batch gets acknowledged in the completion 2310 * queue instead of each individual chunk. 2311 */ 2312 static int qemu_rdma_write(RDMAContext *rdma, 2313 uint64_t block_offset, uint64_t offset, 2314 uint64_t len, Error **errp) 2315 { 2316 uint64_t current_addr = block_offset + offset; 2317 uint64_t index = rdma->current_index; 2318 uint64_t chunk = rdma->current_chunk; 2319 2320 /* If we cannot merge it, we flush the current buffer first. */ 2321 if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) { 2322 if (qemu_rdma_write_flush(rdma, errp) < 0) { 2323 return -1; 2324 } 2325 rdma->current_length = 0; 2326 rdma->current_addr = current_addr; 2327 2328 qemu_rdma_search_ram_block(rdma, block_offset, 2329 offset, len, &index, &chunk); 2330 rdma->current_index = index; 2331 rdma->current_chunk = chunk; 2332 } 2333 2334 /* merge it */ 2335 rdma->current_length += len; 2336 2337 /* flush it if buffer is too large */ 2338 if (rdma->current_length >= RDMA_MERGE_MAX) { 2339 return qemu_rdma_write_flush(rdma, errp); 2340 } 2341 2342 return 0; 2343 } 2344 2345 static void qemu_rdma_cleanup(RDMAContext *rdma) 2346 { 2347 Error *err = NULL; 2348 2349 if (rdma->cm_id && rdma->connected) { 2350 if ((rdma->errored || 2351 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2352 !rdma->received_error) { 2353 RDMAControlHeader head = { .len = 0, 2354 .type = RDMA_CONTROL_ERROR, 2355 .repeat = 1, 2356 }; 2357 warn_report("Early error. Sending error."); 2358 if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) { 2359 warn_report_err(err); 2360 } 2361 } 2362 2363 rdma_disconnect(rdma->cm_id); 2364 trace_qemu_rdma_cleanup_disconnect(); 2365 rdma->connected = false; 2366 } 2367 2368 if (rdma->channel) { 2369 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2370 } 2371 g_free(rdma->dest_blocks); 2372 rdma->dest_blocks = NULL; 2373 2374 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2375 if (rdma->wr_data[i].control_mr) { 2376 rdma->total_registrations--; 2377 ibv_dereg_mr(rdma->wr_data[i].control_mr); 2378 } 2379 rdma->wr_data[i].control_mr = NULL; 2380 } 2381 2382 if (rdma->local_ram_blocks.block) { 2383 while (rdma->local_ram_blocks.nb_blocks) { 2384 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2385 } 2386 } 2387 2388 if (rdma->qp) { 2389 rdma_destroy_qp(rdma->cm_id); 2390 rdma->qp = NULL; 2391 } 2392 if (rdma->recv_cq) { 2393 ibv_destroy_cq(rdma->recv_cq); 2394 rdma->recv_cq = NULL; 2395 } 2396 if (rdma->send_cq) { 2397 ibv_destroy_cq(rdma->send_cq); 2398 rdma->send_cq = NULL; 2399 } 2400 if (rdma->recv_comp_channel) { 2401 ibv_destroy_comp_channel(rdma->recv_comp_channel); 2402 rdma->recv_comp_channel = NULL; 2403 } 2404 if (rdma->send_comp_channel) { 2405 ibv_destroy_comp_channel(rdma->send_comp_channel); 2406 rdma->send_comp_channel = NULL; 2407 } 2408 if (rdma->pd) { 2409 ibv_dealloc_pd(rdma->pd); 2410 rdma->pd = NULL; 2411 } 2412 if (rdma->cm_id) { 2413 rdma_destroy_id(rdma->cm_id); 2414 rdma->cm_id = NULL; 2415 } 2416 2417 /* the destination side, listen_id and channel is shared */ 2418 if (rdma->listen_id) { 2419 if (!rdma->is_return_path) { 2420 rdma_destroy_id(rdma->listen_id); 2421 } 2422 rdma->listen_id = NULL; 2423 2424 if (rdma->channel) { 2425 if (!rdma->is_return_path) { 2426 rdma_destroy_event_channel(rdma->channel); 2427 } 2428 rdma->channel = NULL; 2429 } 2430 } 2431 2432 if (rdma->channel) { 2433 rdma_destroy_event_channel(rdma->channel); 2434 rdma->channel = NULL; 2435 } 2436 g_free(rdma->host); 2437 rdma->host = NULL; 2438 } 2439 2440 2441 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2442 { 2443 int ret; 2444 2445 /* 2446 * Will be validated against destination's actual capabilities 2447 * after the connect() completes. 2448 */ 2449 rdma->pin_all = pin_all; 2450 2451 ret = qemu_rdma_resolve_host(rdma, errp); 2452 if (ret < 0) { 2453 goto err_rdma_source_init; 2454 } 2455 2456 ret = qemu_rdma_alloc_pd_cq(rdma, errp); 2457 if (ret < 0) { 2458 goto err_rdma_source_init; 2459 } 2460 2461 ret = qemu_rdma_alloc_qp(rdma); 2462 if (ret < 0) { 2463 error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!"); 2464 goto err_rdma_source_init; 2465 } 2466 2467 qemu_rdma_init_ram_blocks(rdma); 2468 2469 /* Build the hash that maps from offset to RAMBlock */ 2470 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2471 for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) { 2472 g_hash_table_insert(rdma->blockmap, 2473 (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset, 2474 &rdma->local_ram_blocks.block[i]); 2475 } 2476 2477 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2478 ret = qemu_rdma_reg_control(rdma, i); 2479 if (ret < 0) { 2480 error_setg(errp, "RDMA ERROR: rdma migration: error " 2481 "registering %d control!", i); 2482 goto err_rdma_source_init; 2483 } 2484 } 2485 2486 return 0; 2487 2488 err_rdma_source_init: 2489 qemu_rdma_cleanup(rdma); 2490 return -1; 2491 } 2492 2493 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2494 struct rdma_cm_event **cm_event, 2495 long msec, Error **errp) 2496 { 2497 int ret; 2498 struct pollfd poll_fd = { 2499 .fd = rdma->channel->fd, 2500 .events = POLLIN, 2501 .revents = 0 2502 }; 2503 2504 do { 2505 ret = poll(&poll_fd, 1, msec); 2506 } while (ret < 0 && errno == EINTR); 2507 2508 if (ret == 0) { 2509 error_setg(errp, "RDMA ERROR: poll cm event timeout"); 2510 return -1; 2511 } else if (ret < 0) { 2512 error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i", 2513 errno); 2514 return -1; 2515 } else if (poll_fd.revents & POLLIN) { 2516 if (rdma_get_cm_event(rdma->channel, cm_event) < 0) { 2517 error_setg(errp, "RDMA ERROR: failed to get cm event"); 2518 return -1; 2519 } 2520 return 0; 2521 } else { 2522 error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x", 2523 poll_fd.revents); 2524 return -1; 2525 } 2526 } 2527 2528 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path, 2529 Error **errp) 2530 { 2531 RDMACapabilities cap = { 2532 .version = RDMA_CONTROL_VERSION_CURRENT, 2533 .flags = 0, 2534 }; 2535 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2536 .retry_count = 5, 2537 .private_data = &cap, 2538 .private_data_len = sizeof(cap), 2539 }; 2540 struct rdma_cm_event *cm_event; 2541 int ret; 2542 2543 /* 2544 * Only negotiate the capability with destination if the user 2545 * on the source first requested the capability. 2546 */ 2547 if (rdma->pin_all) { 2548 trace_qemu_rdma_connect_pin_all_requested(); 2549 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2550 } 2551 2552 caps_to_network(&cap); 2553 2554 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 2555 if (ret < 0) { 2556 goto err_rdma_source_connect; 2557 } 2558 2559 ret = rdma_connect(rdma->cm_id, &conn_param); 2560 if (ret < 0) { 2561 error_setg_errno(errp, errno, 2562 "RDMA ERROR: connecting to destination!"); 2563 goto err_rdma_source_connect; 2564 } 2565 2566 if (return_path) { 2567 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2568 } else { 2569 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2570 if (ret < 0) { 2571 error_setg_errno(errp, errno, 2572 "RDMA ERROR: failed to get cm event"); 2573 } 2574 } 2575 if (ret < 0) { 2576 goto err_rdma_source_connect; 2577 } 2578 2579 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2580 error_setg(errp, "RDMA ERROR: connecting to destination!"); 2581 rdma_ack_cm_event(cm_event); 2582 goto err_rdma_source_connect; 2583 } 2584 rdma->connected = true; 2585 2586 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2587 network_to_caps(&cap); 2588 2589 /* 2590 * Verify that the *requested* capabilities are supported by the destination 2591 * and disable them otherwise. 2592 */ 2593 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2594 warn_report("RDMA: Server cannot support pinning all memory. " 2595 "Will register memory dynamically."); 2596 rdma->pin_all = false; 2597 } 2598 2599 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2600 2601 rdma_ack_cm_event(cm_event); 2602 2603 rdma->control_ready_expected = 1; 2604 rdma->nb_sent = 0; 2605 return 0; 2606 2607 err_rdma_source_connect: 2608 qemu_rdma_cleanup(rdma); 2609 return -1; 2610 } 2611 2612 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2613 { 2614 Error *err = NULL; 2615 int ret; 2616 struct rdma_cm_id *listen_id; 2617 char ip[40] = "unknown"; 2618 struct rdma_addrinfo *res, *e; 2619 char port_str[16]; 2620 int reuse = 1; 2621 2622 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2623 rdma->wr_data[i].control_len = 0; 2624 rdma->wr_data[i].control_curr = NULL; 2625 } 2626 2627 if (!rdma->host || !rdma->host[0]) { 2628 error_setg(errp, "RDMA ERROR: RDMA host is not set!"); 2629 rdma->errored = true; 2630 return -1; 2631 } 2632 /* create CM channel */ 2633 rdma->channel = rdma_create_event_channel(); 2634 if (!rdma->channel) { 2635 error_setg(errp, "RDMA ERROR: could not create rdma event channel"); 2636 rdma->errored = true; 2637 return -1; 2638 } 2639 2640 /* create CM id */ 2641 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2642 if (ret < 0) { 2643 error_setg(errp, "RDMA ERROR: could not create cm_id!"); 2644 goto err_dest_init_create_listen_id; 2645 } 2646 2647 snprintf(port_str, 16, "%d", rdma->port); 2648 port_str[15] = '\0'; 2649 2650 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2651 if (ret) { 2652 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s", 2653 rdma->host); 2654 goto err_dest_init_bind_addr; 2655 } 2656 2657 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, 2658 &reuse, sizeof reuse); 2659 if (ret < 0) { 2660 error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option"); 2661 goto err_dest_init_bind_addr; 2662 } 2663 2664 /* Try all addresses, saving the first error in @err */ 2665 for (e = res; e != NULL; e = e->ai_next) { 2666 Error **local_errp = err ? NULL : &err; 2667 2668 inet_ntop(e->ai_family, 2669 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2670 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2671 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2672 if (ret < 0) { 2673 continue; 2674 } 2675 if (e->ai_family == AF_INET6) { 2676 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, 2677 local_errp); 2678 if (ret < 0) { 2679 continue; 2680 } 2681 } 2682 error_free(err); 2683 break; 2684 } 2685 2686 rdma_freeaddrinfo(res); 2687 if (!e) { 2688 if (err) { 2689 error_propagate(errp, err); 2690 } else { 2691 error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!"); 2692 } 2693 goto err_dest_init_bind_addr; 2694 } 2695 2696 rdma->listen_id = listen_id; 2697 qemu_rdma_dump_gid("dest_init", listen_id); 2698 return 0; 2699 2700 err_dest_init_bind_addr: 2701 rdma_destroy_id(listen_id); 2702 err_dest_init_create_listen_id: 2703 rdma_destroy_event_channel(rdma->channel); 2704 rdma->channel = NULL; 2705 rdma->errored = true; 2706 return -1; 2707 2708 } 2709 2710 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2711 RDMAContext *rdma) 2712 { 2713 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2714 rdma_return_path->wr_data[i].control_len = 0; 2715 rdma_return_path->wr_data[i].control_curr = NULL; 2716 } 2717 2718 /*the CM channel and CM id is shared*/ 2719 rdma_return_path->channel = rdma->channel; 2720 rdma_return_path->listen_id = rdma->listen_id; 2721 2722 rdma->return_path = rdma_return_path; 2723 rdma_return_path->return_path = rdma; 2724 rdma_return_path->is_return_path = true; 2725 } 2726 2727 static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp) 2728 { 2729 RDMAContext *rdma = NULL; 2730 2731 rdma = g_new0(RDMAContext, 1); 2732 rdma->current_index = -1; 2733 rdma->current_chunk = -1; 2734 2735 rdma->host = g_strdup(saddr->host); 2736 rdma->port = atoi(saddr->port); 2737 return rdma; 2738 } 2739 2740 /* 2741 * QEMUFile interface to the control channel. 2742 * SEND messages for control only. 2743 * VM's ram is handled with regular RDMA messages. 2744 */ 2745 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2746 const struct iovec *iov, 2747 size_t niov, 2748 int *fds, 2749 size_t nfds, 2750 int flags, 2751 Error **errp) 2752 { 2753 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2754 RDMAContext *rdma; 2755 int ret; 2756 ssize_t done = 0; 2757 size_t len; 2758 2759 RCU_READ_LOCK_GUARD(); 2760 rdma = qatomic_rcu_read(&rioc->rdmaout); 2761 2762 if (!rdma) { 2763 error_setg(errp, "RDMA control channel output is not set"); 2764 return -1; 2765 } 2766 2767 if (rdma->errored) { 2768 error_setg(errp, 2769 "RDMA is in an error state waiting migration to abort!"); 2770 return -1; 2771 } 2772 2773 /* 2774 * Push out any writes that 2775 * we're queued up for VM's ram. 2776 */ 2777 ret = qemu_rdma_write_flush(rdma, errp); 2778 if (ret < 0) { 2779 rdma->errored = true; 2780 return -1; 2781 } 2782 2783 for (int i = 0; i < niov; i++) { 2784 size_t remaining = iov[i].iov_len; 2785 uint8_t * data = (void *)iov[i].iov_base; 2786 while (remaining) { 2787 RDMAControlHeader head = {}; 2788 2789 len = MIN(remaining, RDMA_SEND_INCREMENT); 2790 remaining -= len; 2791 2792 head.len = len; 2793 head.type = RDMA_CONTROL_QEMU_FILE; 2794 2795 ret = qemu_rdma_exchange_send(rdma, &head, 2796 data, NULL, NULL, NULL, errp); 2797 2798 if (ret < 0) { 2799 rdma->errored = true; 2800 return -1; 2801 } 2802 2803 data += len; 2804 done += len; 2805 } 2806 } 2807 2808 return done; 2809 } 2810 2811 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2812 size_t size, int idx) 2813 { 2814 size_t len = 0; 2815 2816 if (rdma->wr_data[idx].control_len) { 2817 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2818 2819 len = MIN(size, rdma->wr_data[idx].control_len); 2820 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2821 rdma->wr_data[idx].control_curr += len; 2822 rdma->wr_data[idx].control_len -= len; 2823 } 2824 2825 return len; 2826 } 2827 2828 /* 2829 * QEMUFile interface to the control channel. 2830 * RDMA links don't use bytestreams, so we have to 2831 * return bytes to QEMUFile opportunistically. 2832 */ 2833 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2834 const struct iovec *iov, 2835 size_t niov, 2836 int **fds, 2837 size_t *nfds, 2838 int flags, 2839 Error **errp) 2840 { 2841 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2842 RDMAContext *rdma; 2843 RDMAControlHeader head; 2844 int ret; 2845 ssize_t done = 0; 2846 size_t len; 2847 2848 RCU_READ_LOCK_GUARD(); 2849 rdma = qatomic_rcu_read(&rioc->rdmain); 2850 2851 if (!rdma) { 2852 error_setg(errp, "RDMA control channel input is not set"); 2853 return -1; 2854 } 2855 2856 if (rdma->errored) { 2857 error_setg(errp, 2858 "RDMA is in an error state waiting migration to abort!"); 2859 return -1; 2860 } 2861 2862 for (int i = 0; i < niov; i++) { 2863 size_t want = iov[i].iov_len; 2864 uint8_t *data = (void *)iov[i].iov_base; 2865 2866 /* 2867 * First, we hold on to the last SEND message we 2868 * were given and dish out the bytes until we run 2869 * out of bytes. 2870 */ 2871 len = qemu_rdma_fill(rdma, data, want, 0); 2872 done += len; 2873 want -= len; 2874 /* Got what we needed, so go to next iovec */ 2875 if (want == 0) { 2876 continue; 2877 } 2878 2879 /* If we got any data so far, then don't wait 2880 * for more, just return what we have */ 2881 if (done > 0) { 2882 break; 2883 } 2884 2885 2886 /* We've got nothing at all, so lets wait for 2887 * more to arrive 2888 */ 2889 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE, 2890 errp); 2891 2892 if (ret < 0) { 2893 rdma->errored = true; 2894 return -1; 2895 } 2896 2897 /* 2898 * SEND was received with new bytes, now try again. 2899 */ 2900 len = qemu_rdma_fill(rdma, data, want, 0); 2901 done += len; 2902 want -= len; 2903 2904 /* Still didn't get enough, so lets just return */ 2905 if (want) { 2906 if (done == 0) { 2907 return QIO_CHANNEL_ERR_BLOCK; 2908 } else { 2909 break; 2910 } 2911 } 2912 } 2913 return done; 2914 } 2915 2916 /* 2917 * Block until all the outstanding chunks have been delivered by the hardware. 2918 */ 2919 static int qemu_rdma_drain_cq(RDMAContext *rdma) 2920 { 2921 Error *err = NULL; 2922 2923 if (qemu_rdma_write_flush(rdma, &err) < 0) { 2924 error_report_err(err); 2925 return -1; 2926 } 2927 2928 while (rdma->nb_sent) { 2929 if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) { 2930 error_report("rdma migration: complete polling error!"); 2931 return -1; 2932 } 2933 } 2934 2935 qemu_rdma_unregister_waiting(rdma); 2936 2937 return 0; 2938 } 2939 2940 2941 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2942 bool blocking, 2943 Error **errp) 2944 { 2945 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2946 /* XXX we should make readv/writev actually honour this :-) */ 2947 rioc->blocking = blocking; 2948 return 0; 2949 } 2950 2951 2952 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2953 struct QIOChannelRDMASource { 2954 GSource parent; 2955 QIOChannelRDMA *rioc; 2956 GIOCondition condition; 2957 }; 2958 2959 static gboolean 2960 qio_channel_rdma_source_prepare(GSource *source, 2961 gint *timeout) 2962 { 2963 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2964 RDMAContext *rdma; 2965 GIOCondition cond = 0; 2966 *timeout = -1; 2967 2968 RCU_READ_LOCK_GUARD(); 2969 if (rsource->condition == G_IO_IN) { 2970 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2971 } else { 2972 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2973 } 2974 2975 if (!rdma) { 2976 error_report("RDMAContext is NULL when prepare Gsource"); 2977 return FALSE; 2978 } 2979 2980 if (rdma->wr_data[0].control_len) { 2981 cond |= G_IO_IN; 2982 } 2983 cond |= G_IO_OUT; 2984 2985 return cond & rsource->condition; 2986 } 2987 2988 static gboolean 2989 qio_channel_rdma_source_check(GSource *source) 2990 { 2991 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2992 RDMAContext *rdma; 2993 GIOCondition cond = 0; 2994 2995 RCU_READ_LOCK_GUARD(); 2996 if (rsource->condition == G_IO_IN) { 2997 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2998 } else { 2999 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3000 } 3001 3002 if (!rdma) { 3003 error_report("RDMAContext is NULL when check Gsource"); 3004 return FALSE; 3005 } 3006 3007 if (rdma->wr_data[0].control_len) { 3008 cond |= G_IO_IN; 3009 } 3010 cond |= G_IO_OUT; 3011 3012 return cond & rsource->condition; 3013 } 3014 3015 static gboolean 3016 qio_channel_rdma_source_dispatch(GSource *source, 3017 GSourceFunc callback, 3018 gpointer user_data) 3019 { 3020 QIOChannelFunc func = (QIOChannelFunc)callback; 3021 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3022 RDMAContext *rdma; 3023 GIOCondition cond = 0; 3024 3025 RCU_READ_LOCK_GUARD(); 3026 if (rsource->condition == G_IO_IN) { 3027 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3028 } else { 3029 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3030 } 3031 3032 if (!rdma) { 3033 error_report("RDMAContext is NULL when dispatch Gsource"); 3034 return FALSE; 3035 } 3036 3037 if (rdma->wr_data[0].control_len) { 3038 cond |= G_IO_IN; 3039 } 3040 cond |= G_IO_OUT; 3041 3042 return (*func)(QIO_CHANNEL(rsource->rioc), 3043 (cond & rsource->condition), 3044 user_data); 3045 } 3046 3047 static void 3048 qio_channel_rdma_source_finalize(GSource *source) 3049 { 3050 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3051 3052 object_unref(OBJECT(ssource->rioc)); 3053 } 3054 3055 static GSourceFuncs qio_channel_rdma_source_funcs = { 3056 qio_channel_rdma_source_prepare, 3057 qio_channel_rdma_source_check, 3058 qio_channel_rdma_source_dispatch, 3059 qio_channel_rdma_source_finalize 3060 }; 3061 3062 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 3063 GIOCondition condition) 3064 { 3065 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3066 QIOChannelRDMASource *ssource; 3067 GSource *source; 3068 3069 source = g_source_new(&qio_channel_rdma_source_funcs, 3070 sizeof(QIOChannelRDMASource)); 3071 ssource = (QIOChannelRDMASource *)source; 3072 3073 ssource->rioc = rioc; 3074 object_ref(OBJECT(rioc)); 3075 3076 ssource->condition = condition; 3077 3078 return source; 3079 } 3080 3081 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 3082 AioContext *read_ctx, 3083 IOHandler *io_read, 3084 AioContext *write_ctx, 3085 IOHandler *io_write, 3086 void *opaque) 3087 { 3088 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3089 if (io_read) { 3090 aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd, 3091 io_read, io_write, NULL, NULL, opaque); 3092 aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd, 3093 io_read, io_write, NULL, NULL, opaque); 3094 } else { 3095 aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd, 3096 io_read, io_write, NULL, NULL, opaque); 3097 aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd, 3098 io_read, io_write, NULL, NULL, opaque); 3099 } 3100 } 3101 3102 struct rdma_close_rcu { 3103 struct rcu_head rcu; 3104 RDMAContext *rdmain; 3105 RDMAContext *rdmaout; 3106 }; 3107 3108 /* callback from qio_channel_rdma_close via call_rcu */ 3109 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 3110 { 3111 if (rcu->rdmain) { 3112 qemu_rdma_cleanup(rcu->rdmain); 3113 } 3114 3115 if (rcu->rdmaout) { 3116 qemu_rdma_cleanup(rcu->rdmaout); 3117 } 3118 3119 g_free(rcu->rdmain); 3120 g_free(rcu->rdmaout); 3121 g_free(rcu); 3122 } 3123 3124 static int qio_channel_rdma_close(QIOChannel *ioc, 3125 Error **errp) 3126 { 3127 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3128 RDMAContext *rdmain, *rdmaout; 3129 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 3130 3131 trace_qemu_rdma_close(); 3132 3133 rdmain = rioc->rdmain; 3134 if (rdmain) { 3135 qatomic_rcu_set(&rioc->rdmain, NULL); 3136 } 3137 3138 rdmaout = rioc->rdmaout; 3139 if (rdmaout) { 3140 qatomic_rcu_set(&rioc->rdmaout, NULL); 3141 } 3142 3143 rcu->rdmain = rdmain; 3144 rcu->rdmaout = rdmaout; 3145 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 3146 3147 return 0; 3148 } 3149 3150 static int 3151 qio_channel_rdma_shutdown(QIOChannel *ioc, 3152 QIOChannelShutdown how, 3153 Error **errp) 3154 { 3155 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3156 RDMAContext *rdmain, *rdmaout; 3157 3158 RCU_READ_LOCK_GUARD(); 3159 3160 rdmain = qatomic_rcu_read(&rioc->rdmain); 3161 rdmaout = qatomic_rcu_read(&rioc->rdmain); 3162 3163 switch (how) { 3164 case QIO_CHANNEL_SHUTDOWN_READ: 3165 if (rdmain) { 3166 rdmain->errored = true; 3167 } 3168 break; 3169 case QIO_CHANNEL_SHUTDOWN_WRITE: 3170 if (rdmaout) { 3171 rdmaout->errored = true; 3172 } 3173 break; 3174 case QIO_CHANNEL_SHUTDOWN_BOTH: 3175 default: 3176 if (rdmain) { 3177 rdmain->errored = true; 3178 } 3179 if (rdmaout) { 3180 rdmaout->errored = true; 3181 } 3182 break; 3183 } 3184 3185 return 0; 3186 } 3187 3188 /* 3189 * Parameters: 3190 * @offset == 0 : 3191 * This means that 'block_offset' is a full virtual address that does not 3192 * belong to a RAMBlock of the virtual machine and instead 3193 * represents a private malloc'd memory area that the caller wishes to 3194 * transfer. 3195 * 3196 * @offset != 0 : 3197 * Offset is an offset to be added to block_offset and used 3198 * to also lookup the corresponding RAMBlock. 3199 * 3200 * @size : Number of bytes to transfer 3201 * 3202 * @pages_sent : User-specificed pointer to indicate how many pages were 3203 * sent. Usually, this will not be more than a few bytes of 3204 * the protocol because most transfers are sent asynchronously. 3205 */ 3206 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset, 3207 ram_addr_t offset, size_t size) 3208 { 3209 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3210 Error *err = NULL; 3211 RDMAContext *rdma; 3212 int ret; 3213 3214 RCU_READ_LOCK_GUARD(); 3215 rdma = qatomic_rcu_read(&rioc->rdmaout); 3216 3217 if (!rdma) { 3218 return -1; 3219 } 3220 3221 if (rdma_errored(rdma)) { 3222 return -1; 3223 } 3224 3225 qemu_fflush(f); 3226 3227 /* 3228 * Add this page to the current 'chunk'. If the chunk 3229 * is full, or the page doesn't belong to the current chunk, 3230 * an actual RDMA write will occur and a new chunk will be formed. 3231 */ 3232 ret = qemu_rdma_write(rdma, block_offset, offset, size, &err); 3233 if (ret < 0) { 3234 error_report_err(err); 3235 goto err; 3236 } 3237 3238 /* 3239 * Drain the Completion Queue if possible, but do not block, 3240 * just poll. 3241 * 3242 * If nothing to poll, the end of the iteration will do this 3243 * again to make sure we don't overflow the request queue. 3244 */ 3245 while (1) { 3246 uint64_t wr_id, wr_id_in; 3247 ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL); 3248 3249 if (ret < 0) { 3250 error_report("rdma migration: polling error"); 3251 goto err; 3252 } 3253 3254 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3255 3256 if (wr_id == RDMA_WRID_NONE) { 3257 break; 3258 } 3259 } 3260 3261 while (1) { 3262 uint64_t wr_id, wr_id_in; 3263 ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL); 3264 3265 if (ret < 0) { 3266 error_report("rdma migration: polling error"); 3267 goto err; 3268 } 3269 3270 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3271 3272 if (wr_id == RDMA_WRID_NONE) { 3273 break; 3274 } 3275 } 3276 3277 return RAM_SAVE_CONTROL_DELAYED; 3278 3279 err: 3280 rdma->errored = true; 3281 return -1; 3282 } 3283 3284 int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset, 3285 ram_addr_t offset, size_t size) 3286 { 3287 if (!migrate_rdma() || migration_in_postcopy()) { 3288 return RAM_SAVE_CONTROL_NOT_SUPP; 3289 } 3290 3291 int ret = qemu_rdma_save_page(f, block_offset, offset, size); 3292 3293 if (ret != RAM_SAVE_CONTROL_DELAYED && 3294 ret != RAM_SAVE_CONTROL_NOT_SUPP) { 3295 if (ret < 0) { 3296 qemu_file_set_error(f, ret); 3297 } 3298 } 3299 return ret; 3300 } 3301 3302 static void rdma_accept_incoming_migration(void *opaque); 3303 3304 static void rdma_cm_poll_handler(void *opaque) 3305 { 3306 RDMAContext *rdma = opaque; 3307 struct rdma_cm_event *cm_event; 3308 MigrationIncomingState *mis = migration_incoming_get_current(); 3309 3310 if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) { 3311 error_report("get_cm_event failed %d", errno); 3312 return; 3313 } 3314 3315 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3316 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3317 if (!rdma->errored && 3318 migration_incoming_get_current()->state != 3319 MIGRATION_STATUS_COMPLETED) { 3320 error_report("receive cm event, cm event is %d", cm_event->event); 3321 rdma->errored = true; 3322 if (rdma->return_path) { 3323 rdma->return_path->errored = true; 3324 } 3325 } 3326 rdma_ack_cm_event(cm_event); 3327 if (mis->loadvm_co) { 3328 qemu_coroutine_enter(mis->loadvm_co); 3329 } 3330 return; 3331 } 3332 rdma_ack_cm_event(cm_event); 3333 } 3334 3335 static int qemu_rdma_accept(RDMAContext *rdma) 3336 { 3337 Error *err = NULL; 3338 RDMACapabilities cap; 3339 struct rdma_conn_param conn_param = { 3340 .responder_resources = 2, 3341 .private_data = &cap, 3342 .private_data_len = sizeof(cap), 3343 }; 3344 RDMAContext *rdma_return_path = NULL; 3345 g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1); 3346 struct rdma_cm_event *cm_event; 3347 struct ibv_context *verbs; 3348 int ret; 3349 3350 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3351 if (ret < 0) { 3352 goto err_rdma_dest_wait; 3353 } 3354 3355 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3356 rdma_ack_cm_event(cm_event); 3357 goto err_rdma_dest_wait; 3358 } 3359 3360 isock->host = rdma->host; 3361 isock->port = g_strdup_printf("%d", rdma->port); 3362 3363 /* 3364 * initialize the RDMAContext for return path for postcopy after first 3365 * connection request reached. 3366 */ 3367 if ((migrate_postcopy() || migrate_return_path()) 3368 && !rdma->is_return_path) { 3369 rdma_return_path = qemu_rdma_data_init(isock, NULL); 3370 if (rdma_return_path == NULL) { 3371 rdma_ack_cm_event(cm_event); 3372 goto err_rdma_dest_wait; 3373 } 3374 3375 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3376 } 3377 3378 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3379 3380 network_to_caps(&cap); 3381 3382 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3383 error_report("Unknown source RDMA version: %d, bailing...", 3384 cap.version); 3385 rdma_ack_cm_event(cm_event); 3386 goto err_rdma_dest_wait; 3387 } 3388 3389 /* 3390 * Respond with only the capabilities this version of QEMU knows about. 3391 */ 3392 cap.flags &= known_capabilities; 3393 3394 /* 3395 * Enable the ones that we do know about. 3396 * Add other checks here as new ones are introduced. 3397 */ 3398 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3399 rdma->pin_all = true; 3400 } 3401 3402 rdma->cm_id = cm_event->id; 3403 verbs = cm_event->id->verbs; 3404 3405 rdma_ack_cm_event(cm_event); 3406 3407 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3408 3409 caps_to_network(&cap); 3410 3411 trace_qemu_rdma_accept_pin_verbsc(verbs); 3412 3413 if (!rdma->verbs) { 3414 rdma->verbs = verbs; 3415 } else if (rdma->verbs != verbs) { 3416 error_report("ibv context not matching %p, %p!", rdma->verbs, 3417 verbs); 3418 goto err_rdma_dest_wait; 3419 } 3420 3421 qemu_rdma_dump_id("dest_init", verbs); 3422 3423 ret = qemu_rdma_alloc_pd_cq(rdma, &err); 3424 if (ret < 0) { 3425 error_report_err(err); 3426 goto err_rdma_dest_wait; 3427 } 3428 3429 ret = qemu_rdma_alloc_qp(rdma); 3430 if (ret < 0) { 3431 error_report("rdma migration: error allocating qp!"); 3432 goto err_rdma_dest_wait; 3433 } 3434 3435 qemu_rdma_init_ram_blocks(rdma); 3436 3437 for (int i = 0; i < RDMA_WRID_MAX; i++) { 3438 ret = qemu_rdma_reg_control(rdma, i); 3439 if (ret < 0) { 3440 error_report("rdma: error registering %d control", i); 3441 goto err_rdma_dest_wait; 3442 } 3443 } 3444 3445 /* Accept the second connection request for return path */ 3446 if ((migrate_postcopy() || migrate_return_path()) 3447 && !rdma->is_return_path) { 3448 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3449 NULL, 3450 (void *)(intptr_t)rdma->return_path); 3451 } else { 3452 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3453 NULL, rdma); 3454 } 3455 3456 ret = rdma_accept(rdma->cm_id, &conn_param); 3457 if (ret < 0) { 3458 error_report("rdma_accept failed"); 3459 goto err_rdma_dest_wait; 3460 } 3461 3462 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3463 if (ret < 0) { 3464 error_report("rdma_accept get_cm_event failed"); 3465 goto err_rdma_dest_wait; 3466 } 3467 3468 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3469 error_report("rdma_accept not event established"); 3470 rdma_ack_cm_event(cm_event); 3471 goto err_rdma_dest_wait; 3472 } 3473 3474 rdma_ack_cm_event(cm_event); 3475 rdma->connected = true; 3476 3477 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err); 3478 if (ret < 0) { 3479 error_report_err(err); 3480 goto err_rdma_dest_wait; 3481 } 3482 3483 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3484 3485 return 0; 3486 3487 err_rdma_dest_wait: 3488 rdma->errored = true; 3489 qemu_rdma_cleanup(rdma); 3490 g_free(rdma_return_path); 3491 return -1; 3492 } 3493 3494 static int dest_ram_sort_func(const void *a, const void *b) 3495 { 3496 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3497 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3498 3499 return (a_index < b_index) ? -1 : (a_index != b_index); 3500 } 3501 3502 /* 3503 * During each iteration of the migration, we listen for instructions 3504 * by the source VM to perform dynamic page registrations before they 3505 * can perform RDMA operations. 3506 * 3507 * We respond with the 'rkey'. 3508 * 3509 * Keep doing this until the source tells us to stop. 3510 */ 3511 int rdma_registration_handle(QEMUFile *f) 3512 { 3513 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3514 .type = RDMA_CONTROL_REGISTER_RESULT, 3515 .repeat = 0, 3516 }; 3517 RDMAControlHeader unreg_resp = { .len = 0, 3518 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3519 .repeat = 0, 3520 }; 3521 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3522 .repeat = 1 }; 3523 QIOChannelRDMA *rioc; 3524 Error *err = NULL; 3525 RDMAContext *rdma; 3526 RDMALocalBlocks *local; 3527 RDMAControlHeader head; 3528 RDMARegister *reg, *registers; 3529 RDMACompress *comp; 3530 RDMARegisterResult *reg_result; 3531 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3532 RDMALocalBlock *block; 3533 void *host_addr; 3534 int ret; 3535 int idx = 0; 3536 3537 if (!migrate_rdma()) { 3538 return 0; 3539 } 3540 3541 RCU_READ_LOCK_GUARD(); 3542 rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3543 rdma = qatomic_rcu_read(&rioc->rdmain); 3544 3545 if (!rdma) { 3546 return -1; 3547 } 3548 3549 if (rdma_errored(rdma)) { 3550 return -1; 3551 } 3552 3553 local = &rdma->local_ram_blocks; 3554 do { 3555 trace_rdma_registration_handle_wait(); 3556 3557 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err); 3558 3559 if (ret < 0) { 3560 error_report_err(err); 3561 break; 3562 } 3563 3564 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3565 error_report("rdma: Too many requests in this message (%d)." 3566 "Bailing.", head.repeat); 3567 break; 3568 } 3569 3570 switch (head.type) { 3571 case RDMA_CONTROL_COMPRESS: 3572 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3573 network_to_compress(comp); 3574 3575 trace_rdma_registration_handle_compress(comp->length, 3576 comp->block_idx, 3577 comp->offset); 3578 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3579 error_report("rdma: 'compress' bad block index %u (vs %d)", 3580 (unsigned int)comp->block_idx, 3581 rdma->local_ram_blocks.nb_blocks); 3582 goto err; 3583 } 3584 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3585 3586 host_addr = block->local_host_addr + 3587 (comp->offset - block->offset); 3588 if (comp->value) { 3589 error_report("rdma: Zero page with non-zero (%d) value", 3590 comp->value); 3591 goto err; 3592 } 3593 ram_handle_zero(host_addr, comp->length); 3594 break; 3595 3596 case RDMA_CONTROL_REGISTER_FINISHED: 3597 trace_rdma_registration_handle_finished(); 3598 return 0; 3599 3600 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3601 trace_rdma_registration_handle_ram_blocks(); 3602 3603 /* Sort our local RAM Block list so it's the same as the source, 3604 * we can do this since we've filled in a src_index in the list 3605 * as we received the RAMBlock list earlier. 3606 */ 3607 qsort(rdma->local_ram_blocks.block, 3608 rdma->local_ram_blocks.nb_blocks, 3609 sizeof(RDMALocalBlock), dest_ram_sort_func); 3610 for (int i = 0; i < local->nb_blocks; i++) { 3611 local->block[i].index = i; 3612 } 3613 3614 if (rdma->pin_all) { 3615 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err); 3616 if (ret < 0) { 3617 error_report_err(err); 3618 goto err; 3619 } 3620 } 3621 3622 /* 3623 * Dest uses this to prepare to transmit the RAMBlock descriptions 3624 * to the source VM after connection setup. 3625 * Both sides use the "remote" structure to communicate and update 3626 * their "local" descriptions with what was sent. 3627 */ 3628 for (int i = 0; i < local->nb_blocks; i++) { 3629 rdma->dest_blocks[i].remote_host_addr = 3630 (uintptr_t)(local->block[i].local_host_addr); 3631 3632 if (rdma->pin_all) { 3633 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3634 } 3635 3636 rdma->dest_blocks[i].offset = local->block[i].offset; 3637 rdma->dest_blocks[i].length = local->block[i].length; 3638 3639 dest_block_to_network(&rdma->dest_blocks[i]); 3640 trace_rdma_registration_handle_ram_blocks_loop( 3641 local->block[i].block_name, 3642 local->block[i].offset, 3643 local->block[i].length, 3644 local->block[i].local_host_addr, 3645 local->block[i].src_index); 3646 } 3647 3648 blocks.len = rdma->local_ram_blocks.nb_blocks 3649 * sizeof(RDMADestBlock); 3650 3651 3652 ret = qemu_rdma_post_send_control(rdma, 3653 (uint8_t *) rdma->dest_blocks, &blocks, 3654 &err); 3655 3656 if (ret < 0) { 3657 error_report_err(err); 3658 goto err; 3659 } 3660 3661 break; 3662 case RDMA_CONTROL_REGISTER_REQUEST: 3663 trace_rdma_registration_handle_register(head.repeat); 3664 3665 reg_resp.repeat = head.repeat; 3666 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3667 3668 for (int count = 0; count < head.repeat; count++) { 3669 uint64_t chunk; 3670 uint8_t *chunk_start, *chunk_end; 3671 3672 reg = ®isters[count]; 3673 network_to_register(reg); 3674 3675 reg_result = &results[count]; 3676 3677 trace_rdma_registration_handle_register_loop(count, 3678 reg->current_index, reg->key.current_addr, reg->chunks); 3679 3680 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3681 error_report("rdma: 'register' bad block index %u (vs %d)", 3682 (unsigned int)reg->current_index, 3683 rdma->local_ram_blocks.nb_blocks); 3684 goto err; 3685 } 3686 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3687 if (block->is_ram_block) { 3688 if (block->offset > reg->key.current_addr) { 3689 error_report("rdma: bad register address for block %s" 3690 " offset: %" PRIx64 " current_addr: %" PRIx64, 3691 block->block_name, block->offset, 3692 reg->key.current_addr); 3693 goto err; 3694 } 3695 host_addr = (block->local_host_addr + 3696 (reg->key.current_addr - block->offset)); 3697 chunk = ram_chunk_index(block->local_host_addr, 3698 (uint8_t *) host_addr); 3699 } else { 3700 chunk = reg->key.chunk; 3701 host_addr = block->local_host_addr + 3702 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3703 /* Check for particularly bad chunk value */ 3704 if (host_addr < (void *)block->local_host_addr) { 3705 error_report("rdma: bad chunk for block %s" 3706 " chunk: %" PRIx64, 3707 block->block_name, reg->key.chunk); 3708 goto err; 3709 } 3710 } 3711 chunk_start = ram_chunk_start(block, chunk); 3712 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3713 /* avoid "-Waddress-of-packed-member" warning */ 3714 uint32_t tmp_rkey = 0; 3715 if (qemu_rdma_register_and_get_keys(rdma, block, 3716 (uintptr_t)host_addr, NULL, &tmp_rkey, 3717 chunk, chunk_start, chunk_end)) { 3718 error_report("cannot get rkey"); 3719 goto err; 3720 } 3721 reg_result->rkey = tmp_rkey; 3722 3723 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3724 3725 trace_rdma_registration_handle_register_rkey(reg_result->rkey); 3726 3727 result_to_network(reg_result); 3728 } 3729 3730 ret = qemu_rdma_post_send_control(rdma, 3731 (uint8_t *) results, ®_resp, &err); 3732 3733 if (ret < 0) { 3734 error_report_err(err); 3735 goto err; 3736 } 3737 break; 3738 case RDMA_CONTROL_UNREGISTER_REQUEST: 3739 trace_rdma_registration_handle_unregister(head.repeat); 3740 unreg_resp.repeat = head.repeat; 3741 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3742 3743 for (int count = 0; count < head.repeat; count++) { 3744 reg = ®isters[count]; 3745 network_to_register(reg); 3746 3747 trace_rdma_registration_handle_unregister_loop(count, 3748 reg->current_index, reg->key.chunk); 3749 3750 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3751 3752 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3753 block->pmr[reg->key.chunk] = NULL; 3754 3755 if (ret != 0) { 3756 error_report("rdma unregistration chunk failed: %s", 3757 strerror(errno)); 3758 goto err; 3759 } 3760 3761 rdma->total_registrations--; 3762 3763 trace_rdma_registration_handle_unregister_success(reg->key.chunk); 3764 } 3765 3766 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err); 3767 3768 if (ret < 0) { 3769 error_report_err(err); 3770 goto err; 3771 } 3772 break; 3773 case RDMA_CONTROL_REGISTER_RESULT: 3774 error_report("Invalid RESULT message at dest."); 3775 goto err; 3776 default: 3777 error_report("Unknown control message %s", control_desc(head.type)); 3778 goto err; 3779 } 3780 } while (1); 3781 3782 err: 3783 rdma->errored = true; 3784 return -1; 3785 } 3786 3787 /* Destination: 3788 * Called during the initial RAM load section which lists the 3789 * RAMBlocks by name. This lets us know the order of the RAMBlocks on 3790 * the source. We've already built our local RAMBlock list, but not 3791 * yet sent the list to the source. 3792 */ 3793 int rdma_block_notification_handle(QEMUFile *f, const char *name) 3794 { 3795 int curr; 3796 int found = -1; 3797 3798 if (!migrate_rdma()) { 3799 return 0; 3800 } 3801 3802 RCU_READ_LOCK_GUARD(); 3803 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3804 RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain); 3805 3806 if (!rdma) { 3807 return -1; 3808 } 3809 3810 /* Find the matching RAMBlock in our local list */ 3811 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3812 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3813 found = curr; 3814 break; 3815 } 3816 } 3817 3818 if (found == -1) { 3819 error_report("RAMBlock '%s' not found on destination", name); 3820 return -1; 3821 } 3822 3823 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3824 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3825 rdma->next_src_index++; 3826 3827 return 0; 3828 } 3829 3830 int rdma_registration_start(QEMUFile *f, uint64_t flags) 3831 { 3832 if (!migrate_rdma() || migration_in_postcopy()) { 3833 return 0; 3834 } 3835 3836 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3837 RCU_READ_LOCK_GUARD(); 3838 RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout); 3839 if (!rdma) { 3840 return -1; 3841 } 3842 3843 if (rdma_errored(rdma)) { 3844 return -1; 3845 } 3846 3847 trace_rdma_registration_start(flags); 3848 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3849 return qemu_fflush(f); 3850 } 3851 3852 /* 3853 * Inform dest that dynamic registrations are done for now. 3854 * First, flush writes, if any. 3855 */ 3856 int rdma_registration_stop(QEMUFile *f, uint64_t flags) 3857 { 3858 QIOChannelRDMA *rioc; 3859 Error *err = NULL; 3860 RDMAContext *rdma; 3861 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3862 int ret; 3863 3864 if (!migrate_rdma() || migration_in_postcopy()) { 3865 return 0; 3866 } 3867 3868 RCU_READ_LOCK_GUARD(); 3869 rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3870 rdma = qatomic_rcu_read(&rioc->rdmaout); 3871 if (!rdma) { 3872 return -1; 3873 } 3874 3875 if (rdma_errored(rdma)) { 3876 return -1; 3877 } 3878 3879 qemu_fflush(f); 3880 ret = qemu_rdma_drain_cq(rdma); 3881 3882 if (ret < 0) { 3883 goto err; 3884 } 3885 3886 if (flags == RAM_CONTROL_SETUP) { 3887 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3888 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3889 int reg_result_idx, nb_dest_blocks; 3890 3891 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3892 trace_rdma_registration_stop_ram(); 3893 3894 /* 3895 * Make sure that we parallelize the pinning on both sides. 3896 * For very large guests, doing this serially takes a really 3897 * long time, so we have to 'interleave' the pinning locally 3898 * with the control messages by performing the pinning on this 3899 * side before we receive the control response from the other 3900 * side that the pinning has completed. 3901 */ 3902 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3903 ®_result_idx, rdma->pin_all ? 3904 qemu_rdma_reg_whole_ram_blocks : NULL, 3905 &err); 3906 if (ret < 0) { 3907 error_report_err(err); 3908 return -1; 3909 } 3910 3911 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3912 3913 /* 3914 * The protocol uses two different sets of rkeys (mutually exclusive): 3915 * 1. One key to represent the virtual address of the entire ram block. 3916 * (dynamic chunk registration disabled - pin everything with one rkey.) 3917 * 2. One to represent individual chunks within a ram block. 3918 * (dynamic chunk registration enabled - pin individual chunks.) 3919 * 3920 * Once the capability is successfully negotiated, the destination transmits 3921 * the keys to use (or sends them later) including the virtual addresses 3922 * and then propagates the remote ram block descriptions to his local copy. 3923 */ 3924 3925 if (local->nb_blocks != nb_dest_blocks) { 3926 error_report("ram blocks mismatch (Number of blocks %d vs %d)", 3927 local->nb_blocks, nb_dest_blocks); 3928 error_printf("Your QEMU command line parameters are probably " 3929 "not identical on both the source and destination."); 3930 rdma->errored = true; 3931 return -1; 3932 } 3933 3934 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3935 memcpy(rdma->dest_blocks, 3936 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3937 for (int i = 0; i < nb_dest_blocks; i++) { 3938 network_to_dest_block(&rdma->dest_blocks[i]); 3939 3940 /* We require that the blocks are in the same order */ 3941 if (rdma->dest_blocks[i].length != local->block[i].length) { 3942 error_report("Block %s/%d has a different length %" PRIu64 3943 "vs %" PRIu64, 3944 local->block[i].block_name, i, 3945 local->block[i].length, 3946 rdma->dest_blocks[i].length); 3947 rdma->errored = true; 3948 return -1; 3949 } 3950 local->block[i].remote_host_addr = 3951 rdma->dest_blocks[i].remote_host_addr; 3952 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3953 } 3954 } 3955 3956 trace_rdma_registration_stop(flags); 3957 3958 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3959 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err); 3960 3961 if (ret < 0) { 3962 error_report_err(err); 3963 goto err; 3964 } 3965 3966 return 0; 3967 err: 3968 rdma->errored = true; 3969 return -1; 3970 } 3971 3972 static void qio_channel_rdma_finalize(Object *obj) 3973 { 3974 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3975 if (rioc->rdmain) { 3976 qemu_rdma_cleanup(rioc->rdmain); 3977 g_free(rioc->rdmain); 3978 rioc->rdmain = NULL; 3979 } 3980 if (rioc->rdmaout) { 3981 qemu_rdma_cleanup(rioc->rdmaout); 3982 g_free(rioc->rdmaout); 3983 rioc->rdmaout = NULL; 3984 } 3985 } 3986 3987 static void qio_channel_rdma_class_init(ObjectClass *klass, 3988 void *class_data G_GNUC_UNUSED) 3989 { 3990 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3991 3992 ioc_klass->io_writev = qio_channel_rdma_writev; 3993 ioc_klass->io_readv = qio_channel_rdma_readv; 3994 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3995 ioc_klass->io_close = qio_channel_rdma_close; 3996 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3997 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 3998 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 3999 } 4000 4001 static const TypeInfo qio_channel_rdma_info = { 4002 .parent = TYPE_QIO_CHANNEL, 4003 .name = TYPE_QIO_CHANNEL_RDMA, 4004 .instance_size = sizeof(QIOChannelRDMA), 4005 .instance_finalize = qio_channel_rdma_finalize, 4006 .class_init = qio_channel_rdma_class_init, 4007 }; 4008 4009 static void qio_channel_rdma_register_types(void) 4010 { 4011 type_register_static(&qio_channel_rdma_info); 4012 } 4013 4014 type_init(qio_channel_rdma_register_types); 4015 4016 static QEMUFile *rdma_new_input(RDMAContext *rdma) 4017 { 4018 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4019 4020 rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc)); 4021 rioc->rdmain = rdma; 4022 rioc->rdmaout = rdma->return_path; 4023 4024 return rioc->file; 4025 } 4026 4027 static QEMUFile *rdma_new_output(RDMAContext *rdma) 4028 { 4029 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4030 4031 rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc)); 4032 rioc->rdmaout = rdma; 4033 rioc->rdmain = rdma->return_path; 4034 4035 return rioc->file; 4036 } 4037 4038 static void rdma_accept_incoming_migration(void *opaque) 4039 { 4040 RDMAContext *rdma = opaque; 4041 QEMUFile *f; 4042 4043 trace_qemu_rdma_accept_incoming_migration(); 4044 if (qemu_rdma_accept(rdma) < 0) { 4045 error_report("RDMA ERROR: Migration initialization failed"); 4046 return; 4047 } 4048 4049 trace_qemu_rdma_accept_incoming_migration_accepted(); 4050 4051 if (rdma->is_return_path) { 4052 return; 4053 } 4054 4055 f = rdma_new_input(rdma); 4056 if (f == NULL) { 4057 error_report("RDMA ERROR: could not open RDMA for input"); 4058 qemu_rdma_cleanup(rdma); 4059 return; 4060 } 4061 4062 rdma->migration_started_on_destination = 1; 4063 migration_fd_process_incoming(f); 4064 } 4065 4066 void rdma_start_incoming_migration(InetSocketAddress *host_port, 4067 Error **errp) 4068 { 4069 MigrationState *s = migrate_get_current(); 4070 int ret; 4071 RDMAContext *rdma; 4072 4073 trace_rdma_start_incoming_migration(); 4074 4075 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4076 if (ram_block_discard_is_required()) { 4077 error_setg(errp, "RDMA: cannot disable RAM discard"); 4078 return; 4079 } 4080 4081 rdma = qemu_rdma_data_init(host_port, errp); 4082 if (rdma == NULL) { 4083 goto err; 4084 } 4085 4086 ret = qemu_rdma_dest_init(rdma, errp); 4087 if (ret < 0) { 4088 goto err; 4089 } 4090 4091 trace_rdma_start_incoming_migration_after_dest_init(); 4092 4093 ret = rdma_listen(rdma->listen_id, 5); 4094 4095 if (ret < 0) { 4096 error_setg(errp, "RDMA ERROR: listening on socket!"); 4097 goto cleanup_rdma; 4098 } 4099 4100 trace_rdma_start_incoming_migration_after_rdma_listen(); 4101 s->rdma_migration = true; 4102 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 4103 NULL, (void *)(intptr_t)rdma); 4104 return; 4105 4106 cleanup_rdma: 4107 qemu_rdma_cleanup(rdma); 4108 err: 4109 if (rdma) { 4110 g_free(rdma->host); 4111 } 4112 g_free(rdma); 4113 } 4114 4115 void rdma_start_outgoing_migration(void *opaque, 4116 InetSocketAddress *host_port, Error **errp) 4117 { 4118 MigrationState *s = opaque; 4119 RDMAContext *rdma_return_path = NULL; 4120 RDMAContext *rdma; 4121 int ret; 4122 4123 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4124 if (ram_block_discard_is_required()) { 4125 error_setg(errp, "RDMA: cannot disable RAM discard"); 4126 return; 4127 } 4128 4129 rdma = qemu_rdma_data_init(host_port, errp); 4130 if (rdma == NULL) { 4131 goto err; 4132 } 4133 4134 ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp); 4135 4136 if (ret < 0) { 4137 goto err; 4138 } 4139 4140 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 4141 ret = qemu_rdma_connect(rdma, false, errp); 4142 4143 if (ret < 0) { 4144 goto err; 4145 } 4146 4147 /* RDMA postcopy need a separate queue pair for return path */ 4148 if (migrate_postcopy() || migrate_return_path()) { 4149 rdma_return_path = qemu_rdma_data_init(host_port, errp); 4150 4151 if (rdma_return_path == NULL) { 4152 goto return_path_err; 4153 } 4154 4155 ret = qemu_rdma_source_init(rdma_return_path, 4156 migrate_rdma_pin_all(), errp); 4157 4158 if (ret < 0) { 4159 goto return_path_err; 4160 } 4161 4162 ret = qemu_rdma_connect(rdma_return_path, true, errp); 4163 4164 if (ret < 0) { 4165 goto return_path_err; 4166 } 4167 4168 rdma->return_path = rdma_return_path; 4169 rdma_return_path->return_path = rdma; 4170 rdma_return_path->is_return_path = true; 4171 } 4172 4173 trace_rdma_start_outgoing_migration_after_rdma_connect(); 4174 4175 s->to_dst_file = rdma_new_output(rdma); 4176 s->rdma_migration = true; 4177 migrate_fd_connect(s, NULL); 4178 return; 4179 return_path_err: 4180 qemu_rdma_cleanup(rdma); 4181 err: 4182 g_free(rdma); 4183 g_free(rdma_return_path); 4184 } 4185