1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * 6 * Authors: 7 * Michael R. Hines <mrhines@us.ibm.com> 8 * Jiuxing Liu <jl@us.ibm.com> 9 * 10 * This work is licensed under the terms of the GNU GPL, version 2 or 11 * later. See the COPYING file in the top-level directory. 12 * 13 */ 14 #include "qemu/osdep.h" 15 #include "qemu-common.h" 16 #include "migration/migration.h" 17 #include "migration/qemu-file.h" 18 #include "exec/cpu-common.h" 19 #include "qemu/error-report.h" 20 #include "qemu/main-loop.h" 21 #include "qemu/sockets.h" 22 #include "qemu/bitmap.h" 23 #include "qemu/coroutine.h" 24 #include <sys/socket.h> 25 #include <netdb.h> 26 #include <arpa/inet.h> 27 #include <rdma/rdma_cma.h> 28 #include "trace.h" 29 30 /* 31 * Print and error on both the Monitor and the Log file. 32 */ 33 #define ERROR(errp, fmt, ...) \ 34 do { \ 35 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 36 if (errp && (*(errp) == NULL)) { \ 37 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 38 } \ 39 } while (0) 40 41 #define RDMA_RESOLVE_TIMEOUT_MS 10000 42 43 /* Do not merge data if larger than this. */ 44 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 45 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 46 47 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 48 49 /* 50 * This is only for non-live state being migrated. 51 * Instead of RDMA_WRITE messages, we use RDMA_SEND 52 * messages for that state, which requires a different 53 * delivery design than main memory. 54 */ 55 #define RDMA_SEND_INCREMENT 32768 56 57 /* 58 * Maximum size infiniband SEND message 59 */ 60 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 61 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 62 63 #define RDMA_CONTROL_VERSION_CURRENT 1 64 /* 65 * Capabilities for negotiation. 66 */ 67 #define RDMA_CAPABILITY_PIN_ALL 0x01 68 69 /* 70 * Add the other flags above to this list of known capabilities 71 * as they are introduced. 72 */ 73 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 74 75 #define CHECK_ERROR_STATE() \ 76 do { \ 77 if (rdma->error_state) { \ 78 if (!rdma->error_reported) { \ 79 error_report("RDMA is in an error state waiting migration" \ 80 " to abort!"); \ 81 rdma->error_reported = 1; \ 82 } \ 83 return rdma->error_state; \ 84 } \ 85 } while (0); 86 87 /* 88 * A work request ID is 64-bits and we split up these bits 89 * into 3 parts: 90 * 91 * bits 0-15 : type of control message, 2^16 92 * bits 16-29: ram block index, 2^14 93 * bits 30-63: ram block chunk number, 2^34 94 * 95 * The last two bit ranges are only used for RDMA writes, 96 * in order to track their completion and potentially 97 * also track unregistration status of the message. 98 */ 99 #define RDMA_WRID_TYPE_SHIFT 0UL 100 #define RDMA_WRID_BLOCK_SHIFT 16UL 101 #define RDMA_WRID_CHUNK_SHIFT 30UL 102 103 #define RDMA_WRID_TYPE_MASK \ 104 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 105 106 #define RDMA_WRID_BLOCK_MASK \ 107 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 108 109 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 110 111 /* 112 * RDMA migration protocol: 113 * 1. RDMA Writes (data messages, i.e. RAM) 114 * 2. 
IB Send/Recv (control channel messages) 115 */ 116 enum { 117 RDMA_WRID_NONE = 0, 118 RDMA_WRID_RDMA_WRITE = 1, 119 RDMA_WRID_SEND_CONTROL = 2000, 120 RDMA_WRID_RECV_CONTROL = 4000, 121 }; 122 123 static const char *wrid_desc[] = { 124 [RDMA_WRID_NONE] = "NONE", 125 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 126 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 127 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 128 }; 129 130 /* 131 * Work request IDs for IB SEND messages only (not RDMA writes). 132 * This is used by the migration protocol to transmit 133 * control messages (such as device state and registration commands) 134 * 135 * We could use more WRs, but we have enough for now. 136 */ 137 enum { 138 RDMA_WRID_READY = 0, 139 RDMA_WRID_DATA, 140 RDMA_WRID_CONTROL, 141 RDMA_WRID_MAX, 142 }; 143 144 /* 145 * SEND/RECV IB Control Messages. 146 */ 147 enum { 148 RDMA_CONTROL_NONE = 0, 149 RDMA_CONTROL_ERROR, 150 RDMA_CONTROL_READY, /* ready to receive */ 151 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 152 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 153 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 154 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 155 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 156 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 157 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 158 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 159 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 160 }; 161 162 static const char *control_desc[] = { 163 [RDMA_CONTROL_NONE] = "NONE", 164 [RDMA_CONTROL_ERROR] = "ERROR", 165 [RDMA_CONTROL_READY] = "READY", 166 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 167 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 168 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 169 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 170 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 171 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 172 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 173 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 174 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 175 }; 176 177 /* 178 * Memory and MR structures used to represent an IB Send/Recv work request. 179 * This is *not* used for RDMA writes, only IB Send/Recv. 180 */ 181 typedef struct { 182 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 183 struct ibv_mr *control_mr; /* registration metadata */ 184 size_t control_len; /* length of the message */ 185 uint8_t *control_curr; /* start of unconsumed bytes */ 186 } RDMAWorkRequestData; 187 188 /* 189 * Negotiate RDMA capabilities during connection-setup time. 190 */ 191 typedef struct { 192 uint32_t version; 193 uint32_t flags; 194 } RDMACapabilities; 195 196 static void caps_to_network(RDMACapabilities *cap) 197 { 198 cap->version = htonl(cap->version); 199 cap->flags = htonl(cap->flags); 200 } 201 202 static void network_to_caps(RDMACapabilities *cap) 203 { 204 cap->version = ntohl(cap->version); 205 cap->flags = ntohl(cap->flags); 206 } 207 208 /* 209 * Representation of a RAMBlock from an RDMA perspective. 210 * This is not transmitted, only local. 211 * This and subsequent structures cannot be linked lists 212 * because we're using a single IB message to transmit 213 * the information. It's small anyway, so a list is overkill. 
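 *
 * As a rough illustration of why a flat array matters here (sketch only,
 * assuming the RAM_BLOCKS_RESULT handling later in this file): the dest
 * can describe every block in a single control message along the lines of
 *
 *     head.type = RDMA_CONTROL_RAM_BLOCKS_RESULT;
 *     head.len  = sizeof(RDMADestBlock) * nb_blocks;
 *     qemu_rdma_post_send_control(rdma, (uint8_t *) rdma->dest_blocks, &head);
 *
 * which would not be possible if the descriptions were chained in a list.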
214 */ 215 typedef struct RDMALocalBlock { 216 char *block_name; 217 uint8_t *local_host_addr; /* local virtual address */ 218 uint64_t remote_host_addr; /* remote virtual address */ 219 uint64_t offset; 220 uint64_t length; 221 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 222 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 223 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 224 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 225 int index; /* which block are we */ 226 unsigned int src_index; /* (Only used on dest) */ 227 bool is_ram_block; 228 int nb_chunks; 229 unsigned long *transit_bitmap; 230 unsigned long *unregister_bitmap; 231 } RDMALocalBlock; 232 233 /* 234 * Also represents a RAMblock, but only on the dest. 235 * This gets transmitted by the dest during connection-time 236 * to the source VM and then is used to populate the 237 * corresponding RDMALocalBlock with 238 * the information needed to perform the actual RDMA. 239 */ 240 typedef struct QEMU_PACKED RDMADestBlock { 241 uint64_t remote_host_addr; 242 uint64_t offset; 243 uint64_t length; 244 uint32_t remote_rkey; 245 uint32_t padding; 246 } RDMADestBlock; 247 248 static uint64_t htonll(uint64_t v) 249 { 250 union { uint32_t lv[2]; uint64_t llv; } u; 251 u.lv[0] = htonl(v >> 32); 252 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 253 return u.llv; 254 } 255 256 static uint64_t ntohll(uint64_t v) { 257 union { uint32_t lv[2]; uint64_t llv; } u; 258 u.llv = v; 259 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 260 } 261 262 static void dest_block_to_network(RDMADestBlock *db) 263 { 264 db->remote_host_addr = htonll(db->remote_host_addr); 265 db->offset = htonll(db->offset); 266 db->length = htonll(db->length); 267 db->remote_rkey = htonl(db->remote_rkey); 268 } 269 270 static void network_to_dest_block(RDMADestBlock *db) 271 { 272 db->remote_host_addr = ntohll(db->remote_host_addr); 273 db->offset = ntohll(db->offset); 274 db->length = ntohll(db->length); 275 db->remote_rkey = ntohl(db->remote_rkey); 276 } 277 278 /* 279 * Virtual address of the above structures used for transmitting 280 * the RAMBlock descriptions at connection-time. 281 * This structure is *not* transmitted. 282 */ 283 typedef struct RDMALocalBlocks { 284 int nb_blocks; 285 bool init; /* main memory init complete */ 286 RDMALocalBlock *block; 287 } RDMALocalBlocks; 288 289 /* 290 * Main data structure for RDMA state. 291 * While there is only one copy of this structure being allocated right now, 292 * this is the place where one would start if you wanted to consider 293 * having more than one RDMA connection open at the same time. 294 */ 295 typedef struct RDMAContext { 296 char *host; 297 int port; 298 299 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 300 301 /* 302 * This is used by *_exchange_send() to figure out whether or not 303 * the initial "READY" message has already been received or not. 304 * This is because other functions may potentially poll() and detect 305 * the READY message before send() does, in which case we need to 306 * know if it completed. 
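 *
 * A rough sketch of how this flag is consumed (illustrative only; the
 * authoritative logic is in qemu_rdma_exchange_send() further down):
 *
 *     if (rdma->control_ready_expected) {
 *         qemu_rdma_exchange_get_response(rdma, &resp,
 *                                         RDMA_CONTROL_READY, RDMA_WRID_READY);
 *     }
 *     ... post a fresh RECV, then SEND the requested control message ...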
 */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context *verbs;
    struct rdma_event_channel *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;
} RDMAContext;

/*
 * Interface to the rest of the migration call stack.
 */
typedef struct QEMUFileRDMA {
    RDMAContext *rdma;
    size_t len;
    void *file;
} QEMUFileRDMA;

/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

static void control_to_network(RDMAControlHeader *control)
{
    control->type = htonl(control->type);
    control->len = htonl(control->len);
    control->repeat = htonl(control->repeat);
}

static void network_to_control(RDMAControlHeader *control)
{
    control->type = ntohl(control->type);
    control->len = ntohl(control->len);
    control->repeat = ntohl(control->repeat);
}

/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
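 *
 * Sketch of the source-side round trip (see qemu_rdma_write_one() below
 * for the real code):
 *
 *     reg.current_index = current_index;
 *     reg.key.current_addr = current_addr;
 *     reg.chunks = chunks;
 *     register_to_network(rdma, &reg);
 *     ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
 *                                   &resp, &reg_result_idx, NULL);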
414 */ 415 typedef struct QEMU_PACKED { 416 union QEMU_PACKED { 417 uint64_t current_addr; /* offset into the ram_addr_t space */ 418 uint64_t chunk; /* chunk to lookup if unregistering */ 419 } key; 420 uint32_t current_index; /* which ramblock the chunk belongs to */ 421 uint32_t padding; 422 uint64_t chunks; /* how many sequential chunks to register */ 423 } RDMARegister; 424 425 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 426 { 427 RDMALocalBlock *local_block; 428 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 429 430 if (local_block->is_ram_block) { 431 /* 432 * current_addr as passed in is an address in the local ram_addr_t 433 * space, we need to translate this for the destination 434 */ 435 reg->key.current_addr -= local_block->offset; 436 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 437 } 438 reg->key.current_addr = htonll(reg->key.current_addr); 439 reg->current_index = htonl(reg->current_index); 440 reg->chunks = htonll(reg->chunks); 441 } 442 443 static void network_to_register(RDMARegister *reg) 444 { 445 reg->key.current_addr = ntohll(reg->key.current_addr); 446 reg->current_index = ntohl(reg->current_index); 447 reg->chunks = ntohll(reg->chunks); 448 } 449 450 typedef struct QEMU_PACKED { 451 uint32_t value; /* if zero, we will madvise() */ 452 uint32_t block_idx; /* which ram block index */ 453 uint64_t offset; /* Address in remote ram_addr_t space */ 454 uint64_t length; /* length of the chunk */ 455 } RDMACompress; 456 457 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 458 { 459 comp->value = htonl(comp->value); 460 /* 461 * comp->offset as passed in is an address in the local ram_addr_t 462 * space, we need to translate this for the destination 463 */ 464 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 465 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 466 comp->block_idx = htonl(comp->block_idx); 467 comp->offset = htonll(comp->offset); 468 comp->length = htonll(comp->length); 469 } 470 471 static void network_to_compress(RDMACompress *comp) 472 { 473 comp->value = ntohl(comp->value); 474 comp->block_idx = ntohl(comp->block_idx); 475 comp->offset = ntohll(comp->offset); 476 comp->length = ntohll(comp->length); 477 } 478 479 /* 480 * The result of the dest's memory registration produces an "rkey" 481 * which the source VM must reference in order to perform 482 * the RDMA operation. 
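 *
 * Illustrative use on the source once the result arrives (the real code is
 * in qemu_rdma_write_one() below):
 *
 *     network_to_result(reg_result);
 *     block->remote_keys[chunk] = reg_result->rkey;
 *     block->remote_host_addr   = reg_result->host_addr;
 *     send_wr.wr.rdma.rkey      = block->remote_keys[chunk];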
483 */ 484 typedef struct QEMU_PACKED { 485 uint32_t rkey; 486 uint32_t padding; 487 uint64_t host_addr; 488 } RDMARegisterResult; 489 490 static void result_to_network(RDMARegisterResult *result) 491 { 492 result->rkey = htonl(result->rkey); 493 result->host_addr = htonll(result->host_addr); 494 }; 495 496 static void network_to_result(RDMARegisterResult *result) 497 { 498 result->rkey = ntohl(result->rkey); 499 result->host_addr = ntohll(result->host_addr); 500 }; 501 502 const char *print_wrid(int wrid); 503 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 504 uint8_t *data, RDMAControlHeader *resp, 505 int *resp_idx, 506 int (*callback)(RDMAContext *rdma)); 507 508 static inline uint64_t ram_chunk_index(const uint8_t *start, 509 const uint8_t *host) 510 { 511 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 512 } 513 514 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 515 uint64_t i) 516 { 517 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 518 (i << RDMA_REG_CHUNK_SHIFT)); 519 } 520 521 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 522 uint64_t i) 523 { 524 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 525 (1UL << RDMA_REG_CHUNK_SHIFT); 526 527 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 528 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 529 } 530 531 return result; 532 } 533 534 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 535 void *host_addr, 536 ram_addr_t block_offset, uint64_t length) 537 { 538 RDMALocalBlocks *local = &rdma->local_ram_blocks; 539 RDMALocalBlock *block; 540 RDMALocalBlock *old = local->block; 541 542 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 543 544 if (local->nb_blocks) { 545 int x; 546 547 if (rdma->blockmap) { 548 for (x = 0; x < local->nb_blocks; x++) { 549 g_hash_table_remove(rdma->blockmap, 550 (void *)(uintptr_t)old[x].offset); 551 g_hash_table_insert(rdma->blockmap, 552 (void *)(uintptr_t)old[x].offset, 553 &local->block[x]); 554 } 555 } 556 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 557 g_free(old); 558 } 559 560 block = &local->block[local->nb_blocks]; 561 562 block->block_name = g_strdup(block_name); 563 block->local_host_addr = host_addr; 564 block->offset = block_offset; 565 block->length = length; 566 block->index = local->nb_blocks; 567 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 568 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 569 block->transit_bitmap = bitmap_new(block->nb_chunks); 570 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 571 block->unregister_bitmap = bitmap_new(block->nb_chunks); 572 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 573 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 574 575 block->is_ram_block = local->init ? 
false : true; 576 577 if (rdma->blockmap) { 578 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 579 } 580 581 trace_rdma_add_block(block_name, local->nb_blocks, 582 (uintptr_t) block->local_host_addr, 583 block->offset, block->length, 584 (uintptr_t) (block->local_host_addr + block->length), 585 BITS_TO_LONGS(block->nb_chunks) * 586 sizeof(unsigned long) * 8, 587 block->nb_chunks); 588 589 local->nb_blocks++; 590 591 return 0; 592 } 593 594 /* 595 * Memory regions need to be registered with the device and queue pairs setup 596 * in advanced before the migration starts. This tells us where the RAM blocks 597 * are so that we can register them individually. 598 */ 599 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr, 600 ram_addr_t block_offset, ram_addr_t length, void *opaque) 601 { 602 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 603 } 604 605 /* 606 * Identify the RAMBlocks and their quantity. They will be references to 607 * identify chunk boundaries inside each RAMBlock and also be referenced 608 * during dynamic page registration. 609 */ 610 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 611 { 612 RDMALocalBlocks *local = &rdma->local_ram_blocks; 613 614 assert(rdma->blockmap == NULL); 615 memset(local, 0, sizeof *local); 616 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma); 617 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 618 rdma->dest_blocks = g_new0(RDMADestBlock, 619 rdma->local_ram_blocks.nb_blocks); 620 local->init = true; 621 return 0; 622 } 623 624 /* 625 * Note: If used outside of cleanup, the caller must ensure that the destination 626 * block structures are also updated 627 */ 628 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 629 { 630 RDMALocalBlocks *local = &rdma->local_ram_blocks; 631 RDMALocalBlock *old = local->block; 632 int x; 633 634 if (rdma->blockmap) { 635 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 636 } 637 if (block->pmr) { 638 int j; 639 640 for (j = 0; j < block->nb_chunks; j++) { 641 if (!block->pmr[j]) { 642 continue; 643 } 644 ibv_dereg_mr(block->pmr[j]); 645 rdma->total_registrations--; 646 } 647 g_free(block->pmr); 648 block->pmr = NULL; 649 } 650 651 if (block->mr) { 652 ibv_dereg_mr(block->mr); 653 rdma->total_registrations--; 654 block->mr = NULL; 655 } 656 657 g_free(block->transit_bitmap); 658 block->transit_bitmap = NULL; 659 660 g_free(block->unregister_bitmap); 661 block->unregister_bitmap = NULL; 662 663 g_free(block->remote_keys); 664 block->remote_keys = NULL; 665 666 g_free(block->block_name); 667 block->block_name = NULL; 668 669 if (rdma->blockmap) { 670 for (x = 0; x < local->nb_blocks; x++) { 671 g_hash_table_remove(rdma->blockmap, 672 (void *)(uintptr_t)old[x].offset); 673 } 674 } 675 676 if (local->nb_blocks > 1) { 677 678 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 679 680 if (block->index) { 681 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 682 } 683 684 if (block->index < (local->nb_blocks - 1)) { 685 memcpy(local->block + block->index, old + (block->index + 1), 686 sizeof(RDMALocalBlock) * 687 (local->nb_blocks - (block->index + 1))); 688 } 689 } else { 690 assert(block == local->block); 691 local->block = NULL; 692 } 693 694 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 695 block->offset, block->length, 696 (uintptr_t)(block->local_host_addr + block->length), 697 BITS_TO_LONGS(block->nb_chunks) * 698 sizeof(unsigned 
long) * 8, block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    if (local->nb_blocks && rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }

    return 0;
}

/*
 * Put in the log file which RDMA device was opened and the details
 * associated with that device.
 */
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
    struct ibv_port_attr port;

    if (ibv_query_port(verbs, 1, &port)) {
        error_report("Failed to query port information");
        return;
    }

    printf("%s RDMA Device opened: kernel name %s "
           "uverbs device name %s, "
           "infiniband_verbs class device path %s, "
           "infiniband class device path %s, "
           "transport: (%d) %s\n",
                who,
                verbs->device->name,
                verbs->device->dev_name,
                verbs->device->dev_path,
                verbs->device->ibdev_path,
                port.link_layer,
                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                    ? "Ethernet" : "Unknown"));
}

/*
 * Put in the log file the RDMA gid addressing information,
 * useful for folks who have trouble understanding the
 * RDMA device hierarchy in the kernel.
 */
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
{
    char sgid[33];
    char dgid[33];
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    trace_qemu_rdma_dump_gid(who, sgid, dgid);
}

/*
 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 * We will try the next addrinfo struct, and fail if there are
 * no other valid addresses to bind against.
 *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
 *
 * In this case, the source VM will throw an error for ALL types of
 * connections (both IPv4 and IPv6) if the destination machine does not have
 * a regular infiniband network available for use.
 *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
 *
 * Unfortunately, this puts the user in a fix:
 *
 *  If the source VM connects with an IPv4 address without knowing that the
 *  destination has bound to '[::]' the migration will unconditionally fail
 *  unless the management software is explicitly listening on the IPv4
 *  address while using a RoCE-based device.
 *
 *  If the source VM connects with an IPv6 address, then we're OK because we
 *  can throw an error on the source (and similarly on the destination).
 *
 *  But in mixed environments, this will be broken for a while until it is
 *  fixed inside linux.
 *
 * We do provide a *tiny* bit of help in this function: We can list all of the
 * devices in the system and check to see if all the devices are RoCE or
 * Infiniband.
 *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
 *
 * Patches are being reviewed on linux-rdma.
 */
static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
{
    struct ibv_port_attr port_attr;

    /* This bug only exists in linux, to our knowledge. */
#ifdef CONFIG_LINUX

    /*
     * Verbs are only NULL if management has bound to '[::]'.
     *
     * Let's iterate through all the devices and see if there are any pure IB
     * devices (non-ethernet).
     *
     * If not, then we can safely proceed with the migration.
     * Otherwise, there are no guarantees until the bug is fixed in linux.
     */
    if (!verbs) {
        int num_devices, x;
        struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
        bool roce_found = false;
        bool ib_found = false;

        for (x = 0; x < num_devices; x++) {
            verbs = ibv_open_device(dev_list[x]);
            if (!verbs) {
                if (errno == EPERM) {
                    continue;
                } else {
                    return -EINVAL;
                }
            }

            if (ibv_query_port(verbs, 1, &port_attr)) {
                ibv_close_device(verbs);
                ERROR(errp, "Could not query initial IB port");
                return -EINVAL;
            }

            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
                ib_found = true;
            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
                roce_found = true;
            }

            ibv_close_device(verbs);

        }

        if (roce_found) {
            if (ib_found) {
                fprintf(stderr, "WARN: migrations may fail:"
                                " IPv6 over RoCE / iWARP in linux"
                                " is broken. But since you appear to have a"
                                " mixed RoCE / IB environment, be sure to only"
                                " migrate over the IB fabric until the kernel "
                                " fixes the bug.\n");
            } else {
                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
                            " and your management software has specified '[::]'"
                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
                return -ENONET;
            }
        }

        return 0;
    }

    /*
     * If we have a verbs context, that means that something other than '[::]'
     * was used by the management software for binding. In which case we can
     * actually warn the user about a potentially broken kernel.
     */

    /* IB ports start with 1, not 0 */
    if (ibv_query_port(verbs, 1, &port_attr)) {
        ERROR(errp, "Could not query initial IB port");
        return -EINVAL;
    }

    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
                    "(but patches on linux-rdma in progress)");
        return -ENONET;
    }

#endif

    return 0;
}

/*
 * Figure out which RDMA device corresponds to the requested IP hostname
 * Also create the initial connection manager identifiers for opening
 * the connection.
895 */ 896 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 897 { 898 int ret; 899 struct rdma_addrinfo *res; 900 char port_str[16]; 901 struct rdma_cm_event *cm_event; 902 char ip[40] = "unknown"; 903 struct rdma_addrinfo *e; 904 905 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 906 ERROR(errp, "RDMA hostname has not been set"); 907 return -EINVAL; 908 } 909 910 /* create CM channel */ 911 rdma->channel = rdma_create_event_channel(); 912 if (!rdma->channel) { 913 ERROR(errp, "could not create CM channel"); 914 return -EINVAL; 915 } 916 917 /* create CM id */ 918 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 919 if (ret) { 920 ERROR(errp, "could not create channel id"); 921 goto err_resolve_create_id; 922 } 923 924 snprintf(port_str, 16, "%d", rdma->port); 925 port_str[15] = '\0'; 926 927 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 928 if (ret < 0) { 929 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 930 goto err_resolve_get_addr; 931 } 932 933 for (e = res; e != NULL; e = e->ai_next) { 934 inet_ntop(e->ai_family, 935 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 936 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 937 938 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 939 RDMA_RESOLVE_TIMEOUT_MS); 940 if (!ret) { 941 if (e->ai_family == AF_INET6) { 942 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs); 943 if (ret) { 944 continue; 945 } 946 } 947 goto route; 948 } 949 } 950 951 ERROR(errp, "could not resolve address %s", rdma->host); 952 goto err_resolve_get_addr; 953 954 route: 955 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 956 957 ret = rdma_get_cm_event(rdma->channel, &cm_event); 958 if (ret) { 959 ERROR(errp, "could not perform event_addr_resolved"); 960 goto err_resolve_get_addr; 961 } 962 963 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 964 ERROR(errp, "result not equal to event_addr_resolved %s", 965 rdma_event_str(cm_event->event)); 966 perror("rdma_resolve_addr"); 967 rdma_ack_cm_event(cm_event); 968 ret = -EINVAL; 969 goto err_resolve_get_addr; 970 } 971 rdma_ack_cm_event(cm_event); 972 973 /* resolve route */ 974 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 975 if (ret) { 976 ERROR(errp, "could not resolve rdma route"); 977 goto err_resolve_get_addr; 978 } 979 980 ret = rdma_get_cm_event(rdma->channel, &cm_event); 981 if (ret) { 982 ERROR(errp, "could not perform event_route_resolved"); 983 goto err_resolve_get_addr; 984 } 985 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 986 ERROR(errp, "result not equal to event_route_resolved: %s", 987 rdma_event_str(cm_event->event)); 988 rdma_ack_cm_event(cm_event); 989 ret = -EINVAL; 990 goto err_resolve_get_addr; 991 } 992 rdma_ack_cm_event(cm_event); 993 rdma->verbs = rdma->cm_id->verbs; 994 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 995 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 996 return 0; 997 998 err_resolve_get_addr: 999 rdma_destroy_id(rdma->cm_id); 1000 rdma->cm_id = NULL; 1001 err_resolve_create_id: 1002 rdma_destroy_event_channel(rdma->channel); 1003 rdma->channel = NULL; 1004 return ret; 1005 } 1006 1007 /* 1008 * Create protection domain and completion queues 1009 */ 1010 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1011 { 1012 /* allocate pd */ 1013 rdma->pd = ibv_alloc_pd(rdma->verbs); 1014 if (!rdma->pd) { 1015 error_report("failed to allocate protection domain"); 1016 return -1; 1017 } 1018 1019 /* create 
completion channel */ 1020 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 1021 if (!rdma->comp_channel) { 1022 error_report("failed to allocate completion channel"); 1023 goto err_alloc_pd_cq; 1024 } 1025 1026 /* 1027 * Completion queue can be filled by both read and write work requests, 1028 * so must reflect the sum of both possible queue sizes. 1029 */ 1030 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1031 NULL, rdma->comp_channel, 0); 1032 if (!rdma->cq) { 1033 error_report("failed to allocate completion queue"); 1034 goto err_alloc_pd_cq; 1035 } 1036 1037 return 0; 1038 1039 err_alloc_pd_cq: 1040 if (rdma->pd) { 1041 ibv_dealloc_pd(rdma->pd); 1042 } 1043 if (rdma->comp_channel) { 1044 ibv_destroy_comp_channel(rdma->comp_channel); 1045 } 1046 rdma->pd = NULL; 1047 rdma->comp_channel = NULL; 1048 return -1; 1049 1050 } 1051 1052 /* 1053 * Create queue pairs. 1054 */ 1055 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1056 { 1057 struct ibv_qp_init_attr attr = { 0 }; 1058 int ret; 1059 1060 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1061 attr.cap.max_recv_wr = 3; 1062 attr.cap.max_send_sge = 1; 1063 attr.cap.max_recv_sge = 1; 1064 attr.send_cq = rdma->cq; 1065 attr.recv_cq = rdma->cq; 1066 attr.qp_type = IBV_QPT_RC; 1067 1068 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1069 if (ret) { 1070 return -1; 1071 } 1072 1073 rdma->qp = rdma->cm_id->qp; 1074 return 0; 1075 } 1076 1077 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1078 { 1079 int i; 1080 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1081 1082 for (i = 0; i < local->nb_blocks; i++) { 1083 local->block[i].mr = 1084 ibv_reg_mr(rdma->pd, 1085 local->block[i].local_host_addr, 1086 local->block[i].length, 1087 IBV_ACCESS_LOCAL_WRITE | 1088 IBV_ACCESS_REMOTE_WRITE 1089 ); 1090 if (!local->block[i].mr) { 1091 perror("Failed to register local dest ram block!\n"); 1092 break; 1093 } 1094 rdma->total_registrations++; 1095 } 1096 1097 if (i >= local->nb_blocks) { 1098 return 0; 1099 } 1100 1101 for (i--; i >= 0; i--) { 1102 ibv_dereg_mr(local->block[i].mr); 1103 rdma->total_registrations--; 1104 } 1105 1106 return -1; 1107 1108 } 1109 1110 /* 1111 * Find the ram block that corresponds to the page requested to be 1112 * transmitted by QEMU. 1113 * 1114 * Once the block is found, also identify which 'chunk' within that 1115 * block that the page belongs to. 1116 * 1117 * This search cannot fail or the migration will fail. 1118 */ 1119 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1120 uintptr_t block_offset, 1121 uint64_t offset, 1122 uint64_t length, 1123 uint64_t *block_index, 1124 uint64_t *chunk_index) 1125 { 1126 uint64_t current_addr = block_offset + offset; 1127 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1128 (void *) block_offset); 1129 assert(block); 1130 assert(current_addr >= block->offset); 1131 assert((current_addr + length) <= (block->offset + block->length)); 1132 1133 *block_index = block->index; 1134 *chunk_index = ram_chunk_index(block->local_host_addr, 1135 block->local_host_addr + (current_addr - block->offset)); 1136 1137 return 0; 1138 } 1139 1140 /* 1141 * Register a chunk with IB. If the chunk was already registered 1142 * previously, then skip. 1143 * 1144 * Also return the keys associated with the registration needed 1145 * to perform the actual RDMA operation. 
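 *
 * Typical call from the write path (sketch; see qemu_rdma_write_one()):
 *
 *     if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
 *                                         &sge.lkey, NULL, chunk,
 *                                         chunk_start, chunk_end)) {
 *         error_report("cannot get lkey");
 *         return -EINVAL;
 *     }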
 */
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
        RDMALocalBlock *block, uintptr_t host_addr,
        uint32_t *lkey, uint32_t *rkey, int chunk,
        uint8_t *chunk_start, uint8_t *chunk_end)
{
    if (block->mr) {
        if (lkey) {
            *lkey = block->mr->lkey;
        }
        if (rkey) {
            *rkey = block->mr->rkey;
        }
        return 0;
    }

    /* allocate memory to store chunk MRs */
    if (!block->pmr) {
        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
    }

    /*
     * If 'rkey', then we're the destination, so grant access to the source.
     *
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
     */
    if (!block->pmr[chunk]) {
        uint64_t len = chunk_end - chunk_start;

        trace_qemu_rdma_register_and_get_keys(len, chunk_start);

        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
                chunk_start, len,
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
                         IBV_ACCESS_REMOTE_WRITE) : 0));

        if (!block->pmr[chunk]) {
            perror("Failed to register chunk!");
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
                            " start %" PRIuPTR " end %" PRIuPTR
                            " host %" PRIuPTR
                            " local %" PRIuPTR " registrations: %d\n",
                            block->index, chunk, (uintptr_t)chunk_start,
                            (uintptr_t)chunk_end, host_addr,
                            (uintptr_t)block->local_host_addr,
                            rdma->total_registrations);
            return -1;
        }
        rdma->total_registrations++;
    }

    if (lkey) {
        *lkey = block->pmr[chunk]->lkey;
    }
    if (rkey) {
        *rkey = block->pmr[chunk]->rkey;
    }
    return 0;
}

/*
 * Register (at connection time) the memory used for control
 * channel messages.
 */
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (rdma->wr_data[idx].control_mr) {
        rdma->total_registrations++;
        return 0;
    }
    error_report("qemu_rdma_reg_control failed");
    return -1;
}

const char *print_wrid(int wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
    }
    return wrid_desc[wrid];
}

/*
 * RDMA requires memory registration (mlock/pinning), but this is not good for
 * overcommitment.
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near
 * future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
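 *
 * When enabled, the two call sites guarded by this macro further down are
 * roughly (sketch of code that already exists below under #ifdef):
 *
 *     qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
 *     qemu_rdma_unregister_waiting(rdma);
 *
 * i.e. a completed chunk is queued for unregistration, and the queue is
 * drained before the next write is posted.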
1252 */ 1253 //#define RDMA_UNREGISTRATION_EXAMPLE 1254 1255 /* 1256 * Perform a non-optimized memory unregistration after every transfer 1257 * for demonstration purposes, only if pin-all is not requested. 1258 * 1259 * Potential optimizations: 1260 * 1. Start a new thread to run this function continuously 1261 - for bit clearing 1262 - and for receipt of unregister messages 1263 * 2. Use an LRU. 1264 * 3. Use workload hints. 1265 */ 1266 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1267 { 1268 while (rdma->unregistrations[rdma->unregister_current]) { 1269 int ret; 1270 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1271 uint64_t chunk = 1272 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1273 uint64_t index = 1274 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1275 RDMALocalBlock *block = 1276 &(rdma->local_ram_blocks.block[index]); 1277 RDMARegister reg = { .current_index = index }; 1278 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1279 }; 1280 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1281 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1282 .repeat = 1, 1283 }; 1284 1285 trace_qemu_rdma_unregister_waiting_proc(chunk, 1286 rdma->unregister_current); 1287 1288 rdma->unregistrations[rdma->unregister_current] = 0; 1289 rdma->unregister_current++; 1290 1291 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1292 rdma->unregister_current = 0; 1293 } 1294 1295 1296 /* 1297 * Unregistration is speculative (because migration is single-threaded 1298 * and we cannot break the protocol's inifinband message ordering). 1299 * Thus, if the memory is currently being used for transmission, 1300 * then abort the attempt to unregister and try again 1301 * later the next time a completion is received for this memory. 1302 */ 1303 clear_bit(chunk, block->unregister_bitmap); 1304 1305 if (test_bit(chunk, block->transit_bitmap)) { 1306 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1307 continue; 1308 } 1309 1310 trace_qemu_rdma_unregister_waiting_send(chunk); 1311 1312 ret = ibv_dereg_mr(block->pmr[chunk]); 1313 block->pmr[chunk] = NULL; 1314 block->remote_keys[chunk] = 0; 1315 1316 if (ret != 0) { 1317 perror("unregistration chunk failed"); 1318 return -ret; 1319 } 1320 rdma->total_registrations--; 1321 1322 reg.key.chunk = chunk; 1323 register_to_network(rdma, ®); 1324 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1325 &resp, NULL, NULL); 1326 if (ret < 0) { 1327 return ret; 1328 } 1329 1330 trace_qemu_rdma_unregister_waiting_complete(chunk); 1331 } 1332 1333 return 0; 1334 } 1335 1336 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1337 uint64_t chunk) 1338 { 1339 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1340 1341 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1342 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1343 1344 return result; 1345 } 1346 1347 /* 1348 * Set bit for unregistration in the next iteration. 1349 * We cannot transmit right here, but will unpin later. 
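 *
 * The value queued in rdma->unregistrations[] is the packed wrid built by
 * qemu_rdma_make_wrid() above, i.e. roughly:
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE
 *             | (index << RDMA_WRID_BLOCK_SHIFT)
 *             | (chunk << RDMA_WRID_CHUNK_SHIFT);
 *
 * so qemu_rdma_unregister_waiting() can later recover the block index and
 * chunk number with the corresponding masks.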
 */
static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
                                        uint64_t chunk, uint64_t wr_id)
{
    if (rdma->unregistrations[rdma->unregister_next] != 0) {
        error_report("rdma migration: queue is full");
    } else {
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
            trace_qemu_rdma_signal_unregister_append(chunk,
                                                     rdma->unregister_next);

            rdma->unregistrations[rdma->unregister_next++] =
                    qemu_rdma_make_wrid(wr_id, index, chunk);

            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
                rdma->unregister_next = 0;
            }
        } else {
            trace_qemu_rdma_signal_unregister_already(chunk);
        }
    }
}

/*
 * Poll the completion queue to see if a work request
 * (of any kind) has completed.
 * Return the work request ID that completed.
 */
static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
                               uint32_t *byte_len)
{
    int ret;
    struct ibv_wc wc;
    uint64_t wr_id;

    ret = ibv_poll_cq(rdma->cq, 1, &wc);

    if (!ret) {
        *wr_id_out = RDMA_WRID_NONE;
        return 0;
    }

    if (ret < 0) {
        error_report("ibv_poll_cq return %d", ret);
        return ret;
    }

    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;

    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
                        wc.status, ibv_wc_status_str(wc.status));
        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);

        return -1;
    }

    if (rdma->control_ready_expected &&
        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
        rdma->control_ready_expected = 0;
    }

    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        uint64_t chunk =
            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
                                   index, chunk, block->local_host_addr,
                                   (void *)(uintptr_t)block->remote_host_addr);

        clear_bit(chunk, block->transit_bitmap);

        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;
        }

        if (!rdma->pin_all) {
            /*
             * FYI: If one wanted to signal a specific chunk to be unregistered
             * using LRU or workload-specific information, this is the function
             * you would call to do so. That chunk would then get asynchronously
             * unregistered later.
             */
#ifdef RDMA_UNREGISTRATION_EXAMPLE
            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
#endif
        }
    } else {
        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
    }

    *wr_id_out = wc.wr_id;
    if (byte_len) {
        *byte_len = wc.byte_len;
    }

    return 0;
}

/*
 * Block until the next work request has completed.
 *
 * First poll to see if a work request has already completed,
 * otherwise block.
 *
 * If we encounter completed work requests for IDs other than
 * the one we're interested in, then that's generally an error.
 *
 * The only exception is actual RDMA Write completions.
These 1466 * completions only need to be recorded, but do not actually 1467 * need further processing. 1468 */ 1469 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1470 uint32_t *byte_len) 1471 { 1472 int num_cq_events = 0, ret = 0; 1473 struct ibv_cq *cq; 1474 void *cq_ctx; 1475 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1476 1477 if (ibv_req_notify_cq(rdma->cq, 0)) { 1478 return -1; 1479 } 1480 /* poll cq first */ 1481 while (wr_id != wrid_requested) { 1482 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1483 if (ret < 0) { 1484 return ret; 1485 } 1486 1487 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1488 1489 if (wr_id == RDMA_WRID_NONE) { 1490 break; 1491 } 1492 if (wr_id != wrid_requested) { 1493 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1494 wrid_requested, print_wrid(wr_id), wr_id); 1495 } 1496 } 1497 1498 if (wr_id == wrid_requested) { 1499 return 0; 1500 } 1501 1502 while (1) { 1503 /* 1504 * Coroutine doesn't start until process_incoming_migration() 1505 * so don't yield unless we know we're running inside of a coroutine. 1506 */ 1507 if (rdma->migration_started_on_destination) { 1508 yield_until_fd_readable(rdma->comp_channel->fd); 1509 } 1510 1511 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) { 1512 perror("ibv_get_cq_event"); 1513 goto err_block_for_wrid; 1514 } 1515 1516 num_cq_events++; 1517 1518 if (ibv_req_notify_cq(cq, 0)) { 1519 goto err_block_for_wrid; 1520 } 1521 1522 while (wr_id != wrid_requested) { 1523 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1524 if (ret < 0) { 1525 goto err_block_for_wrid; 1526 } 1527 1528 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1529 1530 if (wr_id == RDMA_WRID_NONE) { 1531 break; 1532 } 1533 if (wr_id != wrid_requested) { 1534 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1535 wrid_requested, print_wrid(wr_id), wr_id); 1536 } 1537 } 1538 1539 if (wr_id == wrid_requested) { 1540 goto success_block_for_wrid; 1541 } 1542 } 1543 1544 success_block_for_wrid: 1545 if (num_cq_events) { 1546 ibv_ack_cq_events(cq, num_cq_events); 1547 } 1548 return 0; 1549 1550 err_block_for_wrid: 1551 if (num_cq_events) { 1552 ibv_ack_cq_events(cq, num_cq_events); 1553 } 1554 return ret; 1555 } 1556 1557 /* 1558 * Post a SEND message work request for the control channel 1559 * containing some data and block until the post completes. 1560 */ 1561 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1562 RDMAControlHeader *head) 1563 { 1564 int ret = 0; 1565 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1566 struct ibv_send_wr *bad_wr; 1567 struct ibv_sge sge = { 1568 .addr = (uintptr_t)(wr->control), 1569 .length = head->len + sizeof(RDMAControlHeader), 1570 .lkey = wr->control_mr->lkey, 1571 }; 1572 struct ibv_send_wr send_wr = { 1573 .wr_id = RDMA_WRID_SEND_CONTROL, 1574 .opcode = IBV_WR_SEND, 1575 .send_flags = IBV_SEND_SIGNALED, 1576 .sg_list = &sge, 1577 .num_sge = 1, 1578 }; 1579 1580 trace_qemu_rdma_post_send_control(control_desc[head->type]); 1581 1582 /* 1583 * We don't actually need to do a memcpy() in here if we used 1584 * the "sge" properly, but since we're only sending control messages 1585 * (not RAM in a performance-critical path), then its OK for now. 1586 * 1587 * The copy makes the RDMAControlHeader simpler to manipulate 1588 * for the time being. 
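 *
 * For reference, the receiving side undoes this in
 * qemu_rdma_exchange_get_response():
 *
 *     network_to_control((void *) rdma->wr_data[idx].control);
 *     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));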
1589 */ 1590 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1591 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1592 control_to_network((void *) wr->control); 1593 1594 if (buf) { 1595 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1596 } 1597 1598 1599 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1600 1601 if (ret > 0) { 1602 error_report("Failed to use post IB SEND for control"); 1603 return -ret; 1604 } 1605 1606 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1607 if (ret < 0) { 1608 error_report("rdma migration: send polling control error"); 1609 } 1610 1611 return ret; 1612 } 1613 1614 /* 1615 * Post a RECV work request in anticipation of some future receipt 1616 * of data on the control channel. 1617 */ 1618 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1619 { 1620 struct ibv_recv_wr *bad_wr; 1621 struct ibv_sge sge = { 1622 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1623 .length = RDMA_CONTROL_MAX_BUFFER, 1624 .lkey = rdma->wr_data[idx].control_mr->lkey, 1625 }; 1626 1627 struct ibv_recv_wr recv_wr = { 1628 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1629 .sg_list = &sge, 1630 .num_sge = 1, 1631 }; 1632 1633 1634 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1635 return -1; 1636 } 1637 1638 return 0; 1639 } 1640 1641 /* 1642 * Block and wait for a RECV control channel message to arrive. 1643 */ 1644 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1645 RDMAControlHeader *head, int expecting, int idx) 1646 { 1647 uint32_t byte_len; 1648 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1649 &byte_len); 1650 1651 if (ret < 0) { 1652 error_report("rdma migration: recv polling control error!"); 1653 return ret; 1654 } 1655 1656 network_to_control((void *) rdma->wr_data[idx].control); 1657 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1658 1659 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]); 1660 1661 if (expecting == RDMA_CONTROL_NONE) { 1662 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type], 1663 head->type); 1664 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1665 error_report("Was expecting a %s (%d) control message" 1666 ", but got: %s (%d), length: %d", 1667 control_desc[expecting], expecting, 1668 control_desc[head->type], head->type, head->len); 1669 return -EIO; 1670 } 1671 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1672 error_report("too long length: %d", head->len); 1673 return -EINVAL; 1674 } 1675 if (sizeof(*head) + head->len != byte_len) { 1676 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1677 return -EINVAL; 1678 } 1679 1680 return 0; 1681 } 1682 1683 /* 1684 * When a RECV work request has completed, the work request's 1685 * buffer is pointed at the header. 1686 * 1687 * This will advance the pointer to the data portion 1688 * of the control message of the work request's buffer that 1689 * was populated after the work request finished. 1690 */ 1691 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1692 RDMAControlHeader *head) 1693 { 1694 rdma->wr_data[idx].control_len = head->len; 1695 rdma->wr_data[idx].control_curr = 1696 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1697 } 1698 1699 /* 1700 * This is an 'atomic' high-level operation to deliver a single, unified 1701 * control-channel message. 
 *
 * Additionally, if the user is expecting some kind of reply to this message,
 * they can request a 'resp' response message be filled in by posting an
 * additional work request on behalf of the user and waiting for an additional
 * completion.
 *
 * The extra (optional) response is used during registration to save us from
 * having to perform an *additional* exchange of messages just to provide a
 * response, by instead piggy-backing on the acknowledgement.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma))
{
    int ret = 0;

    /*
     * Wait until the dest is ready before attempting to deliver the message
     * by waiting for a READY message.
     */
    if (rdma->control_ready_expected) {
        RDMAControlHeader resp;
        ret = qemu_rdma_exchange_get_response(rdma,
                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
        if (ret < 0) {
            return ret;
        }
    }

    /*
     * If the user is expecting a response, post a WR in anticipation of it.
     */
    if (resp) {
        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
        if (ret) {
            error_report("rdma migration: error posting"
                    " extra control recv for anticipated result!");
            return ret;
        }
    }

    /*
     * Post a WR to replace the one we just consumed for the READY message.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting first control recv!");
        return ret;
    }

    /*
     * Deliver the control message that was requested.
     */
    ret = qemu_rdma_post_send_control(rdma, data, head);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * If we're expecting a response, block and wait for it.
     */
    if (resp) {
        if (callback) {
            trace_qemu_rdma_exchange_send_issue_callback();
            ret = callback(rdma);
            if (ret < 0) {
                return ret;
            }
        }

        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
        ret = qemu_rdma_exchange_get_response(rdma, resp,
                                              resp->type, RDMA_WRID_DATA);

        if (ret < 0) {
            return ret;
        }

        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
        if (resp_idx) {
            *resp_idx = RDMA_WRID_DATA;
        }
        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
    }

    rdma->control_ready_expected = 1;

    return 0;
}

/*
 * This is an 'atomic' high-level operation to receive a single, unified
 * control-channel message.
 */
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
                                   int expecting)
{
    RDMAControlHeader ready = {
                                .len = 0,
                                .type = RDMA_CONTROL_READY,
                                .repeat = 1,
                              };
    int ret;

    /*
     * Inform the source that we're ready to receive a message.
     */
    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * Block and wait for the message.
     */
    ret = qemu_rdma_exchange_get_response(rdma, head,
                                          expecting, RDMA_WRID_READY);

    if (ret < 0) {
        return ret;
    }

    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);

    /*
     * Post a new RECV work request to replace the one we just consumed.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting second control recv!");
        return ret;
    }

    return 0;
}

/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

retry:
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    while (test_bit(chunk, block->transit_bitmap)) {
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_report("Failed to wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }

    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
1925              */
1926 
1927             if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1928                                                    length)
1929                    && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1930                                                     length) == length) {
1931                 RDMACompress comp = {
1932                                         .offset = current_addr,
1933                                         .value = 0,
1934                                         .block_idx = current_index,
1935                                         .length = length,
1936                                     };
1937 
1938                 head.len = sizeof(comp);
1939                 head.type = RDMA_CONTROL_COMPRESS;
1940 
1941                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1942                                                current_index, current_addr);
1943 
1944                 compress_to_network(rdma, &comp);
1945                 ret = qemu_rdma_exchange_send(rdma, &head,
1946                                 (uint8_t *) &comp, NULL, NULL, NULL);
1947 
1948                 if (ret < 0) {
1949                     return -EIO;
1950                 }
1951 
1952                 acct_update_position(f, sge.length, true);
1953 
1954                 return 1;
1955             }
1956 
1957             /*
1958              * Otherwise, tell other side to register.
1959              */
1960             reg.current_index = current_index;
1961             if (block->is_ram_block) {
1962                 reg.key.current_addr = current_addr;
1963             } else {
1964                 reg.key.chunk = chunk;
1965             }
1966             reg.chunks = chunks;
1967 
1968             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1969                                               current_addr);
1970 
1971             register_to_network(rdma, &reg);
1972             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1973                                     &resp, &reg_result_idx, NULL);
1974             if (ret < 0) {
1975                 return ret;
1976             }
1977 
1978             /* try to overlap this single registration with the one we sent. */
1979             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1980                                                 &sge.lkey, NULL, chunk,
1981                                                 chunk_start, chunk_end)) {
1982                 error_report("cannot get lkey");
1983                 return -EINVAL;
1984             }
1985 
1986             reg_result = (RDMARegisterResult *)
1987                     rdma->wr_data[reg_result_idx].control_curr;
1988 
1989             network_to_result(reg_result);
1990 
1991             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
1992                                                  reg_result->rkey, chunk);
1993 
1994             block->remote_keys[chunk] = reg_result->rkey;
1995             block->remote_host_addr = reg_result->host_addr;
1996         } else {
1997             /* already registered before */
1998             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1999                                                 &sge.lkey, NULL, chunk,
2000                                                 chunk_start, chunk_end)) {
2001                 error_report("cannot get lkey!");
2002                 return -EINVAL;
2003             }
2004         }
2005 
2006         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2007     } else {
2008         send_wr.wr.rdma.rkey = block->remote_rkey;
2009 
2010         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2011                                             &sge.lkey, NULL, chunk,
2012                                             chunk_start, chunk_end)) {
2013             error_report("cannot get lkey!");
2014             return -EINVAL;
2015         }
2016     }
2017 
2018     /*
2019      * Encode the ram block index and chunk within this wrid.
2020      * We will use this information at the time of completion
2021      * to figure out which bitmap to check against and then which
2022      * chunk in the bitmap to look for.
2023      */
2024     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2025                                         current_index, chunk);
2026 
2027     send_wr.opcode = IBV_WR_RDMA_WRITE;
2028     send_wr.send_flags = IBV_SEND_SIGNALED;
2029     send_wr.sg_list = &sge;
2030     send_wr.num_sge = 1;
2031     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2032                                 (current_addr - block->offset);
2033 
2034     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2035                                    sge.length);
2036 
2037     /*
2038      * ibv_post_send() does not return negative error numbers,
2039      * per the specification they are positive - no idea why.
2040      */
2041     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2042 
2043     if (ret == ENOMEM) {
2044         trace_qemu_rdma_write_one_queue_full();
2045         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2046         if (ret < 0) {
2047             error_report("rdma migration: failed to make "
2048                          "room in full send queue! %d", ret);
2049             return ret;
2050         }
2051 
2052         goto retry;
2053 
2054     } else if (ret > 0) {
2055         perror("rdma migration: post rdma write failed");
2056         return -ret;
2057     }
2058 
2059     set_bit(chunk, block->transit_bitmap);
2060     acct_update_position(f, sge.length, false);
2061     rdma->total_writes++;
2062 
2063     return 0;
2064 }
2065 
2066 /*
2067  * Push out any unwritten RDMA operations.
2068  *
2069  * We support sending out multiple chunks at the same time.
2070  * Not all of them need to get signaled in the completion queue.
2071  */
2072 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2073 {
2074     int ret;
2075 
2076     if (!rdma->current_length) {
2077         return 0;
2078     }
2079 
2080     ret = qemu_rdma_write_one(f, rdma,
2081             rdma->current_index, rdma->current_addr, rdma->current_length);
2082 
2083     if (ret < 0) {
2084         return ret;
2085     }
2086 
2087     if (ret == 0) {
2088         rdma->nb_sent++;
2089         trace_qemu_rdma_write_flush(rdma->nb_sent);
2090     }
2091 
2092     rdma->current_length = 0;
2093     rdma->current_addr = 0;
2094 
2095     return 0;
2096 }
2097 
2098 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2099                     uint64_t offset, uint64_t len)
2100 {
2101     RDMALocalBlock *block;
2102     uint8_t *host_addr;
2103     uint8_t *chunk_end;
2104 
2105     if (rdma->current_index < 0) {
2106         return 0;
2107     }
2108 
2109     if (rdma->current_chunk < 0) {
2110         return 0;
2111     }
2112 
2113     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2114     host_addr = block->local_host_addr + (offset - block->offset);
2115     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2116 
2117     if (rdma->current_length == 0) {
2118         return 0;
2119     }
2120 
2121     /*
2122      * Only merge into chunk sequentially.
2123      */
2124     if (offset != (rdma->current_addr + rdma->current_length)) {
2125         return 0;
2126     }
2127 
2128     if (offset < block->offset) {
2129         return 0;
2130     }
2131 
2132     if ((offset + len) > (block->offset + block->length)) {
2133         return 0;
2134     }
2135 
2136     if ((host_addr + len) > chunk_end) {
2137         return 0;
2138     }
2139 
2140     return 1;
2141 }
2142 
2143 /*
2144  * We're not actually writing here, but doing three things:
2145  *
2146  * 1. Identify the chunk the buffer belongs to.
2147  * 2. If the chunk is full or the buffer doesn't belong to the current
2148  *    chunk, then start a new chunk and flush() the old chunk.
2149  * 3. To keep the hardware busy, we also group chunks into batches
2150  *    and only require that a batch gets acknowledged in the completion
2151  *    queue instead of each individual chunk.
2152  */
2153 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2154                            uint64_t block_offset, uint64_t offset,
2155                            uint64_t len)
2156 {
2157     uint64_t current_addr = block_offset + offset;
2158     uint64_t index = rdma->current_index;
2159     uint64_t chunk = rdma->current_chunk;
2160     int ret;
2161 
2162     /* If we cannot merge it, we flush the current buffer first.
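     *
     * Worked example (illustrative): two back-to-back 4 KB pages from the
     * same chunk arrive as two calls with offsets N and N + 4096; the first
     * call records current_index/current_chunk/current_addr, the second one
     * merely grows rdma->current_length to 8192, and no RDMA write is posted
     * until the buffer stops being mergable or reaches RDMA_MERGE_MAX.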
*/ 2163 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2164 ret = qemu_rdma_write_flush(f, rdma); 2165 if (ret) { 2166 return ret; 2167 } 2168 rdma->current_length = 0; 2169 rdma->current_addr = current_addr; 2170 2171 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2172 offset, len, &index, &chunk); 2173 if (ret) { 2174 error_report("ram block search failed"); 2175 return ret; 2176 } 2177 rdma->current_index = index; 2178 rdma->current_chunk = chunk; 2179 } 2180 2181 /* merge it */ 2182 rdma->current_length += len; 2183 2184 /* flush it if buffer is too large */ 2185 if (rdma->current_length >= RDMA_MERGE_MAX) { 2186 return qemu_rdma_write_flush(f, rdma); 2187 } 2188 2189 return 0; 2190 } 2191 2192 static void qemu_rdma_cleanup(RDMAContext *rdma) 2193 { 2194 struct rdma_cm_event *cm_event; 2195 int ret, idx; 2196 2197 if (rdma->cm_id && rdma->connected) { 2198 if (rdma->error_state) { 2199 RDMAControlHeader head = { .len = 0, 2200 .type = RDMA_CONTROL_ERROR, 2201 .repeat = 1, 2202 }; 2203 error_report("Early error. Sending error."); 2204 qemu_rdma_post_send_control(rdma, NULL, &head); 2205 } 2206 2207 ret = rdma_disconnect(rdma->cm_id); 2208 if (!ret) { 2209 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2210 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2211 if (!ret) { 2212 rdma_ack_cm_event(cm_event); 2213 } 2214 } 2215 trace_qemu_rdma_cleanup_disconnect(); 2216 rdma->connected = false; 2217 } 2218 2219 g_free(rdma->dest_blocks); 2220 rdma->dest_blocks = NULL; 2221 2222 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2223 if (rdma->wr_data[idx].control_mr) { 2224 rdma->total_registrations--; 2225 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2226 } 2227 rdma->wr_data[idx].control_mr = NULL; 2228 } 2229 2230 if (rdma->local_ram_blocks.block) { 2231 while (rdma->local_ram_blocks.nb_blocks) { 2232 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2233 } 2234 } 2235 2236 if (rdma->qp) { 2237 rdma_destroy_qp(rdma->cm_id); 2238 rdma->qp = NULL; 2239 } 2240 if (rdma->cq) { 2241 ibv_destroy_cq(rdma->cq); 2242 rdma->cq = NULL; 2243 } 2244 if (rdma->comp_channel) { 2245 ibv_destroy_comp_channel(rdma->comp_channel); 2246 rdma->comp_channel = NULL; 2247 } 2248 if (rdma->pd) { 2249 ibv_dealloc_pd(rdma->pd); 2250 rdma->pd = NULL; 2251 } 2252 if (rdma->cm_id) { 2253 rdma_destroy_id(rdma->cm_id); 2254 rdma->cm_id = NULL; 2255 } 2256 if (rdma->listen_id) { 2257 rdma_destroy_id(rdma->listen_id); 2258 rdma->listen_id = NULL; 2259 } 2260 if (rdma->channel) { 2261 rdma_destroy_event_channel(rdma->channel); 2262 rdma->channel = NULL; 2263 } 2264 g_free(rdma->host); 2265 rdma->host = NULL; 2266 } 2267 2268 2269 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2270 { 2271 int ret, idx; 2272 Error *local_err = NULL, **temp = &local_err; 2273 2274 /* 2275 * Will be validated against destination's actual capabilities 2276 * after the connect() completes. 2277 */ 2278 rdma->pin_all = pin_all; 2279 2280 ret = qemu_rdma_resolve_host(rdma, temp); 2281 if (ret) { 2282 goto err_rdma_source_init; 2283 } 2284 2285 ret = qemu_rdma_alloc_pd_cq(rdma); 2286 if (ret) { 2287 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2288 " limits may be too low. 
Please check $ ulimit -a # and " 2289 "search for 'ulimit -l' in the output"); 2290 goto err_rdma_source_init; 2291 } 2292 2293 ret = qemu_rdma_alloc_qp(rdma); 2294 if (ret) { 2295 ERROR(temp, "rdma migration: error allocating qp!"); 2296 goto err_rdma_source_init; 2297 } 2298 2299 ret = qemu_rdma_init_ram_blocks(rdma); 2300 if (ret) { 2301 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2302 goto err_rdma_source_init; 2303 } 2304 2305 /* Build the hash that maps from offset to RAMBlock */ 2306 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2307 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2308 g_hash_table_insert(rdma->blockmap, 2309 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2310 &rdma->local_ram_blocks.block[idx]); 2311 } 2312 2313 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2314 ret = qemu_rdma_reg_control(rdma, idx); 2315 if (ret) { 2316 ERROR(temp, "rdma migration: error registering %d control!", 2317 idx); 2318 goto err_rdma_source_init; 2319 } 2320 } 2321 2322 return 0; 2323 2324 err_rdma_source_init: 2325 error_propagate(errp, local_err); 2326 qemu_rdma_cleanup(rdma); 2327 return -1; 2328 } 2329 2330 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2331 { 2332 RDMACapabilities cap = { 2333 .version = RDMA_CONTROL_VERSION_CURRENT, 2334 .flags = 0, 2335 }; 2336 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2337 .retry_count = 5, 2338 .private_data = &cap, 2339 .private_data_len = sizeof(cap), 2340 }; 2341 struct rdma_cm_event *cm_event; 2342 int ret; 2343 2344 /* 2345 * Only negotiate the capability with destination if the user 2346 * on the source first requested the capability. 2347 */ 2348 if (rdma->pin_all) { 2349 trace_qemu_rdma_connect_pin_all_requested(); 2350 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2351 } 2352 2353 caps_to_network(&cap); 2354 2355 ret = rdma_connect(rdma->cm_id, &conn_param); 2356 if (ret) { 2357 perror("rdma_connect"); 2358 ERROR(errp, "connecting to destination!"); 2359 goto err_rdma_source_connect; 2360 } 2361 2362 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2363 if (ret) { 2364 perror("rdma_get_cm_event after rdma_connect"); 2365 ERROR(errp, "connecting to destination!"); 2366 rdma_ack_cm_event(cm_event); 2367 goto err_rdma_source_connect; 2368 } 2369 2370 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2371 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2372 ERROR(errp, "connecting to destination!"); 2373 rdma_ack_cm_event(cm_event); 2374 goto err_rdma_source_connect; 2375 } 2376 rdma->connected = true; 2377 2378 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2379 network_to_caps(&cap); 2380 2381 /* 2382 * Verify that the *requested* capabilities are supported by the destination 2383 * and disable them otherwise. 2384 */ 2385 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2386 ERROR(errp, "Server cannot support pinning all memory. 
" 2387 "Will register memory dynamically."); 2388 rdma->pin_all = false; 2389 } 2390 2391 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2392 2393 rdma_ack_cm_event(cm_event); 2394 2395 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2396 if (ret) { 2397 ERROR(errp, "posting second control recv!"); 2398 goto err_rdma_source_connect; 2399 } 2400 2401 rdma->control_ready_expected = 1; 2402 rdma->nb_sent = 0; 2403 return 0; 2404 2405 err_rdma_source_connect: 2406 qemu_rdma_cleanup(rdma); 2407 return -1; 2408 } 2409 2410 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2411 { 2412 int ret, idx; 2413 struct rdma_cm_id *listen_id; 2414 char ip[40] = "unknown"; 2415 struct rdma_addrinfo *res, *e; 2416 char port_str[16]; 2417 2418 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2419 rdma->wr_data[idx].control_len = 0; 2420 rdma->wr_data[idx].control_curr = NULL; 2421 } 2422 2423 if (!rdma->host || !rdma->host[0]) { 2424 ERROR(errp, "RDMA host is not set!"); 2425 rdma->error_state = -EINVAL; 2426 return -1; 2427 } 2428 /* create CM channel */ 2429 rdma->channel = rdma_create_event_channel(); 2430 if (!rdma->channel) { 2431 ERROR(errp, "could not create rdma event channel"); 2432 rdma->error_state = -EINVAL; 2433 return -1; 2434 } 2435 2436 /* create CM id */ 2437 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2438 if (ret) { 2439 ERROR(errp, "could not create cm_id!"); 2440 goto err_dest_init_create_listen_id; 2441 } 2442 2443 snprintf(port_str, 16, "%d", rdma->port); 2444 port_str[15] = '\0'; 2445 2446 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2447 if (ret < 0) { 2448 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2449 goto err_dest_init_bind_addr; 2450 } 2451 2452 for (e = res; e != NULL; e = e->ai_next) { 2453 inet_ntop(e->ai_family, 2454 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2455 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2456 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2457 if (ret) { 2458 continue; 2459 } 2460 if (e->ai_family == AF_INET6) { 2461 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2462 if (ret) { 2463 continue; 2464 } 2465 } 2466 break; 2467 } 2468 2469 if (!e) { 2470 ERROR(errp, "Error: could not rdma_bind_addr!"); 2471 goto err_dest_init_bind_addr; 2472 } 2473 2474 rdma->listen_id = listen_id; 2475 qemu_rdma_dump_gid("dest_init", listen_id); 2476 return 0; 2477 2478 err_dest_init_bind_addr: 2479 rdma_destroy_id(listen_id); 2480 err_dest_init_create_listen_id: 2481 rdma_destroy_event_channel(rdma->channel); 2482 rdma->channel = NULL; 2483 rdma->error_state = ret; 2484 return ret; 2485 2486 } 2487 2488 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2489 { 2490 RDMAContext *rdma = NULL; 2491 InetSocketAddress *addr; 2492 2493 if (host_port) { 2494 rdma = g_new0(RDMAContext, 1); 2495 rdma->current_index = -1; 2496 rdma->current_chunk = -1; 2497 2498 addr = inet_parse(host_port, NULL); 2499 if (addr != NULL) { 2500 rdma->port = atoi(addr->port); 2501 rdma->host = g_strdup(addr->host); 2502 } else { 2503 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2504 g_free(rdma); 2505 rdma = NULL; 2506 } 2507 2508 qapi_free_InetSocketAddress(addr); 2509 } 2510 2511 return rdma; 2512 } 2513 2514 /* 2515 * QEMUFile interface to the control channel. 2516 * SEND messages for control only. 2517 * VM's ram is handled with regular RDMA messages. 
2518 */ 2519 static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, 2520 int64_t pos, size_t size) 2521 { 2522 QEMUFileRDMA *r = opaque; 2523 QEMUFile *f = r->file; 2524 RDMAContext *rdma = r->rdma; 2525 size_t remaining = size; 2526 uint8_t * data = (void *) buf; 2527 int ret; 2528 2529 CHECK_ERROR_STATE(); 2530 2531 /* 2532 * Push out any writes that 2533 * we're queued up for VM's ram. 2534 */ 2535 ret = qemu_rdma_write_flush(f, rdma); 2536 if (ret < 0) { 2537 rdma->error_state = ret; 2538 return ret; 2539 } 2540 2541 while (remaining) { 2542 RDMAControlHeader head; 2543 2544 r->len = MIN(remaining, RDMA_SEND_INCREMENT); 2545 remaining -= r->len; 2546 2547 /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */ 2548 head.len = (uint32_t)r->len; 2549 head.type = RDMA_CONTROL_QEMU_FILE; 2550 2551 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2552 2553 if (ret < 0) { 2554 rdma->error_state = ret; 2555 return ret; 2556 } 2557 2558 data += r->len; 2559 } 2560 2561 return size; 2562 } 2563 2564 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2565 size_t size, int idx) 2566 { 2567 size_t len = 0; 2568 2569 if (rdma->wr_data[idx].control_len) { 2570 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2571 2572 len = MIN(size, rdma->wr_data[idx].control_len); 2573 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2574 rdma->wr_data[idx].control_curr += len; 2575 rdma->wr_data[idx].control_len -= len; 2576 } 2577 2578 return len; 2579 } 2580 2581 /* 2582 * QEMUFile interface to the control channel. 2583 * RDMA links don't use bytestreams, so we have to 2584 * return bytes to QEMUFile opportunistically. 2585 */ 2586 static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf, 2587 int64_t pos, size_t size) 2588 { 2589 QEMUFileRDMA *r = opaque; 2590 RDMAContext *rdma = r->rdma; 2591 RDMAControlHeader head; 2592 int ret = 0; 2593 2594 CHECK_ERROR_STATE(); 2595 2596 /* 2597 * First, we hold on to the last SEND message we 2598 * were given and dish out the bytes until we run 2599 * out of bytes. 2600 */ 2601 r->len = qemu_rdma_fill(r->rdma, buf, size, 0); 2602 if (r->len) { 2603 return r->len; 2604 } 2605 2606 /* 2607 * Once we run out, we block and wait for another 2608 * SEND message to arrive. 2609 */ 2610 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2611 2612 if (ret < 0) { 2613 rdma->error_state = ret; 2614 return ret; 2615 } 2616 2617 /* 2618 * SEND was received with new bytes, now try again. 2619 */ 2620 return qemu_rdma_fill(r->rdma, buf, size, 0); 2621 } 2622 2623 /* 2624 * Block until all the outstanding chunks have been delivered by the hardware. 
2625  */
2626 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2627 {
2628     int ret;
2629 
2630     if (qemu_rdma_write_flush(f, rdma) < 0) {
2631         return -EIO;
2632     }
2633 
2634     while (rdma->nb_sent) {
2635         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2636         if (ret < 0) {
2637             error_report("rdma migration: complete polling error!");
2638             return -EIO;
2639         }
2640     }
2641 
2642     qemu_rdma_unregister_waiting(rdma);
2643 
2644     return 0;
2645 }
2646 
2647 static int qemu_rdma_close(void *opaque)
2648 {
2649     trace_qemu_rdma_close();
2650     QEMUFileRDMA *r = opaque;
2651     if (r->rdma) {
2652         qemu_rdma_cleanup(r->rdma);
2653         g_free(r->rdma);
2654     }
2655     g_free(r);
2656     return 0;
2657 }
2658 
2659 /*
2660  * Parameters:
2661  *    @offset == 0 :
2662  *        This means that 'block_offset' is a full virtual address that does not
2663  *        belong to a RAMBlock of the virtual machine and instead
2664  *        represents a private malloc'd memory area that the caller wishes to
2665  *        transfer.
2666  *
2667  *    @offset != 0 :
2668  *        Offset is an offset to be added to block_offset and used
2669  *        to also lookup the corresponding RAMBlock.
2670  *
2671  *    @size > 0 :
2672  *        Initiate a transfer of this size.
2673  *
2674  *    @size == 0 :
2675  *        A 'hint' or 'advice' that means that we wish to speculatively
2676  *        and asynchronously unregister this memory. In this case, there is no
2677  *        guarantee that the unregister will actually happen, for example,
2678  *        if the memory is being actively transmitted. Additionally, the memory
2679  *        may be re-registered at any future time if a write within the same
2680  *        chunk was requested again, even if you attempted to unregister it
2681  *        here.
2682  *
2683  *    @size < 0 : TODO, not yet supported
2684  *        Unregister the memory NOW. This means that the caller does not
2685  *        expect there to be any future RDMA transfers and we just want to clean
2686  *        things up. This is used in case the upper layer owns the memory and
2687  *        cannot wait for qemu_fclose() to occur.
2688  *
2689  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2690  *                  sent. Usually, this will not be more than a few bytes of
2691  *                  the protocol because most transfers are sent asynchronously.
2692  */
2693 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2694                                   ram_addr_t block_offset, ram_addr_t offset,
2695                                   size_t size, uint64_t *bytes_sent)
2696 {
2697     QEMUFileRDMA *rfile = opaque;
2698     RDMAContext *rdma = rfile->rdma;
2699     int ret;
2700 
2701     CHECK_ERROR_STATE();
2702 
2703     qemu_fflush(f);
2704 
2705     if (size > 0) {
2706         /*
2707          * Add this page to the current 'chunk'. If the chunk
2708          * is full, or the page doesn't belong to the current chunk,
2709          * an actual RDMA write will occur and a new chunk will be formed.
2710          */
2711         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2712         if (ret < 0) {
2713             error_report("rdma migration: write error! %d", ret);
2714             goto err;
2715         }
2716 
2717         /*
2718          * We always return 1 byte because the RDMA
2719          * protocol is completely asynchronous. We do not yet know
2720          * whether an identified chunk is zero or not because we're
2721          * waiting for other pages to potentially be merged with
2722          * the current chunk. So, we have to call qemu_update_position()
2723          * later on when the actual write occurs.
2724 */ 2725 if (bytes_sent) { 2726 *bytes_sent = 1; 2727 } 2728 } else { 2729 uint64_t index, chunk; 2730 2731 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2732 if (size < 0) { 2733 ret = qemu_rdma_drain_cq(f, rdma); 2734 if (ret < 0) { 2735 fprintf(stderr, "rdma: failed to synchronously drain" 2736 " completion queue before unregistration.\n"); 2737 goto err; 2738 } 2739 } 2740 */ 2741 2742 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2743 offset, size, &index, &chunk); 2744 2745 if (ret) { 2746 error_report("ram block search failed"); 2747 goto err; 2748 } 2749 2750 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2751 2752 /* 2753 * TODO: Synchronous, guaranteed unregistration (should not occur during 2754 * fast-path). Otherwise, unregisters will process on the next call to 2755 * qemu_rdma_drain_cq() 2756 if (size < 0) { 2757 qemu_rdma_unregister_waiting(rdma); 2758 } 2759 */ 2760 } 2761 2762 /* 2763 * Drain the Completion Queue if possible, but do not block, 2764 * just poll. 2765 * 2766 * If nothing to poll, the end of the iteration will do this 2767 * again to make sure we don't overflow the request queue. 2768 */ 2769 while (1) { 2770 uint64_t wr_id, wr_id_in; 2771 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2772 if (ret < 0) { 2773 error_report("rdma migration: polling error! %d", ret); 2774 goto err; 2775 } 2776 2777 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2778 2779 if (wr_id == RDMA_WRID_NONE) { 2780 break; 2781 } 2782 } 2783 2784 return RAM_SAVE_CONTROL_DELAYED; 2785 err: 2786 rdma->error_state = ret; 2787 return ret; 2788 } 2789 2790 static int qemu_rdma_accept(RDMAContext *rdma) 2791 { 2792 RDMACapabilities cap; 2793 struct rdma_conn_param conn_param = { 2794 .responder_resources = 2, 2795 .private_data = &cap, 2796 .private_data_len = sizeof(cap), 2797 }; 2798 struct rdma_cm_event *cm_event; 2799 struct ibv_context *verbs; 2800 int ret = -EINVAL; 2801 int idx; 2802 2803 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2804 if (ret) { 2805 goto err_rdma_dest_wait; 2806 } 2807 2808 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2809 rdma_ack_cm_event(cm_event); 2810 goto err_rdma_dest_wait; 2811 } 2812 2813 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2814 2815 network_to_caps(&cap); 2816 2817 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2818 error_report("Unknown source RDMA version: %d, bailing...", 2819 cap.version); 2820 rdma_ack_cm_event(cm_event); 2821 goto err_rdma_dest_wait; 2822 } 2823 2824 /* 2825 * Respond with only the capabilities this version of QEMU knows about. 2826 */ 2827 cap.flags &= known_capabilities; 2828 2829 /* 2830 * Enable the ones that we do know about. 2831 * Add other checks here as new ones are introduced. 
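     *
     * For illustration only: RDMA_CAPABILITY_FOO is hypothetical and does not
     * exist in this protocol; a new flag would simply be handled the same way
     * as PIN_ALL below:
     *
     *   if (cap.flags & RDMA_CAPABILITY_FOO) {
     *       rdma->foo = true;
     *   }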
2832 */ 2833 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2834 rdma->pin_all = true; 2835 } 2836 2837 rdma->cm_id = cm_event->id; 2838 verbs = cm_event->id->verbs; 2839 2840 rdma_ack_cm_event(cm_event); 2841 2842 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 2843 2844 caps_to_network(&cap); 2845 2846 trace_qemu_rdma_accept_pin_verbsc(verbs); 2847 2848 if (!rdma->verbs) { 2849 rdma->verbs = verbs; 2850 } else if (rdma->verbs != verbs) { 2851 error_report("ibv context not matching %p, %p!", rdma->verbs, 2852 verbs); 2853 goto err_rdma_dest_wait; 2854 } 2855 2856 qemu_rdma_dump_id("dest_init", verbs); 2857 2858 ret = qemu_rdma_alloc_pd_cq(rdma); 2859 if (ret) { 2860 error_report("rdma migration: error allocating pd and cq!"); 2861 goto err_rdma_dest_wait; 2862 } 2863 2864 ret = qemu_rdma_alloc_qp(rdma); 2865 if (ret) { 2866 error_report("rdma migration: error allocating qp!"); 2867 goto err_rdma_dest_wait; 2868 } 2869 2870 ret = qemu_rdma_init_ram_blocks(rdma); 2871 if (ret) { 2872 error_report("rdma migration: error initializing ram blocks!"); 2873 goto err_rdma_dest_wait; 2874 } 2875 2876 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2877 ret = qemu_rdma_reg_control(rdma, idx); 2878 if (ret) { 2879 error_report("rdma: error registering %d control", idx); 2880 goto err_rdma_dest_wait; 2881 } 2882 } 2883 2884 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2885 2886 ret = rdma_accept(rdma->cm_id, &conn_param); 2887 if (ret) { 2888 error_report("rdma_accept returns %d", ret); 2889 goto err_rdma_dest_wait; 2890 } 2891 2892 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2893 if (ret) { 2894 error_report("rdma_accept get_cm_event failed %d", ret); 2895 goto err_rdma_dest_wait; 2896 } 2897 2898 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2899 error_report("rdma_accept not event established"); 2900 rdma_ack_cm_event(cm_event); 2901 goto err_rdma_dest_wait; 2902 } 2903 2904 rdma_ack_cm_event(cm_event); 2905 rdma->connected = true; 2906 2907 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2908 if (ret) { 2909 error_report("rdma migration: error posting second control recv"); 2910 goto err_rdma_dest_wait; 2911 } 2912 2913 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 2914 2915 return 0; 2916 2917 err_rdma_dest_wait: 2918 rdma->error_state = ret; 2919 qemu_rdma_cleanup(rdma); 2920 return ret; 2921 } 2922 2923 static int dest_ram_sort_func(const void *a, const void *b) 2924 { 2925 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 2926 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 2927 2928 return (a_index < b_index) ? -1 : (a_index != b_index); 2929 } 2930 2931 /* 2932 * During each iteration of the migration, we listen for instructions 2933 * by the source VM to perform dynamic page registrations before they 2934 * can perform RDMA operations. 2935 * 2936 * We respond with the 'rkey'. 2937 * 2938 * Keep doing this until the source tells us to stop. 
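 *
 * For reference (sketch, mirroring qemu_rdma_write_one() above), the matching
 * source-side request/response round trip looks roughly like:
 *
 *   head.type = RDMA_CONTROL_REGISTER_REQUEST;
 *   resp.type = RDMA_CONTROL_REGISTER_RESULT;
 *   register_to_network(rdma, &reg);
 *   qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
 *                           &resp, &reg_result_idx, NULL);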
2939 */ 2940 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 2941 { 2942 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 2943 .type = RDMA_CONTROL_REGISTER_RESULT, 2944 .repeat = 0, 2945 }; 2946 RDMAControlHeader unreg_resp = { .len = 0, 2947 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 2948 .repeat = 0, 2949 }; 2950 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 2951 .repeat = 1 }; 2952 QEMUFileRDMA *rfile = opaque; 2953 RDMAContext *rdma = rfile->rdma; 2954 RDMALocalBlocks *local = &rdma->local_ram_blocks; 2955 RDMAControlHeader head; 2956 RDMARegister *reg, *registers; 2957 RDMACompress *comp; 2958 RDMARegisterResult *reg_result; 2959 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 2960 RDMALocalBlock *block; 2961 void *host_addr; 2962 int ret = 0; 2963 int idx = 0; 2964 int count = 0; 2965 int i = 0; 2966 2967 CHECK_ERROR_STATE(); 2968 2969 do { 2970 trace_qemu_rdma_registration_handle_wait(); 2971 2972 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 2973 2974 if (ret < 0) { 2975 break; 2976 } 2977 2978 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 2979 error_report("rdma: Too many requests in this message (%d)." 2980 "Bailing.", head.repeat); 2981 ret = -EIO; 2982 break; 2983 } 2984 2985 switch (head.type) { 2986 case RDMA_CONTROL_COMPRESS: 2987 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 2988 network_to_compress(comp); 2989 2990 trace_qemu_rdma_registration_handle_compress(comp->length, 2991 comp->block_idx, 2992 comp->offset); 2993 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 2994 error_report("rdma: 'compress' bad block index %u (vs %d)", 2995 (unsigned int)comp->block_idx, 2996 rdma->local_ram_blocks.nb_blocks); 2997 ret = -EIO; 2998 goto out; 2999 } 3000 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3001 3002 host_addr = block->local_host_addr + 3003 (comp->offset - block->offset); 3004 3005 ram_handle_compressed(host_addr, comp->value, comp->length); 3006 break; 3007 3008 case RDMA_CONTROL_REGISTER_FINISHED: 3009 trace_qemu_rdma_registration_handle_finished(); 3010 goto out; 3011 3012 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3013 trace_qemu_rdma_registration_handle_ram_blocks(); 3014 3015 /* Sort our local RAM Block list so it's the same as the source, 3016 * we can do this since we've filled in a src_index in the list 3017 * as we received the RAMBlock list earlier. 3018 */ 3019 qsort(rdma->local_ram_blocks.block, 3020 rdma->local_ram_blocks.nb_blocks, 3021 sizeof(RDMALocalBlock), dest_ram_sort_func); 3022 if (rdma->pin_all) { 3023 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3024 if (ret) { 3025 error_report("rdma migration: error dest " 3026 "registering ram blocks"); 3027 goto out; 3028 } 3029 } 3030 3031 /* 3032 * Dest uses this to prepare to transmit the RAMBlock descriptions 3033 * to the source VM after connection setup. 3034 * Both sides use the "remote" structure to communicate and update 3035 * their "local" descriptions with what was sent. 
3036             */
3037             for (i = 0; i < local->nb_blocks; i++) {
3038                 rdma->dest_blocks[i].remote_host_addr =
3039                     (uintptr_t)(local->block[i].local_host_addr);
3040 
3041                 if (rdma->pin_all) {
3042                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3043                 }
3044 
3045                 rdma->dest_blocks[i].offset = local->block[i].offset;
3046                 rdma->dest_blocks[i].length = local->block[i].length;
3047 
3048                 dest_block_to_network(&rdma->dest_blocks[i]);
3049                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3050                     local->block[i].block_name,
3051                     local->block[i].offset,
3052                     local->block[i].length,
3053                     local->block[i].local_host_addr,
3054                     local->block[i].src_index);
3055             }
3056 
3057             blocks.len = rdma->local_ram_blocks.nb_blocks
3058                                                 * sizeof(RDMADestBlock);
3059 
3060 
3061             ret = qemu_rdma_post_send_control(rdma,
3062                                         (uint8_t *) rdma->dest_blocks, &blocks);
3063 
3064             if (ret < 0) {
3065                 error_report("rdma migration: error sending remote info");
3066                 goto out;
3067             }
3068 
3069             break;
3070         case RDMA_CONTROL_REGISTER_REQUEST:
3071             trace_qemu_rdma_registration_handle_register(head.repeat);
3072 
3073             reg_resp.repeat = head.repeat;
3074             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3075 
3076             for (count = 0; count < head.repeat; count++) {
3077                 uint64_t chunk;
3078                 uint8_t *chunk_start, *chunk_end;
3079 
3080                 reg = &registers[count];
3081                 network_to_register(reg);
3082 
3083                 reg_result = &results[count];
3084 
3085                 trace_qemu_rdma_registration_handle_register_loop(count,
3086                          reg->current_index, reg->key.current_addr, reg->chunks);
3087 
3088                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3089                     error_report("rdma: 'register' bad block index %u (vs %d)",
3090                                  (unsigned int)reg->current_index,
3091                                  rdma->local_ram_blocks.nb_blocks);
3092                     ret = -ENOENT;
3093                     goto out;
3094                 }
3095                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3096                 if (block->is_ram_block) {
3097                     if (block->offset > reg->key.current_addr) {
3098                         error_report("rdma: bad register address for block %s"
3099                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3100                             block->block_name, block->offset,
3101                             reg->key.current_addr);
3102                         ret = -ERANGE;
3103                         goto out;
3104                     }
3105                     host_addr = (block->local_host_addr +
3106                                 (reg->key.current_addr - block->offset));
3107                     chunk = ram_chunk_index(block->local_host_addr,
3108                                             (uint8_t *) host_addr);
3109                 } else {
3110                     chunk = reg->key.chunk;
3111                     host_addr = block->local_host_addr +
3112                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3113                     /* Check for particularly bad chunk value */
3114                     if (host_addr < (void *)block->local_host_addr) {
3115                         error_report("rdma: bad chunk for block %s"
3116                                      " chunk: %" PRIx64,
3117                                      block->block_name, reg->key.chunk);
3118                         ret = -ERANGE;
3119                         goto out;
3120                     }
3121                 }
3122                 chunk_start = ram_chunk_start(block, chunk);
3123                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3124                 if (qemu_rdma_register_and_get_keys(rdma, block,
3125                             (uintptr_t)host_addr, NULL, &reg_result->rkey,
3126                             chunk, chunk_start, chunk_end)) {
3127                     error_report("cannot get rkey");
3128                     ret = -EINVAL;
3129                     goto out;
3130                 }
3131 
3132                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3133 
3134                 trace_qemu_rdma_registration_handle_register_rkey(
3135                                                            reg_result->rkey);
3136 
3137                 result_to_network(reg_result);
3138             }
3139 
3140             ret = qemu_rdma_post_send_control(rdma,
3141                             (uint8_t *) results, &reg_resp);
3142 
3143             if (ret < 0) {
3144                 error_report("Failed to send control buffer");
3145                 goto out;
3146             }
3147             break;
3148         case RDMA_CONTROL_UNREGISTER_REQUEST:
3149             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3150             unreg_resp.repeat = head.repeat;
3151             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3152 
3153             for (count = 0; count < head.repeat; count++) {
3154                 reg = &registers[count];
3155                 network_to_register(reg);
3156 
3157                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3158                            reg->current_index, reg->key.chunk);
3159 
3160                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3161 
3162                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3163                 block->pmr[reg->key.chunk] = NULL;
3164 
3165                 if (ret != 0) {
3166                     perror("rdma unregistration chunk failed");
3167                     ret = -ret;
3168                     goto out;
3169                 }
3170 
3171                 rdma->total_registrations--;
3172 
3173                 trace_qemu_rdma_registration_handle_unregister_success(
3174                                                        reg->key.chunk);
3175             }
3176 
3177             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3178 
3179             if (ret < 0) {
3180                 error_report("Failed to send control buffer");
3181                 goto out;
3182             }
3183             break;
3184         case RDMA_CONTROL_REGISTER_RESULT:
3185             error_report("Invalid RESULT message at dest.");
3186             ret = -EIO;
3187             goto out;
3188         default:
3189             error_report("Unknown control message %s", control_desc[head.type]);
3190             ret = -EIO;
3191             goto out;
3192         }
3193     } while (1);
3194 out:
3195     if (ret < 0) {
3196         rdma->error_state = ret;
3197     }
3198     return ret;
3199 }
3200 
3201 /* Destination:
3202  * Called via a ram_control_load_hook during the initial RAM load section which
3203  * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
3204  * on the source.
3205  * We've already built our local RAMBlock list, but not yet sent the list to
3206  * the source.
3207  */
3208 static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
3209 {
3210     RDMAContext *rdma = rfile->rdma;
3211     int curr;
3212     int found = -1;
3213 
3214     /* Find the matching RAMBlock in our local list */
3215     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3216         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3217             found = curr;
3218             break;
3219         }
3220     }
3221 
3222     if (found == -1) {
3223         error_report("RAMBlock '%s' not found on destination", name);
3224         return -ENOENT;
3225     }
3226 
3227     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3228     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3229     rdma->next_src_index++;
3230 
3231     return 0;
3232 }
3233 
3234 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3235 {
3236     switch (flags) {
3237     case RAM_CONTROL_BLOCK_REG:
3238         return rdma_block_notification_handle(opaque, data);
3239 
3240     case RAM_CONTROL_HOOK:
3241         return qemu_rdma_registration_handle(f, opaque);
3242 
3243     default:
3244         /* Shouldn't be called with any other values */
3245         abort();
3246     }
3247 }
3248 
3249 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3250                                         uint64_t flags, void *data)
3251 {
3252     QEMUFileRDMA *rfile = opaque;
3253     RDMAContext *rdma = rfile->rdma;
3254 
3255     CHECK_ERROR_STATE();
3256 
3257     trace_qemu_rdma_registration_start(flags);
3258     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3259     qemu_fflush(f);
3260 
3261     return 0;
3262 }
3263 
3264 /*
3265  * Inform dest that dynamic registrations are done for now.
3266  * First, flush writes, if any.
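 *
 * (Sketch of the notification itself, as sent at the end of this function:
 * a bare control header with no payload and no expected response:
 *
 *   head.type = RDMA_CONTROL_REGISTER_FINISHED;
 *   ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
 * )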
3267  */
3268 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3269                                        uint64_t flags, void *data)
3270 {
3271     Error *local_err = NULL, **errp = &local_err;
3272     QEMUFileRDMA *rfile = opaque;
3273     RDMAContext *rdma = rfile->rdma;
3274     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3275     int ret = 0;
3276 
3277     CHECK_ERROR_STATE();
3278 
3279     qemu_fflush(f);
3280     ret = qemu_rdma_drain_cq(f, rdma);
3281 
3282     if (ret < 0) {
3283         goto err;
3284     }
3285 
3286     if (flags == RAM_CONTROL_SETUP) {
3287         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3288         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3289         int reg_result_idx, i, nb_dest_blocks;
3290 
3291         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3292         trace_qemu_rdma_registration_stop_ram();
3293 
3294         /*
3295          * Make sure that we parallelize the pinning on both sides.
3296          * For very large guests, doing this serially takes a really
3297          * long time, so we have to 'interleave' the pinning locally
3298          * with the control messages by performing the pinning on this
3299          * side before we receive the control response from the other
3300          * side that the pinning has completed.
3301          */
3302         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3303                     &reg_result_idx, rdma->pin_all ?
3304                     qemu_rdma_reg_whole_ram_blocks : NULL);
3305         if (ret < 0) {
3306             ERROR(errp, "receiving remote info!");
3307             return ret;
3308         }
3309 
3310         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3311 
3312         /*
3313          * The protocol uses two different sets of rkeys (mutually exclusive):
3314          * 1. One key to represent the virtual address of the entire ram block.
3315          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3316          * 2. One to represent individual chunks within a ram block.
3317          *    (dynamic chunk registration enabled - pin individual chunks.)
3318          *
3319          * Once the capability is successfully negotiated, the destination transmits
3320          * the keys to use (or sends them later) including the virtual addresses
3321          * and then propagates the remote ram block descriptions to its local copy.
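         *
         * Sketch of how the negotiated keys are consumed later when posting
         * RDMA writes (paraphrasing qemu_rdma_write_one() above): one rkey
         * per block when pinning everything, one rkey per chunk otherwise:
         *
         *   if (rdma->pin_all && block->is_ram_block) {
         *       send_wr.wr.rdma.rkey = block->remote_rkey;
         *   } else {
         *       send_wr.wr.rdma.rkey = block->remote_keys[chunk];
         *   }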
3322 */ 3323 3324 if (local->nb_blocks != nb_dest_blocks) { 3325 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) " 3326 "Your QEMU command line parameters are probably " 3327 "not identical on both the source and destination.", 3328 local->nb_blocks, nb_dest_blocks); 3329 rdma->error_state = -EINVAL; 3330 return -EINVAL; 3331 } 3332 3333 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3334 memcpy(rdma->dest_blocks, 3335 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3336 for (i = 0; i < nb_dest_blocks; i++) { 3337 network_to_dest_block(&rdma->dest_blocks[i]); 3338 3339 /* We require that the blocks are in the same order */ 3340 if (rdma->dest_blocks[i].length != local->block[i].length) { 3341 ERROR(errp, "Block %s/%d has a different length %" PRIu64 3342 "vs %" PRIu64, local->block[i].block_name, i, 3343 local->block[i].length, 3344 rdma->dest_blocks[i].length); 3345 rdma->error_state = -EINVAL; 3346 return -EINVAL; 3347 } 3348 local->block[i].remote_host_addr = 3349 rdma->dest_blocks[i].remote_host_addr; 3350 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3351 } 3352 } 3353 3354 trace_qemu_rdma_registration_stop(flags); 3355 3356 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3357 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3358 3359 if (ret < 0) { 3360 goto err; 3361 } 3362 3363 return 0; 3364 err: 3365 rdma->error_state = ret; 3366 return ret; 3367 } 3368 3369 static int qemu_rdma_get_fd(void *opaque) 3370 { 3371 QEMUFileRDMA *rfile = opaque; 3372 RDMAContext *rdma = rfile->rdma; 3373 3374 return rdma->comp_channel->fd; 3375 } 3376 3377 static const QEMUFileOps rdma_read_ops = { 3378 .get_buffer = qemu_rdma_get_buffer, 3379 .get_fd = qemu_rdma_get_fd, 3380 .close = qemu_rdma_close, 3381 .hook_ram_load = rdma_load_hook, 3382 }; 3383 3384 static const QEMUFileOps rdma_write_ops = { 3385 .put_buffer = qemu_rdma_put_buffer, 3386 .close = qemu_rdma_close, 3387 .before_ram_iterate = qemu_rdma_registration_start, 3388 .after_ram_iterate = qemu_rdma_registration_stop, 3389 .save_page = qemu_rdma_save_page, 3390 }; 3391 3392 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3393 { 3394 QEMUFileRDMA *r; 3395 3396 if (qemu_file_mode_is_not_valid(mode)) { 3397 return NULL; 3398 } 3399 3400 r = g_new0(QEMUFileRDMA, 1); 3401 r->rdma = rdma; 3402 3403 if (mode[0] == 'w') { 3404 r->file = qemu_fopen_ops(r, &rdma_write_ops); 3405 } else { 3406 r->file = qemu_fopen_ops(r, &rdma_read_ops); 3407 } 3408 3409 return r->file; 3410 } 3411 3412 static void rdma_accept_incoming_migration(void *opaque) 3413 { 3414 RDMAContext *rdma = opaque; 3415 int ret; 3416 QEMUFile *f; 3417 Error *local_err = NULL, **errp = &local_err; 3418 3419 trace_qemu_rdma_accept_incoming_migration(); 3420 ret = qemu_rdma_accept(rdma); 3421 3422 if (ret) { 3423 ERROR(errp, "RDMA Migration initialization failed!"); 3424 return; 3425 } 3426 3427 trace_qemu_rdma_accept_incoming_migration_accepted(); 3428 3429 f = qemu_fopen_rdma(rdma, "rb"); 3430 if (f == NULL) { 3431 ERROR(errp, "could not qemu_fopen_rdma!"); 3432 qemu_rdma_cleanup(rdma); 3433 return; 3434 } 3435 3436 rdma->migration_started_on_destination = 1; 3437 process_incoming_migration(f); 3438 } 3439 3440 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3441 { 3442 int ret; 3443 RDMAContext *rdma; 3444 Error *local_err = NULL; 3445 3446 trace_rdma_start_incoming_migration(); 3447 rdma = qemu_rdma_data_init(host_port, &local_err); 3448 3449 if (rdma == NULL) { 3450 goto err; 3451 } 
3452 3453 ret = qemu_rdma_dest_init(rdma, &local_err); 3454 3455 if (ret) { 3456 goto err; 3457 } 3458 3459 trace_rdma_start_incoming_migration_after_dest_init(); 3460 3461 ret = rdma_listen(rdma->listen_id, 5); 3462 3463 if (ret) { 3464 ERROR(errp, "listening on socket!"); 3465 goto err; 3466 } 3467 3468 trace_rdma_start_incoming_migration_after_rdma_listen(); 3469 3470 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3471 NULL, (void *)(intptr_t)rdma); 3472 return; 3473 err: 3474 error_propagate(errp, local_err); 3475 g_free(rdma); 3476 } 3477 3478 void rdma_start_outgoing_migration(void *opaque, 3479 const char *host_port, Error **errp) 3480 { 3481 MigrationState *s = opaque; 3482 Error *local_err = NULL, **temp = &local_err; 3483 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err); 3484 int ret = 0; 3485 3486 if (rdma == NULL) { 3487 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret); 3488 goto err; 3489 } 3490 3491 ret = qemu_rdma_source_init(rdma, &local_err, 3492 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3493 3494 if (ret) { 3495 goto err; 3496 } 3497 3498 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3499 ret = qemu_rdma_connect(rdma, &local_err); 3500 3501 if (ret) { 3502 goto err; 3503 } 3504 3505 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3506 3507 s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); 3508 migrate_fd_connect(s); 3509 return; 3510 err: 3511 error_propagate(errp, local_err); 3512 g_free(rdma); 3513 migrate_fd_error(s); 3514 } 3515
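/*
 * Illustrative usage sketch (the address below is made up): both entry points
 * above take a "host:port" string, which qemu_rdma_data_init() splits with
 * inet_parse().
 *
 *   Error *err = NULL;
 *
 *   // Destination side:
 *   rdma_start_incoming_migration("192.168.1.10:4444", &err);
 *
 *   // Source side, with 's' being the current MigrationState:
 *   rdma_start_outgoing_migration(s, "192.168.1.10:4444", &err);
 */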