/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */
#include "qemu-common.h"
#include "migration/migration.h"
#include "migration/qemu-file.h"
#include "exec/cpu-common.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "qemu/coroutine.h"
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <string.h>
#include <rdma/rdma_cma.h>
#include "trace.h"

/*
 * Print an error on both the monitor and the log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting for migration" \
                             " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0);

/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
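/*
 * Illustrative sketch only (not used by the migration code): the helpers
 * below show how the shift/mask macros above combine into a single 64-bit
 * work request ID and how the three fields are recovered again.  The
 * function names are hypothetical; the real code open-codes this in
 * qemu_rdma_make_wrid() and in the completion handlers further down.
 */
static inline uint64_t example_wrid_pack(uint64_t type, uint64_t block_index,
                                         uint64_t chunk)
{
    return (type << RDMA_WRID_TYPE_SHIFT) |
           (block_index << RDMA_WRID_BLOCK_SHIFT) |
           (chunk << RDMA_WRID_CHUNK_SHIFT);
}

static inline void example_wrid_unpack(uint64_t wr_id, uint64_t *type,
                                       uint64_t *block_index, uint64_t *chunk)
{
    *type = (wr_id & RDMA_WRID_TYPE_MASK) >> RDMA_WRID_TYPE_SHIFT;
    *block_index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
    *chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
}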
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands).
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};

static const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};

/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}
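/*
 * Illustrative sketch only, under the assumption that the capability words
 * travel as private_data of the RDMA connection parameters (as the
 * connection-setup code later in this file does).  It shows the intended
 * byte-order discipline: convert just before handing the struct to the CM,
 * and convert back with network_to_caps() immediately after reading it on
 * the other side.  The helper name is hypothetical.
 */
static inline void example_fill_caps_for_connect(struct rdma_conn_param *p,
                                                 RDMACapabilities *cap,
                                                 bool pin_all)
{
    cap->version = RDMA_CONTROL_VERSION_CURRENT;
    cap->flags = pin_all ? RDMA_CAPABILITY_PIN_ALL : 0;

    caps_to_network(cap);            /* wire format is big-endian */
    p->private_data = cap;
    p->private_data_len = sizeof(*cap);
}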
/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 */
typedef struct RDMALocalBlock {
    char          *block_name;
    uint8_t       *local_host_addr;  /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;
    uint64_t       length;
    struct         ibv_mr **pmr;     /* MRs for chunk-level registration */
    struct         ibv_mr *mr;       /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;      /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;      /* rkeys for non-chunk-level registration */
    int            index;            /* which block are we */
    unsigned int   src_index;        /* (Only used on dest) */
    bool           is_ram_block;
    int            nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;

/*
 * Also represents a RAMBlock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;

static uint64_t htonll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.lv[0] = htonl(v >> 32);
    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    return u.llv;
}

static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.llv = v;
    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
}

static void dest_block_to_network(RDMADestBlock *db)
{
    db->remote_host_addr = htonll(db->remote_host_addr);
    db->offset = htonll(db->offset);
    db->length = htonll(db->length);
    db->remote_rkey = htonl(db->remote_rkey);
}

static void network_to_dest_block(RDMADestBlock *db)
{
    db->remote_host_addr = ntohll(db->remote_host_addr);
    db->offset = ntohll(db->offset);
    db->length = ntohll(db->length);
    db->remote_rkey = ntohl(db->remote_rkey);
}

/*
 * Virtual address of the above structures used for transmitting
 * the RAMBlock descriptions at connection-time.
 * This structure is *not* transmitted.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;
    bool     init;             /* main memory init complete */
    RDMALocalBlock *block;
} RDMALocalBlocks;
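/*
 * Illustrative sketch only: how an RDMADestBlock received from the wire
 * would be folded back into the matching RDMALocalBlock, per the comment
 * above.  The helper name is hypothetical; the real code performs these
 * assignments inline when the RAM BLOCKS RESULT message is processed.
 */
static inline void example_apply_dest_block(RDMALocalBlock *local,
                                            RDMADestBlock *db)
{
    network_to_dest_block(db);            /* wire format is big-endian */
    local->remote_host_addr = db->remote_host_addr;
    local->remote_rkey = db->remote_rkey;
}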
/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure being allocated right now,
 * this is the place where one would start if you wanted to consider
 * having more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;

    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * Migration on the *destination* runs inside a coroutine, so we use
     * the coroutine yield function when waiting for completions.
     * The source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;
} RDMAContext;

/*
 * Interface to the rest of the migration call stack.
 */
typedef struct QEMUFileRDMA {
    RDMAContext *rdma;
    size_t len;
    void *file;
} QEMUFileRDMA;

/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

static void control_to_network(RDMAControlHeader *control)
{
    control->type = htonl(control->type);
    control->len = htonl(control->len);
    control->repeat = htonl(control->repeat);
}

static void network_to_control(RDMAControlHeader *control)
{
    control->type = ntohl(control->type);
    control->len = ntohl(control->len);
    control->repeat = ntohl(control->repeat);
}
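/*
 * Illustrative sketch only: the wire layout of one control message is the
 * (big-endian) RDMAControlHeader immediately followed by 'len' bytes of
 * payload, all staged in one registered wr_data[] buffer.  The helper name
 * is hypothetical; qemu_rdma_post_send_control() below does the real work.
 */
static inline size_t example_pack_control(uint8_t *buf, RDMAControlHeader *head,
                                          const uint8_t *payload)
{
    memcpy(buf, head, sizeof(RDMAControlHeader));
    control_to_network((RDMAControlHeader *)buf);
    if (payload) {
        memcpy(buf + sizeof(RDMAControlHeader), payload, head->len);
    }
    return sizeof(RDMAControlHeader) + head->len;
}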
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;        /* how many sequential chunks to register */
} RDMARegister;

static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
{
    RDMALocalBlock *local_block;
    local_block = &rdma->local_ram_blocks.block[reg->current_index];

    if (local_block->is_ram_block) {
        /*
         * current_addr as passed in is an address in the local ram_addr_t
         * space, we need to translate this for the destination
         */
        reg->key.current_addr -= local_block->offset;
        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
    }
    reg->key.current_addr = htonll(reg->key.current_addr);
    reg->current_index = htonl(reg->current_index);
    reg->chunks = htonll(reg->chunks);
}

static void network_to_register(RDMARegister *reg)
{
    reg->key.current_addr = ntohll(reg->key.current_addr);
    reg->current_index = ntohl(reg->current_index);
    reg->chunks = ntohll(reg->chunks);
}

typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;

static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
{
    comp->value = htonl(comp->value);
    /*
     * comp->offset as passed in is an address in the local ram_addr_t
     * space, we need to translate this for the destination
     */
    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
    comp->block_idx = htonl(comp->block_idx);
    comp->offset = htonll(comp->offset);
    comp->length = htonll(comp->length);
}

static void network_to_compress(RDMACompress *comp)
{
    comp->value = ntohl(comp->value);
    comp->block_idx = ntohl(comp->block_idx);
    comp->offset = ntohll(comp->offset);
    comp->length = ntohll(comp->length);
}
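/*
 * Worked example (hypothetical numbers) of the translation performed by
 * register_to_network() and compress_to_network() above: if a RAMBlock
 * starts at local offset 0x40000000 and the destination reported the same
 * block at offset 0x80000000, then a page at local ram_addr_t 0x40042000
 * is sent as 0x80042000, i.e. (addr - local->offset) + dest->offset.
 */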
/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;

static void result_to_network(RDMARegisterResult *result)
{
    result->rkey = htonl(result->rkey);
    result->host_addr = htonll(result->host_addr);
}

static void network_to_result(RDMARegisterResult *result)
{
    result->rkey = ntohl(result->rkey);
    result->host_addr = ntohll(result->host_addr);
}

const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));

static inline uint64_t ram_chunk_index(const uint8_t *start,
                                       const uint8_t *host)
{
    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
}

static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
                                       uint64_t i)
{
    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
                                  (i << RDMA_REG_CHUNK_SHIFT));
}

static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
                                     uint64_t i)
{
    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
                      (1UL << RDMA_REG_CHUNK_SHIFT);

    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
    }

    return result;
}
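/*
 * Illustrative sketch only: with RDMA_REG_CHUNK_SHIFT == 20 each chunk is
 * 1 MB, so a 6.5 MB block has 7 chunks (indices 0..6).  The hypothetical
 * helper below uses the same formula that rdma_add_block() applies when it
 * sizes the per-chunk bitmaps.
 */
static inline int example_nb_chunks(const uint8_t *host_addr, uint64_t length)
{
    /* same formula as rdma_add_block() below */
    return ram_chunk_index(host_addr, host_addr + length) + 1UL;
}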
static int rdma_add_block(RDMAContext *rdma, const char *block_name,
                          void *host_addr,
                          ram_addr_t block_offset, uint64_t length)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *block;
    RDMALocalBlock *old = local->block;

    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);

    if (local->nb_blocks) {
        int x;

        if (rdma->blockmap) {
            for (x = 0; x < local->nb_blocks; x++) {
                g_hash_table_remove(rdma->blockmap,
                                    (void *)(uintptr_t)old[x].offset);
                g_hash_table_insert(rdma->blockmap,
                                    (void *)(uintptr_t)old[x].offset,
                                    &local->block[x]);
            }
        }
        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
        g_free(old);
    }

    block = &local->block[local->nb_blocks];

    block->block_name = g_strdup(block_name);
    block->local_host_addr = host_addr;
    block->offset = block_offset;
    block->length = length;
    block->index = local->nb_blocks;
    block->src_index = ~0U; /* Filled in by the receipt of the block list */
    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
    block->transit_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
    block->unregister_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
    block->remote_keys = g_new0(uint32_t, block->nb_chunks);

    block->is_ram_block = local->init ? false : true;

    if (rdma->blockmap) {
        g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
    }

    trace_rdma_add_block(block_name, local->nb_blocks,
                         (uintptr_t) block->local_host_addr,
                         block->offset, block->length,
                         (uintptr_t) (block->local_host_addr + block->length),
                         BITS_TO_LONGS(block->nb_chunks) *
                             sizeof(unsigned long) * 8,
                         block->nb_chunks);

    local->nb_blocks++;

    return 0;
}

/*
 * Memory regions need to be registered with the device and queue pairs set up
 * in advance before the migration starts. This tells us where the RAM blocks
 * are so that we can register them individually.
 */
static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
    ram_addr_t block_offset, ram_addr_t length, void *opaque)
{
    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
}

/*
 * Identify the RAMBlocks and their quantity. They will be referenced to
 * identify chunk boundaries inside each RAMBlock and also be referenced
 * during dynamic page registration.
 */
static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    assert(rdma->blockmap == NULL);
    memset(local, 0, sizeof *local);
    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
    rdma->dest_blocks = g_new0(RDMADestBlock,
                               rdma->local_ram_blocks.nb_blocks);
    local->init = true;
    return 0;
}
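/*
 * Illustrative sketch only: rdma->blockmap (when present) maps a RAMBlock's
 * starting ram_addr_t offset to its RDMALocalBlock, which is how the write
 * path later resolves a (block_offset, offset) pair without scanning the
 * whole block array.  The helper name is hypothetical; compare
 * qemu_rdma_search_ram_block() further down.
 */
static inline RDMALocalBlock *example_lookup_block(RDMAContext *rdma,
                                                   ram_addr_t block_offset)
{
    return g_hash_table_lookup(rdma->blockmap, (void *)(uintptr_t)block_offset);
}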
/*
 * Note: If used outside of cleanup, the caller must ensure that the
 * destination block structures are also updated.
 */
static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *old = local->block;
    int x;

    if (rdma->blockmap) {
        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
    }
    if (block->pmr) {
        int j;

        for (j = 0; j < block->nb_chunks; j++) {
            if (!block->pmr[j]) {
                continue;
            }
            ibv_dereg_mr(block->pmr[j]);
            rdma->total_registrations--;
        }
        g_free(block->pmr);
        block->pmr = NULL;
    }

    if (block->mr) {
        ibv_dereg_mr(block->mr);
        rdma->total_registrations--;
        block->mr = NULL;
    }

    g_free(block->transit_bitmap);
    block->transit_bitmap = NULL;

    g_free(block->unregister_bitmap);
    block->unregister_bitmap = NULL;

    g_free(block->remote_keys);
    block->remote_keys = NULL;

    g_free(block->block_name);
    block->block_name = NULL;

    if (rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_remove(rdma->blockmap,
                                (void *)(uintptr_t)old[x].offset);
        }
    }

    if (local->nb_blocks > 1) {

        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);

        if (block->index) {
            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
        }

        if (block->index < (local->nb_blocks - 1)) {
            memcpy(local->block + block->index, old + (block->index + 1),
                   sizeof(RDMALocalBlock) *
                       (local->nb_blocks - (block->index + 1)));
        }
    } else {
        assert(block == local->block);
        local->block = NULL;
    }

    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
                            block->offset, block->length,
                            (uintptr_t)(block->local_host_addr + block->length),
                            BITS_TO_LONGS(block->nb_chunks) *
                                sizeof(unsigned long) * 8,
                            block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    if (local->nb_blocks && rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }

    return 0;
}

/*
 * Put in the log file which RDMA device was opened and the details
 * associated with that device.
 */
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
    struct ibv_port_attr port;

    if (ibv_query_port(verbs, 1, &port)) {
        error_report("Failed to query port information");
        return;
    }

    printf("%s RDMA Device opened: kernel name %s "
           "uverbs device name %s, "
           "infiniband_verbs class device path %s, "
           "infiniband class device path %s, "
           "transport: (%d) %s\n",
                who,
                verbs->device->name,
                verbs->device->dev_name,
                verbs->device->dev_path,
                verbs->device->ibdev_path,
                port.link_layer,
                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband"
                 : ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                    ? "Ethernet" : "Unknown"));
}

/*
 * Put in the log file the RDMA gid addressing information,
 * useful for folks who have trouble understanding the
 * RDMA device hierarchy in the kernel.
 */
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
{
    char sgid[33];
    char dgid[33];
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    trace_qemu_rdma_dump_gid(who, sgid, dgid);
}
/*
 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 * We will try the next addrinfo struct, and fail if there are
 * no other valid addresses to bind against.
 *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
 *
 * In this case, the source VM will throw an error for ALL types of
 * connections (both IPv4 and IPv6) if the destination machine does not have
 * a regular infiniband network available for use.
 *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
 *
 * Unfortunately, this puts the user in a fix:
 *
 *  If the source VM connects with an IPv4 address without knowing that the
 *  destination has bound to '[::]' the migration will unconditionally fail
 *  unless the management software is explicitly listening on the IPv4
 *  address while using a RoCE-based device.
 *
 *  If the source VM connects with an IPv6 address, then we're OK because we
 *  can throw an error on the source (and similarly on the destination).
 *
 *  But in mixed environments, this will be broken for a while until it is
 *  fixed inside linux.
 *
 * We do provide a *tiny* bit of help in this function: We can list all of
 * the devices in the system and check to see if all the devices are RoCE or
 * Infiniband.
 *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
 *
 * Patches are being reviewed on linux-rdma.
 */
static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
{
    struct ibv_port_attr port_attr;

    /* This bug only exists in linux, to our knowledge. */
#ifdef CONFIG_LINUX

    /*
     * Verbs are only NULL if management has bound to '[::]'.
     *
     * Let's iterate through all the devices and see if there are any pure IB
     * devices (non-ethernet).
     *
     * If not, then we can safely proceed with the migration.
     * Otherwise, there are no guarantees until the bug is fixed in linux.
     */
    if (!verbs) {
        int num_devices, x;
        struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
        bool roce_found = false;
        bool ib_found = false;

        for (x = 0; x < num_devices; x++) {
            verbs = ibv_open_device(dev_list[x]);
            if (!verbs) {
                if (errno == EPERM) {
                    continue;
                } else {
                    return -EINVAL;
                }
            }

            if (ibv_query_port(verbs, 1, &port_attr)) {
                ibv_close_device(verbs);
                ERROR(errp, "Could not query initial IB port");
                return -EINVAL;
            }

            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
                ib_found = true;
            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
                roce_found = true;
            }

            ibv_close_device(verbs);

        }

        if (roce_found) {
            if (ib_found) {
                fprintf(stderr, "WARN: migrations may fail:"
                                " IPv6 over RoCE / iWARP in linux"
                                " is broken. But since you appear to have a"
                                " mixed RoCE / IB environment, be sure to only"
                                " migrate over the IB fabric until the kernel"
                                " fixes the bug.\n");
            } else {
                ERROR(errp, "You only have RoCE / iWARP devices in your system"
                            " and your management software has specified '[::]'"
                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
                return -ENONET;
            }
        }

        return 0;
    }

    /*
     * If we have a verbs context, that means that something other than
     * '[::]' was used by the management software for binding. In which case
     * we can actually warn the user about a potentially broken kernel.
     */

    /* IB ports start with 1, not 0 */
    if (ibv_query_port(verbs, 1, &port_attr)) {
        ERROR(errp, "Could not query initial IB port");
        return -EINVAL;
    }

    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
                    "(but patches on linux-rdma in progress)");
        return -ENONET;
    }

#endif

    return 0;
}
/*
 * Figure out which RDMA device corresponds to the requested IP hostname.
 * Also create the initial connection manager identifiers for opening
 * the connection.
 */
static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
{
    int ret;
    struct rdma_addrinfo *res;
    char port_str[16];
    struct rdma_cm_event *cm_event;
    char ip[40] = "unknown";
    struct rdma_addrinfo *e;

    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
        ERROR(errp, "RDMA hostname has not been set");
        return -EINVAL;
    }

    /* create CM channel */
    rdma->channel = rdma_create_event_channel();
    if (!rdma->channel) {
        ERROR(errp, "could not create CM channel");
        return -EINVAL;
    }

    /* create CM id */
    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
    if (ret) {
        ERROR(errp, "could not create channel id");
        goto err_resolve_create_id;
    }

    snprintf(port_str, 16, "%d", rdma->port);
    port_str[15] = '\0';

    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
    if (ret < 0) {
        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
        goto err_resolve_get_addr;
    }

    for (e = res; e != NULL; e = e->ai_next) {
        inet_ntop(e->ai_family,
            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);

        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
                RDMA_RESOLVE_TIMEOUT_MS);
        if (!ret) {
            if (e->ai_family == AF_INET6) {
                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
                if (ret) {
                    continue;
                }
            }
            goto route;
        }
    }

    ERROR(errp, "could not resolve address %s", rdma->host);
    goto err_resolve_get_addr;

route:
    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_addr_resolved");
        goto err_resolve_get_addr;
    }

    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
        ERROR(errp, "result not equal to event_addr_resolved %s",
                rdma_event_str(cm_event->event));
        perror("rdma_resolve_addr");
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);

    /* resolve route */
    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
    if (ret) {
        ERROR(errp, "could not resolve rdma route");
        goto err_resolve_get_addr;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_route_resolved");
        goto err_resolve_get_addr;
    }
    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
        ERROR(errp, "result not equal to event_route_resolved: %s",
                rdma_event_str(cm_event->event));
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);
    rdma->verbs = rdma->cm_id->verbs;
    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
    return 0;

err_resolve_get_addr:
    rdma_destroy_id(rdma->cm_id);
    rdma->cm_id = NULL;
err_resolve_create_id:
    rdma_destroy_event_channel(rdma->channel);
    rdma->channel = NULL;
    return ret;
}
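/*
 * Illustrative sketch only: the function above repeats one pattern twice,
 * "fetch the next CM event, verify it is the expected type, acknowledge
 * it".  A factored-out helper could look like the hypothetical function
 * below; the real code keeps the steps inline.
 */
static inline int example_expect_cm_event(struct rdma_event_channel *channel,
                                          enum rdma_cm_event_type expected)
{
    struct rdma_cm_event *cm_event;

    if (rdma_get_cm_event(channel, &cm_event)) {
        return -EINVAL;
    }
    if (cm_event->event != expected) {
        rdma_ack_cm_event(cm_event);
        return -EINVAL;
    }
    rdma_ack_cm_event(cm_event);
    return 0;
}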
/*
 * Create protection domain and completion queues
 */
static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
{
    /* allocate pd */
    rdma->pd = ibv_alloc_pd(rdma->verbs);
    if (!rdma->pd) {
        error_report("failed to allocate protection domain");
        return -1;
    }

    /* create completion channel */
    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
    if (!rdma->comp_channel) {
        error_report("failed to allocate completion channel");
        goto err_alloc_pd_cq;
    }

    /*
     * Completion queue can be filled by both read and write work requests,
     * so must reflect the sum of both possible queue sizes.
     */
    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
            NULL, rdma->comp_channel, 0);
    if (!rdma->cq) {
        error_report("failed to allocate completion queue");
        goto err_alloc_pd_cq;
    }

    return 0;

err_alloc_pd_cq:
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
    }
    rdma->pd = NULL;
    rdma->comp_channel = NULL;
    return -1;

}

/*
 * Create queue pairs.
 */
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
{
    struct ibv_qp_init_attr attr = { 0 };
    int ret;

    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
    attr.cap.max_recv_wr = 3;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.send_cq = rdma->cq;
    attr.recv_cq = rdma->cq;
    attr.qp_type = IBV_QPT_RC;

    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
    if (ret) {
        return -1;
    }

    rdma->qp = rdma->cm_id->qp;
    return 0;
}

static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
{
    int i;
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    for (i = 0; i < local->nb_blocks; i++) {
        local->block[i].mr =
            ibv_reg_mr(rdma->pd,
                    local->block[i].local_host_addr,
                    local->block[i].length,
                    IBV_ACCESS_LOCAL_WRITE |
                    IBV_ACCESS_REMOTE_WRITE
                    );
        if (!local->block[i].mr) {
            perror("Failed to register local dest ram block!");
            break;
        }
        rdma->total_registrations++;
    }

    if (i >= local->nb_blocks) {
        return 0;
    }

    for (i--; i >= 0; i--) {
        ibv_dereg_mr(local->block[i].mr);
        rdma->total_registrations--;
    }

    return -1;

}

/*
 * Find the ram block that corresponds to the page requested to be
 * transmitted by QEMU.
 *
 * Once the block is found, also identify which 'chunk' within that
 * block that the page belongs to.
 *
 * This search cannot fail or the migration will fail.
 */
static int qemu_rdma_search_ram_block(RDMAContext *rdma,
                                      uintptr_t block_offset,
                                      uint64_t offset,
                                      uint64_t length,
                                      uint64_t *block_index,
                                      uint64_t *chunk_index)
{
    uint64_t current_addr = block_offset + offset;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *) block_offset);
    assert(block);
    assert(current_addr >= block->offset);
    assert((current_addr + length) <= (block->offset + block->length));

    *block_index = block->index;
    *chunk_index = ram_chunk_index(block->local_host_addr,
                block->local_host_addr + (current_addr - block->offset));

    return 0;
}
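/*
 * Illustrative usage sketch only (hypothetical values): a 4 KB page at
 * offset 0x254000 within its RAMBlock resolves to that block's index and
 * to chunk 2 (0x254000 >> 20), which are later packed into the RDMA
 * write's work request ID.
 */
static inline void example_locate_page(RDMAContext *rdma,
                                       uintptr_t block_offset, uint64_t offset,
                                       uint64_t *block_index,
                                       uint64_t *chunk_index)
{
    qemu_rdma_search_ram_block(rdma, block_offset, offset, 4096,
                               block_index, chunk_index);
}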
/*
 * Register a chunk with IB. If the chunk was already registered
 * previously, then skip.
 *
 * Also return the keys associated with the registration needed
 * to perform the actual RDMA operation.
 */
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
        RDMALocalBlock *block, uintptr_t host_addr,
        uint32_t *lkey, uint32_t *rkey, int chunk,
        uint8_t *chunk_start, uint8_t *chunk_end)
{
    if (block->mr) {
        if (lkey) {
            *lkey = block->mr->lkey;
        }
        if (rkey) {
            *rkey = block->mr->rkey;
        }
        return 0;
    }

    /* allocate memory to store chunk MRs */
    if (!block->pmr) {
        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
    }

    /*
     * If 'rkey', then we're the destination, so grant access to the source.
     *
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
     */
    if (!block->pmr[chunk]) {
        uint64_t len = chunk_end - chunk_start;

        trace_qemu_rdma_register_and_get_keys(len, chunk_start);

        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
                chunk_start, len,
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_WRITE) : 0));

        if (!block->pmr[chunk]) {
            perror("Failed to register chunk!");
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
                            " start %" PRIuPTR " end %" PRIuPTR
                            " host %" PRIuPTR
                            " local %" PRIuPTR " registrations: %d\n",
                            block->index, chunk, (uintptr_t)chunk_start,
                            (uintptr_t)chunk_end, host_addr,
                            (uintptr_t)block->local_host_addr,
                            rdma->total_registrations);
            return -1;
        }
        rdma->total_registrations++;
    }

    if (lkey) {
        *lkey = block->pmr[chunk]->lkey;
    }
    if (rkey) {
        *rkey = block->pmr[chunk]->rkey;
    }
    return 0;
}

/*
 * Register (at connection time) the memory used for control
 * channel messages.
 */
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (rdma->wr_data[idx].control_mr) {
        rdma->total_registrations++;
        return 0;
    }
    error_report("qemu_rdma_reg_control failed");
    return -1;
}

const char *print_wrid(int wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
    }
    return wrid_desc[wrid];
}
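/*
 * Illustrative sketch only: the two ways qemu_rdma_register_and_get_keys()
 * is used.  The source only needs an lkey for the local SGE of its RDMA
 * write; the destination registers with remote access and hands the rkey
 * back to the source.  The wrapper names below are hypothetical.
 */
static inline int example_source_get_lkey(RDMAContext *rdma,
                                          RDMALocalBlock *block, int chunk,
                                          uintptr_t host_addr, uint32_t *lkey)
{
    return qemu_rdma_register_and_get_keys(rdma, block, host_addr,
                                           lkey, NULL, chunk,
                                           ram_chunk_start(block, chunk),
                                           ram_chunk_end(block, chunk));
}

static inline int example_dest_get_rkey(RDMAContext *rdma,
                                        RDMALocalBlock *block, int chunk,
                                        uintptr_t host_addr, uint32_t *rkey)
{
    return qemu_rdma_register_and_get_keys(rdma, block, host_addr,
                                           NULL, rkey, chunk,
                                           ram_chunk_start(block, chunk),
                                           ram_chunk_end(block, chunk));
}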
/*
 * RDMA requires memory registration (mlock/pinning), but this is not good for
 * overcommitment.
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU,
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near
 * future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
 */
//#define RDMA_UNREGISTRATION_EXAMPLE

/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
 *        - for bit clearing
 *        - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
 */
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
    while (rdma->unregistrations[rdma->unregister_current]) {
        int ret;
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
        uint64_t chunk =
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block =
            &(rdma->local_ram_blocks.block[index]);
        RDMARegister reg = { .current_index = index };
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                 };
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
                                   .repeat = 1,
                                 };

        trace_qemu_rdma_unregister_waiting_proc(chunk,
                                                rdma->unregister_current);

        rdma->unregistrations[rdma->unregister_current] = 0;
        rdma->unregister_current++;

        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
            rdma->unregister_current = 0;
        }


        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
        clear_bit(chunk, block->unregister_bitmap);

        if (test_bit(chunk, block->transit_bitmap)) {
            trace_qemu_rdma_unregister_waiting_inflight(chunk);
            continue;
        }

        trace_qemu_rdma_unregister_waiting_send(chunk);

        ret = ibv_dereg_mr(block->pmr[chunk]);
        block->pmr[chunk] = NULL;
        block->remote_keys[chunk] = 0;

        if (ret != 0) {
            perror("unregistration chunk failed");
            return -ret;
        }
        rdma->total_registrations--;

        reg.key.chunk = chunk;
        register_to_network(rdma, &reg);
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                &resp, NULL, NULL);
        if (ret < 0) {
            return ret;
        }

        trace_qemu_rdma_unregister_waiting_complete(chunk);
    }

    return 0;
}

static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
                                    uint64_t chunk)
{
    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;

    result |= (index << RDMA_WRID_BLOCK_SHIFT);
    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);

    return result;
}
/*
 * Set bit for unregistration in the next iteration.
 * We cannot transmit right here, but will unpin later.
 */
static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
                                        uint64_t chunk, uint64_t wr_id)
{
    if (rdma->unregistrations[rdma->unregister_next] != 0) {
        error_report("rdma migration: queue is full");
    } else {
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
            trace_qemu_rdma_signal_unregister_append(chunk,
                                                     rdma->unregister_next);

            rdma->unregistrations[rdma->unregister_next++] =
                    qemu_rdma_make_wrid(wr_id, index, chunk);

            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
                rdma->unregister_next = 0;
            }
        } else {
            trace_qemu_rdma_signal_unregister_already(chunk);
        }
    }
}

/*
 * Consult the completion queue to see whether a work request
 * (of any kind) has completed.
 * Return the work request ID that completed.
 */
static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
                               uint32_t *byte_len)
{
    int ret;
    struct ibv_wc wc;
    uint64_t wr_id;

    ret = ibv_poll_cq(rdma->cq, 1, &wc);

    if (!ret) {
        *wr_id_out = RDMA_WRID_NONE;
        return 0;
    }

    if (ret < 0) {
        error_report("ibv_poll_cq return %d", ret);
        return ret;
    }

    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;

    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
                        wc.status, ibv_wc_status_str(wc.status));
        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);

        return -1;
    }

    if (rdma->control_ready_expected &&
        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
        rdma->control_ready_expected = 0;
    }

    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        uint64_t chunk =
            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
                                   index, chunk, block->local_host_addr,
                                   (void *)(uintptr_t)block->remote_host_addr);

        clear_bit(chunk, block->transit_bitmap);

        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;
        }

        if (!rdma->pin_all) {
            /*
             * FYI: If one wanted to signal a specific chunk to be unregistered
             * using LRU or workload-specific information, this is the function
             * you would call to do so. That chunk would then get asynchronously
             * unregistered later.
             */
#ifdef RDMA_UNREGISTRATION_EXAMPLE
            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
#endif
        }
    } else {
        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
    }

    *wr_id_out = wc.wr_id;
    if (byte_len) {
        *byte_len = wc.byte_len;
    }

    return 0;
}
/*
 * Block until the next work request has completed.
 *
 * First poll to see if a work request has already completed,
 * otherwise block.
 *
 * If we encounter completed work requests for IDs other than
 * the one we're interested in, then that's generally an error.
 *
 * The only exception is actual RDMA Write completions. These
 * completions only need to be recorded, but do not actually
 * need further processing.
 */
static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
                                    uint32_t *byte_len)
{
    int num_cq_events = 0, ret = 0;
    struct ibv_cq *cq;
    void *cq_ctx;
    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;

    if (ibv_req_notify_cq(rdma->cq, 0)) {
        return -1;
    }
    /* poll cq first */
    while (wr_id != wrid_requested) {
        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
        if (ret < 0) {
            return ret;
        }

        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

        if (wr_id == RDMA_WRID_NONE) {
            break;
        }
        if (wr_id != wrid_requested) {
            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                       wrid_requested, print_wrid(wr_id), wr_id);
        }
    }

    if (wr_id == wrid_requested) {
        return 0;
    }

    while (1) {
        /*
         * Coroutine doesn't start until process_incoming_migration()
         * so don't yield unless we know we're running inside of a coroutine.
         */
        if (rdma->migration_started_on_destination) {
            yield_until_fd_readable(rdma->comp_channel->fd);
        }

        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
            perror("ibv_get_cq_event");
            goto err_block_for_wrid;
        }

        num_cq_events++;

        if (ibv_req_notify_cq(cq, 0)) {
            goto err_block_for_wrid;
        }

        while (wr_id != wrid_requested) {
            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
            if (ret < 0) {
                goto err_block_for_wrid;
            }

            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

            if (wr_id == RDMA_WRID_NONE) {
                break;
            }
            if (wr_id != wrid_requested) {
                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                                   wrid_requested, print_wrid(wr_id), wr_id);
            }
        }

        if (wr_id == wrid_requested) {
            goto success_block_for_wrid;
        }
    }

success_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return 0;

err_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return ret;
}
/*
 * Post a SEND message work request for the control channel
 * containing some data and block until the post completes.
 */
static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                       RDMAControlHeader *head)
{
    int ret = 0;
    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
    struct ibv_send_wr *bad_wr;
    struct ibv_sge sge = {
                           .addr = (uintptr_t)(wr->control),
                           .length = head->len + sizeof(RDMAControlHeader),
                           .lkey = wr->control_mr->lkey,
                         };
    struct ibv_send_wr send_wr = {
                                   .wr_id = RDMA_WRID_SEND_CONTROL,
                                   .opcode = IBV_WR_SEND,
                                   .send_flags = IBV_SEND_SIGNALED,
                                   .sg_list = &sge,
                                   .num_sge = 1,
                                };

    trace_qemu_rdma_post_send_control(control_desc[head->type]);

    /*
     * We don't actually need to do a memcpy() in here if we used
     * the "sge" properly, but since we're only sending control messages
     * (not RAM in a performance-critical path), then it's OK for now.
     *
     * The copy makes the RDMAControlHeader simpler to manipulate
     * for the time being.
     */
    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
    memcpy(wr->control, head, sizeof(RDMAControlHeader));
    control_to_network((void *) wr->control);

    if (buf) {
        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
    }


    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret > 0) {
        error_report("Failed to post IB SEND for control");
        return -ret;
    }

    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
    if (ret < 0) {
        error_report("rdma migration: send polling control error");
    }

    return ret;
}

/*
 * Post a RECV work request in anticipation of some future receipt
 * of data on the control channel.
 */
static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
{
    struct ibv_recv_wr *bad_wr;
    struct ibv_sge sge = {
                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
                            .length = RDMA_CONTROL_MAX_BUFFER,
                            .lkey = rdma->wr_data[idx].control_mr->lkey,
                         };

    struct ibv_recv_wr recv_wr = {
                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
                                    .sg_list = &sge,
                                    .num_sge = 1,
                                 };


    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
        return -1;
    }

    return 0;
}

/*
 * Block and wait for a RECV control channel message to arrive.
 */
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                RDMAControlHeader *head, int expecting, int idx)
{
    uint32_t byte_len;
    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
                                       &byte_len);

    if (ret < 0) {
        error_report("rdma migration: recv polling control error!");
        return ret;
    }

    network_to_control((void *) rdma->wr_data[idx].control);
    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));

    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);

    if (expecting == RDMA_CONTROL_NONE) {
        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
                                             head->type);
    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
        error_report("Was expecting a %s (%d) control message"
                ", but got: %s (%d), length: %d",
                control_desc[expecting], expecting,
                control_desc[head->type], head->type, head->len);
        return -EIO;
    }
    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
        error_report("too long length: %d", head->len);
        return -EINVAL;
    }
    if (sizeof(*head) + head->len != byte_len) {
        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
        return -EINVAL;
    }

    return 0;
}

/*
 * When a RECV work request has completed, the work request's
 * buffer is pointed at the header.
 *
 * This will advance the pointer past the header to the data portion
 * of the control message that was populated when the work request
 * finished.
 */
static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
                                  RDMAControlHeader *head)
{
    rdma->wr_data[idx].control_len = head->len;
    rdma->wr_data[idx].control_curr =
        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
}
/*
 * This is an 'atomic' high-level operation to deliver a single, unified
 * control-channel message.
 *
 * Additionally, if the user is expecting some kind of reply to this message,
 * they can request a 'resp' response message be filled in by posting an
 * additional work request on behalf of the user and waiting for an additional
 * completion.
 *
 * The extra (optional) response is used during registration to save us from
 * having to perform an *additional* exchange of messages just to provide a
 * response, by instead piggy-backing on the acknowledgement.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma))
{
    int ret = 0;

    /*
     * Wait until the dest is ready before attempting to deliver the message
     * by waiting for a READY message.
     */
    if (rdma->control_ready_expected) {
        RDMAControlHeader resp;
        ret = qemu_rdma_exchange_get_response(rdma,
                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
        if (ret < 0) {
            return ret;
        }
    }

    /*
     * If the user is expecting a response, post a WR in anticipation of it.
     */
    if (resp) {
        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
        if (ret) {
            error_report("rdma migration: error posting"
                    " extra control recv for anticipated result!");
            return ret;
        }
    }

    /*
     * Post a WR to replace the one we just consumed for the READY message.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting first control recv!");
        return ret;
    }

    /*
     * Deliver the control message that was requested.
     */
    ret = qemu_rdma_post_send_control(rdma, data, head);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * If we're expecting a response, block and wait for it.
     */
    if (resp) {
        if (callback) {
            trace_qemu_rdma_exchange_send_issue_callback();
            ret = callback(rdma);
            if (ret < 0) {
                return ret;
            }
        }

        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
        ret = qemu_rdma_exchange_get_response(rdma, resp,
                                              resp->type, RDMA_WRID_DATA);

        if (ret < 0) {
            return ret;
        }

        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
        if (resp_idx) {
            *resp_idx = RDMA_WRID_DATA;
        }
        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
    }

    rdma->control_ready_expected = 1;

    return 0;
}
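/*
 * Illustrative usage sketch only: sending a control message that expects a
 * piggy-backed reply, which is how the dynamic-registration path further
 * down uses qemu_rdma_exchange_send().  The wrapper name is hypothetical.
 */
static inline int example_request_registration(RDMAContext *rdma,
                                               RDMARegister *reg,
                                               int *reg_result_idx)
{
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

    register_to_network(rdma, reg);
    return qemu_rdma_exchange_send(rdma, &head, (uint8_t *) reg,
                                   &resp, reg_result_idx, NULL);
}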
/*
 * This is an 'atomic' high-level operation to receive a single, unified
 * control-channel message.
 */
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
                                   int expecting)
{
    RDMAControlHeader ready = {
                                .len = 0,
                                .type = RDMA_CONTROL_READY,
                                .repeat = 1,
                              };
    int ret;

    /*
     * Inform the source that we're ready to receive a message.
     */
    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * Block and wait for the message.
     */
    ret = qemu_rdma_exchange_get_response(rdma, head,
                                          expecting, RDMA_WRID_READY);

    if (ret < 0) {
        return ret;
    }

    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);

    /*
     * Post a new RECV work request to replace the one we just consumed.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting second control recv!");
        return ret;
    }

    return 0;
}

/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

retry:
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    while (test_bit(chunk, block->transit_bitmap)) {
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_report("Failed to wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }

    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
             */

            if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
                                                   length)
                   && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
                                                 length) == length) {
                RDMACompress comp = {
                                        .offset = current_addr,
                                        .value = 0,
                                        .block_idx = current_index,
                                        .length = length,
                                    };

                head.len = sizeof(comp);
                head.type = RDMA_CONTROL_COMPRESS;

                trace_qemu_rdma_write_one_zero(chunk, sge.length,
                                               current_index, current_addr);

                compress_to_network(rdma, &comp);
                ret = qemu_rdma_exchange_send(rdma, &head,
                                (uint8_t *) &comp, NULL, NULL, NULL);

                if (ret < 0) {
                    return -EIO;
                }

                acct_update_position(f, sge.length, true);

                return 1;
            }

            /*
             * Otherwise, tell other side to register.
             */
            reg.current_index = current_index;
            if (block->is_ram_block) {
                reg.key.current_addr = current_addr;
            } else {
                reg.key.chunk = chunk;
            }
            reg.chunks = chunks;

            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
                                              current_addr);

            register_to_network(rdma, &reg);
            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                          &resp, &reg_result_idx, NULL);
            if (ret < 0) {
                return ret;
            }

            /* try to overlap this single registration with the one we sent. */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey");
                return -EINVAL;
            }

            reg_result = (RDMARegisterResult *)
                    rdma->wr_data[reg_result_idx].control_curr;

            network_to_result(reg_result);

            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
                                                 reg_result->rkey, chunk);

            block->remote_keys[chunk] = reg_result->rkey;
            block->remote_host_addr = reg_result->host_addr;
        } else {
            /* already registered before */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey!");
                return -EINVAL;
            }
        }

        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
    } else {
        send_wr.wr.rdma.rkey = block->remote_rkey;

        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                            &sge.lkey, NULL, chunk,
                                            chunk_start, chunk_end)) {
            error_report("cannot get lkey!");
            return -EINVAL;
        }
    }

    /*
     * Encode the ram block index and chunk within this wrid.
     * We will use this information at the time of completion
     * to figure out which bitmap to check against and then which
     * chunk in the bitmap to look for.
     */
    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
                                        current_index, chunk);

    send_wr.opcode = IBV_WR_RDMA_WRITE;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
                                  (current_addr - block->offset);

    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
                                   sge.length);

    /*
     * ibv_post_send() does not return negative error numbers; per the
     * specification, failures are reported as positive errno values.
     */
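    /*
     * For example, a full send queue shows up here as ret == ENOMEM: we then
     * block for one outstanding RDMA write completion to free a slot and
     * jump back to 'retry' above.
     */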
    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret == ENOMEM) {
        trace_qemu_rdma_write_one_queue_full();
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_report("rdma migration: failed to make "
                         "room in full send queue! %d", ret);
            return ret;
        }

        goto retry;

    } else if (ret > 0) {
        perror("rdma migration: post rdma write failed");
        return -ret;
    }

    set_bit(chunk, block->transit_bitmap);
    acct_update_position(f, sge.length, false);
    rdma->total_writes++;

    return 0;
}

/*
 * Push out any unwritten RDMA operations.
 *
 * We support sending out multiple chunks at the same time.
 * Not all of them need to get signaled in the completion queue.
 */
static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (!rdma->current_length) {
        return 0;
    }

    ret = qemu_rdma_write_one(f, rdma,
            rdma->current_index, rdma->current_addr, rdma->current_length);

    if (ret < 0) {
        return ret;
    }

    if (ret == 0) {
        rdma->nb_sent++;
        trace_qemu_rdma_write_flush(rdma->nb_sent);
    }

    rdma->current_length = 0;
    rdma->current_addr = 0;

    return 0;
}

static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
                                            uint64_t offset, uint64_t len)
{
    RDMALocalBlock *block;
    uint8_t *host_addr;
    uint8_t *chunk_end;

    if (rdma->current_index < 0) {
        return 0;
    }

    if (rdma->current_chunk < 0) {
        return 0;
    }

    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
    host_addr = block->local_host_addr + (offset - block->offset);
    chunk_end = ram_chunk_end(block, rdma->current_chunk);

    if (rdma->current_length == 0) {
        return 0;
    }

    /*
     * Only merge into chunk sequentially.
     */
    if (offset != (rdma->current_addr + rdma->current_length)) {
        return 0;
    }

    if (offset < block->offset) {
        return 0;
    }

    if ((offset + len) > (block->offset + block->length)) {
        return 0;
    }

    if ((host_addr + len) > chunk_end) {
        return 0;
    }

    return 1;
}

/*
 * We're not actually writing here, but doing three things:
 *
 * 1. Identify the chunk the buffer belongs to.
 * 2. If the chunk is full or the buffer doesn't belong to the current
 *    chunk, then start a new chunk and flush() the old chunk.
 * 3. To keep the hardware busy, we also group chunks into batches
 *    and only require that a batch gets acknowledged in the completion
 *    queue instead of each individual chunk.
 */
static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
                           uint64_t block_offset, uint64_t offset,
                           uint64_t len)
{
    uint64_t current_addr = block_offset + offset;
    uint64_t index = rdma->current_index;
    uint64_t chunk = rdma->current_chunk;
    int ret;

    /* If we cannot merge it, we flush the current buffer first.
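     * For example, a write that is not contiguous with
     * (current_addr + current_length), or one that would cross the current
     * chunk boundary, cannot be merged and starts a new chunk.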
*/ 2165 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2166 ret = qemu_rdma_write_flush(f, rdma); 2167 if (ret) { 2168 return ret; 2169 } 2170 rdma->current_length = 0; 2171 rdma->current_addr = current_addr; 2172 2173 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2174 offset, len, &index, &chunk); 2175 if (ret) { 2176 error_report("ram block search failed"); 2177 return ret; 2178 } 2179 rdma->current_index = index; 2180 rdma->current_chunk = chunk; 2181 } 2182 2183 /* merge it */ 2184 rdma->current_length += len; 2185 2186 /* flush it if buffer is too large */ 2187 if (rdma->current_length >= RDMA_MERGE_MAX) { 2188 return qemu_rdma_write_flush(f, rdma); 2189 } 2190 2191 return 0; 2192 } 2193 2194 static void qemu_rdma_cleanup(RDMAContext *rdma) 2195 { 2196 struct rdma_cm_event *cm_event; 2197 int ret, idx; 2198 2199 if (rdma->cm_id && rdma->connected) { 2200 if (rdma->error_state) { 2201 RDMAControlHeader head = { .len = 0, 2202 .type = RDMA_CONTROL_ERROR, 2203 .repeat = 1, 2204 }; 2205 error_report("Early error. Sending error."); 2206 qemu_rdma_post_send_control(rdma, NULL, &head); 2207 } 2208 2209 ret = rdma_disconnect(rdma->cm_id); 2210 if (!ret) { 2211 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2212 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2213 if (!ret) { 2214 rdma_ack_cm_event(cm_event); 2215 } 2216 } 2217 trace_qemu_rdma_cleanup_disconnect(); 2218 rdma->connected = false; 2219 } 2220 2221 g_free(rdma->dest_blocks); 2222 rdma->dest_blocks = NULL; 2223 2224 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2225 if (rdma->wr_data[idx].control_mr) { 2226 rdma->total_registrations--; 2227 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2228 } 2229 rdma->wr_data[idx].control_mr = NULL; 2230 } 2231 2232 if (rdma->local_ram_blocks.block) { 2233 while (rdma->local_ram_blocks.nb_blocks) { 2234 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2235 } 2236 } 2237 2238 if (rdma->qp) { 2239 rdma_destroy_qp(rdma->cm_id); 2240 rdma->qp = NULL; 2241 } 2242 if (rdma->cq) { 2243 ibv_destroy_cq(rdma->cq); 2244 rdma->cq = NULL; 2245 } 2246 if (rdma->comp_channel) { 2247 ibv_destroy_comp_channel(rdma->comp_channel); 2248 rdma->comp_channel = NULL; 2249 } 2250 if (rdma->pd) { 2251 ibv_dealloc_pd(rdma->pd); 2252 rdma->pd = NULL; 2253 } 2254 if (rdma->cm_id) { 2255 rdma_destroy_id(rdma->cm_id); 2256 rdma->cm_id = NULL; 2257 } 2258 if (rdma->listen_id) { 2259 rdma_destroy_id(rdma->listen_id); 2260 rdma->listen_id = NULL; 2261 } 2262 if (rdma->channel) { 2263 rdma_destroy_event_channel(rdma->channel); 2264 rdma->channel = NULL; 2265 } 2266 g_free(rdma->host); 2267 rdma->host = NULL; 2268 } 2269 2270 2271 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2272 { 2273 int ret, idx; 2274 Error *local_err = NULL, **temp = &local_err; 2275 2276 /* 2277 * Will be validated against destination's actual capabilities 2278 * after the connect() completes. 2279 */ 2280 rdma->pin_all = pin_all; 2281 2282 ret = qemu_rdma_resolve_host(rdma, temp); 2283 if (ret) { 2284 goto err_rdma_source_init; 2285 } 2286 2287 ret = qemu_rdma_alloc_pd_cq(rdma); 2288 if (ret) { 2289 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2290 " limits may be too low. 
Please check $ ulimit -a # and " 2291 "search for 'ulimit -l' in the output"); 2292 goto err_rdma_source_init; 2293 } 2294 2295 ret = qemu_rdma_alloc_qp(rdma); 2296 if (ret) { 2297 ERROR(temp, "rdma migration: error allocating qp!"); 2298 goto err_rdma_source_init; 2299 } 2300 2301 ret = qemu_rdma_init_ram_blocks(rdma); 2302 if (ret) { 2303 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2304 goto err_rdma_source_init; 2305 } 2306 2307 /* Build the hash that maps from offset to RAMBlock */ 2308 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2309 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2310 g_hash_table_insert(rdma->blockmap, 2311 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2312 &rdma->local_ram_blocks.block[idx]); 2313 } 2314 2315 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2316 ret = qemu_rdma_reg_control(rdma, idx); 2317 if (ret) { 2318 ERROR(temp, "rdma migration: error registering %d control!", 2319 idx); 2320 goto err_rdma_source_init; 2321 } 2322 } 2323 2324 return 0; 2325 2326 err_rdma_source_init: 2327 error_propagate(errp, local_err); 2328 qemu_rdma_cleanup(rdma); 2329 return -1; 2330 } 2331 2332 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2333 { 2334 RDMACapabilities cap = { 2335 .version = RDMA_CONTROL_VERSION_CURRENT, 2336 .flags = 0, 2337 }; 2338 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2339 .retry_count = 5, 2340 .private_data = &cap, 2341 .private_data_len = sizeof(cap), 2342 }; 2343 struct rdma_cm_event *cm_event; 2344 int ret; 2345 2346 /* 2347 * Only negotiate the capability with destination if the user 2348 * on the source first requested the capability. 2349 */ 2350 if (rdma->pin_all) { 2351 trace_qemu_rdma_connect_pin_all_requested(); 2352 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2353 } 2354 2355 caps_to_network(&cap); 2356 2357 ret = rdma_connect(rdma->cm_id, &conn_param); 2358 if (ret) { 2359 perror("rdma_connect"); 2360 ERROR(errp, "connecting to destination!"); 2361 goto err_rdma_source_connect; 2362 } 2363 2364 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2365 if (ret) { 2366 perror("rdma_get_cm_event after rdma_connect"); 2367 ERROR(errp, "connecting to destination!"); 2368 rdma_ack_cm_event(cm_event); 2369 goto err_rdma_source_connect; 2370 } 2371 2372 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2373 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2374 ERROR(errp, "connecting to destination!"); 2375 rdma_ack_cm_event(cm_event); 2376 goto err_rdma_source_connect; 2377 } 2378 rdma->connected = true; 2379 2380 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2381 network_to_caps(&cap); 2382 2383 /* 2384 * Verify that the *requested* capabilities are supported by the destination 2385 * and disable them otherwise. 2386 */ 2387 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2388 ERROR(errp, "Server cannot support pinning all memory. 
" 2389 "Will register memory dynamically."); 2390 rdma->pin_all = false; 2391 } 2392 2393 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2394 2395 rdma_ack_cm_event(cm_event); 2396 2397 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2398 if (ret) { 2399 ERROR(errp, "posting second control recv!"); 2400 goto err_rdma_source_connect; 2401 } 2402 2403 rdma->control_ready_expected = 1; 2404 rdma->nb_sent = 0; 2405 return 0; 2406 2407 err_rdma_source_connect: 2408 qemu_rdma_cleanup(rdma); 2409 return -1; 2410 } 2411 2412 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2413 { 2414 int ret, idx; 2415 struct rdma_cm_id *listen_id; 2416 char ip[40] = "unknown"; 2417 struct rdma_addrinfo *res, *e; 2418 char port_str[16]; 2419 2420 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2421 rdma->wr_data[idx].control_len = 0; 2422 rdma->wr_data[idx].control_curr = NULL; 2423 } 2424 2425 if (!rdma->host || !rdma->host[0]) { 2426 ERROR(errp, "RDMA host is not set!"); 2427 rdma->error_state = -EINVAL; 2428 return -1; 2429 } 2430 /* create CM channel */ 2431 rdma->channel = rdma_create_event_channel(); 2432 if (!rdma->channel) { 2433 ERROR(errp, "could not create rdma event channel"); 2434 rdma->error_state = -EINVAL; 2435 return -1; 2436 } 2437 2438 /* create CM id */ 2439 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2440 if (ret) { 2441 ERROR(errp, "could not create cm_id!"); 2442 goto err_dest_init_create_listen_id; 2443 } 2444 2445 snprintf(port_str, 16, "%d", rdma->port); 2446 port_str[15] = '\0'; 2447 2448 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2449 if (ret < 0) { 2450 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2451 goto err_dest_init_bind_addr; 2452 } 2453 2454 for (e = res; e != NULL; e = e->ai_next) { 2455 inet_ntop(e->ai_family, 2456 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2457 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2458 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2459 if (ret) { 2460 continue; 2461 } 2462 if (e->ai_family == AF_INET6) { 2463 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2464 if (ret) { 2465 continue; 2466 } 2467 } 2468 break; 2469 } 2470 2471 if (!e) { 2472 ERROR(errp, "Error: could not rdma_bind_addr!"); 2473 goto err_dest_init_bind_addr; 2474 } 2475 2476 rdma->listen_id = listen_id; 2477 qemu_rdma_dump_gid("dest_init", listen_id); 2478 return 0; 2479 2480 err_dest_init_bind_addr: 2481 rdma_destroy_id(listen_id); 2482 err_dest_init_create_listen_id: 2483 rdma_destroy_event_channel(rdma->channel); 2484 rdma->channel = NULL; 2485 rdma->error_state = ret; 2486 return ret; 2487 2488 } 2489 2490 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2491 { 2492 RDMAContext *rdma = NULL; 2493 InetSocketAddress *addr; 2494 2495 if (host_port) { 2496 rdma = g_new0(RDMAContext, 1); 2497 rdma->current_index = -1; 2498 rdma->current_chunk = -1; 2499 2500 addr = inet_parse(host_port, NULL); 2501 if (addr != NULL) { 2502 rdma->port = atoi(addr->port); 2503 rdma->host = g_strdup(addr->host); 2504 } else { 2505 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2506 g_free(rdma); 2507 rdma = NULL; 2508 } 2509 2510 qapi_free_InetSocketAddress(addr); 2511 } 2512 2513 return rdma; 2514 } 2515 2516 /* 2517 * QEMUFile interface to the control channel. 2518 * SEND messages for control only. 2519 * VM's ram is handled with regular RDMA messages. 
2520 */ 2521 static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, 2522 int64_t pos, size_t size) 2523 { 2524 QEMUFileRDMA *r = opaque; 2525 QEMUFile *f = r->file; 2526 RDMAContext *rdma = r->rdma; 2527 size_t remaining = size; 2528 uint8_t * data = (void *) buf; 2529 int ret; 2530 2531 CHECK_ERROR_STATE(); 2532 2533 /* 2534 * Push out any writes that 2535 * we're queued up for VM's ram. 2536 */ 2537 ret = qemu_rdma_write_flush(f, rdma); 2538 if (ret < 0) { 2539 rdma->error_state = ret; 2540 return ret; 2541 } 2542 2543 while (remaining) { 2544 RDMAControlHeader head; 2545 2546 r->len = MIN(remaining, RDMA_SEND_INCREMENT); 2547 remaining -= r->len; 2548 2549 /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */ 2550 head.len = (uint32_t)r->len; 2551 head.type = RDMA_CONTROL_QEMU_FILE; 2552 2553 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2554 2555 if (ret < 0) { 2556 rdma->error_state = ret; 2557 return ret; 2558 } 2559 2560 data += r->len; 2561 } 2562 2563 return size; 2564 } 2565 2566 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2567 size_t size, int idx) 2568 { 2569 size_t len = 0; 2570 2571 if (rdma->wr_data[idx].control_len) { 2572 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2573 2574 len = MIN(size, rdma->wr_data[idx].control_len); 2575 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2576 rdma->wr_data[idx].control_curr += len; 2577 rdma->wr_data[idx].control_len -= len; 2578 } 2579 2580 return len; 2581 } 2582 2583 /* 2584 * QEMUFile interface to the control channel. 2585 * RDMA links don't use bytestreams, so we have to 2586 * return bytes to QEMUFile opportunistically. 2587 */ 2588 static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf, 2589 int64_t pos, size_t size) 2590 { 2591 QEMUFileRDMA *r = opaque; 2592 RDMAContext *rdma = r->rdma; 2593 RDMAControlHeader head; 2594 int ret = 0; 2595 2596 CHECK_ERROR_STATE(); 2597 2598 /* 2599 * First, we hold on to the last SEND message we 2600 * were given and dish out the bytes until we run 2601 * out of bytes. 2602 */ 2603 r->len = qemu_rdma_fill(r->rdma, buf, size, 0); 2604 if (r->len) { 2605 return r->len; 2606 } 2607 2608 /* 2609 * Once we run out, we block and wait for another 2610 * SEND message to arrive. 2611 */ 2612 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2613 2614 if (ret < 0) { 2615 rdma->error_state = ret; 2616 return ret; 2617 } 2618 2619 /* 2620 * SEND was received with new bytes, now try again. 2621 */ 2622 return qemu_rdma_fill(r->rdma, buf, size, 0); 2623 } 2624 2625 /* 2626 * Block until all the outstanding chunks have been delivered by the hardware. 
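 * (Roughly: flush any merged-but-unwritten chunk, then block for RDMA write
 * completions one at a time until rdma->nb_sent drops to zero, and finally
 * process any deferred unregistrations.)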
 */
static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (qemu_rdma_write_flush(f, rdma) < 0) {
        return -EIO;
    }

    while (rdma->nb_sent) {
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_report("rdma migration: complete polling error!");
            return -EIO;
        }
    }

    qemu_rdma_unregister_waiting(rdma);

    return 0;
}

static int qemu_rdma_close(void *opaque)
{
    trace_qemu_rdma_close();
    QEMUFileRDMA *r = opaque;
    if (r->rdma) {
        qemu_rdma_cleanup(r->rdma);
        g_free(r->rdma);
    }
    g_free(r);
    return 0;
}

/*
 * Parameters:
 *    @offset == 0 :
 *        This means that 'block_offset' is a full virtual address that does
 *        not belong to a RAMBlock of the virtual machine and instead
 *        represents a private malloc'd memory area that the caller wishes to
 *        transfer.
 *
 *    @offset != 0 :
 *        Offset is an offset to be added to block_offset and used
 *        to also lookup the corresponding RAMBlock.
 *
 *    @size > 0 :
 *        Initiate a transfer of this size.
 *
 *    @size == 0 :
 *        A 'hint' or 'advice' that means that we wish to speculatively
 *        and asynchronously unregister this memory. In this case, there is no
 *        guarantee that the unregister will actually happen, for example,
 *        if the memory is being actively transmitted. Additionally, the memory
 *        may be re-registered at any future time if a write within the same
 *        chunk was requested again, even if you attempted to unregister it
 *        here.
 *
 *    @size < 0 : TODO, not yet supported
 *        Unregister the memory NOW. This means that the caller does not
 *        expect there to be any future RDMA transfers and we just want to
 *        clean things up. This is used in case the upper layer owns the memory
 *        and cannot wait for qemu_fclose() to occur.
 *
 *    @bytes_sent : User-specified pointer to indicate how many bytes were
 *                  sent. Usually, this will not be more than a few bytes of
 *                  the protocol because most transfers are sent asynchronously.
 */
static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                  ram_addr_t block_offset, ram_addr_t offset,
                                  size_t size, uint64_t *bytes_sent)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    int ret;

    CHECK_ERROR_STATE();

    qemu_fflush(f);

    if (size > 0) {
        /*
         * Add this page to the current 'chunk'. If the chunk
         * is full, or the page doesn't belong to the current chunk,
         * an actual RDMA write will occur and a new chunk will be formed.
         */
        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
        if (ret < 0) {
            error_report("rdma migration: write error! %d", ret);
            goto err;
        }

        /*
         * We always return 1 byte because the RDMA
         * protocol is completely asynchronous. We do not yet know
         * whether an identified chunk is zero or not because we're
         * waiting for other pages to potentially be merged with
         * the current chunk. So, we have to call qemu_update_position()
         * later on when the actual write occurs.
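         * (Illustrative: a page merged into a pending chunk is reported here
         * as a single byte; the real byte count is accounted by
         * acct_update_position() when the chunk is actually flushed.)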
2726 */ 2727 if (bytes_sent) { 2728 *bytes_sent = 1; 2729 } 2730 } else { 2731 uint64_t index, chunk; 2732 2733 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2734 if (size < 0) { 2735 ret = qemu_rdma_drain_cq(f, rdma); 2736 if (ret < 0) { 2737 fprintf(stderr, "rdma: failed to synchronously drain" 2738 " completion queue before unregistration.\n"); 2739 goto err; 2740 } 2741 } 2742 */ 2743 2744 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2745 offset, size, &index, &chunk); 2746 2747 if (ret) { 2748 error_report("ram block search failed"); 2749 goto err; 2750 } 2751 2752 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2753 2754 /* 2755 * TODO: Synchronous, guaranteed unregistration (should not occur during 2756 * fast-path). Otherwise, unregisters will process on the next call to 2757 * qemu_rdma_drain_cq() 2758 if (size < 0) { 2759 qemu_rdma_unregister_waiting(rdma); 2760 } 2761 */ 2762 } 2763 2764 /* 2765 * Drain the Completion Queue if possible, but do not block, 2766 * just poll. 2767 * 2768 * If nothing to poll, the end of the iteration will do this 2769 * again to make sure we don't overflow the request queue. 2770 */ 2771 while (1) { 2772 uint64_t wr_id, wr_id_in; 2773 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2774 if (ret < 0) { 2775 error_report("rdma migration: polling error! %d", ret); 2776 goto err; 2777 } 2778 2779 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2780 2781 if (wr_id == RDMA_WRID_NONE) { 2782 break; 2783 } 2784 } 2785 2786 return RAM_SAVE_CONTROL_DELAYED; 2787 err: 2788 rdma->error_state = ret; 2789 return ret; 2790 } 2791 2792 static int qemu_rdma_accept(RDMAContext *rdma) 2793 { 2794 RDMACapabilities cap; 2795 struct rdma_conn_param conn_param = { 2796 .responder_resources = 2, 2797 .private_data = &cap, 2798 .private_data_len = sizeof(cap), 2799 }; 2800 struct rdma_cm_event *cm_event; 2801 struct ibv_context *verbs; 2802 int ret = -EINVAL; 2803 int idx; 2804 2805 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2806 if (ret) { 2807 goto err_rdma_dest_wait; 2808 } 2809 2810 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2811 rdma_ack_cm_event(cm_event); 2812 goto err_rdma_dest_wait; 2813 } 2814 2815 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2816 2817 network_to_caps(&cap); 2818 2819 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2820 error_report("Unknown source RDMA version: %d, bailing...", 2821 cap.version); 2822 rdma_ack_cm_event(cm_event); 2823 goto err_rdma_dest_wait; 2824 } 2825 2826 /* 2827 * Respond with only the capabilities this version of QEMU knows about. 2828 */ 2829 cap.flags &= known_capabilities; 2830 2831 /* 2832 * Enable the ones that we do know about. 2833 * Add other checks here as new ones are introduced. 
2834 */ 2835 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2836 rdma->pin_all = true; 2837 } 2838 2839 rdma->cm_id = cm_event->id; 2840 verbs = cm_event->id->verbs; 2841 2842 rdma_ack_cm_event(cm_event); 2843 2844 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 2845 2846 caps_to_network(&cap); 2847 2848 trace_qemu_rdma_accept_pin_verbsc(verbs); 2849 2850 if (!rdma->verbs) { 2851 rdma->verbs = verbs; 2852 } else if (rdma->verbs != verbs) { 2853 error_report("ibv context not matching %p, %p!", rdma->verbs, 2854 verbs); 2855 goto err_rdma_dest_wait; 2856 } 2857 2858 qemu_rdma_dump_id("dest_init", verbs); 2859 2860 ret = qemu_rdma_alloc_pd_cq(rdma); 2861 if (ret) { 2862 error_report("rdma migration: error allocating pd and cq!"); 2863 goto err_rdma_dest_wait; 2864 } 2865 2866 ret = qemu_rdma_alloc_qp(rdma); 2867 if (ret) { 2868 error_report("rdma migration: error allocating qp!"); 2869 goto err_rdma_dest_wait; 2870 } 2871 2872 ret = qemu_rdma_init_ram_blocks(rdma); 2873 if (ret) { 2874 error_report("rdma migration: error initializing ram blocks!"); 2875 goto err_rdma_dest_wait; 2876 } 2877 2878 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2879 ret = qemu_rdma_reg_control(rdma, idx); 2880 if (ret) { 2881 error_report("rdma: error registering %d control", idx); 2882 goto err_rdma_dest_wait; 2883 } 2884 } 2885 2886 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2887 2888 ret = rdma_accept(rdma->cm_id, &conn_param); 2889 if (ret) { 2890 error_report("rdma_accept returns %d", ret); 2891 goto err_rdma_dest_wait; 2892 } 2893 2894 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2895 if (ret) { 2896 error_report("rdma_accept get_cm_event failed %d", ret); 2897 goto err_rdma_dest_wait; 2898 } 2899 2900 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2901 error_report("rdma_accept not event established"); 2902 rdma_ack_cm_event(cm_event); 2903 goto err_rdma_dest_wait; 2904 } 2905 2906 rdma_ack_cm_event(cm_event); 2907 rdma->connected = true; 2908 2909 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2910 if (ret) { 2911 error_report("rdma migration: error posting second control recv"); 2912 goto err_rdma_dest_wait; 2913 } 2914 2915 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 2916 2917 return 0; 2918 2919 err_rdma_dest_wait: 2920 rdma->error_state = ret; 2921 qemu_rdma_cleanup(rdma); 2922 return ret; 2923 } 2924 2925 static int dest_ram_sort_func(const void *a, const void *b) 2926 { 2927 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 2928 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 2929 2930 return (a_index < b_index) ? -1 : (a_index != b_index); 2931 } 2932 2933 /* 2934 * During each iteration of the migration, we listen for instructions 2935 * by the source VM to perform dynamic page registrations before they 2936 * can perform RDMA operations. 2937 * 2938 * We respond with the 'rkey'. 2939 * 2940 * Keep doing this until the source tells us to stop. 
2941 */ 2942 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 2943 { 2944 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 2945 .type = RDMA_CONTROL_REGISTER_RESULT, 2946 .repeat = 0, 2947 }; 2948 RDMAControlHeader unreg_resp = { .len = 0, 2949 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 2950 .repeat = 0, 2951 }; 2952 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 2953 .repeat = 1 }; 2954 QEMUFileRDMA *rfile = opaque; 2955 RDMAContext *rdma = rfile->rdma; 2956 RDMALocalBlocks *local = &rdma->local_ram_blocks; 2957 RDMAControlHeader head; 2958 RDMARegister *reg, *registers; 2959 RDMACompress *comp; 2960 RDMARegisterResult *reg_result; 2961 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 2962 RDMALocalBlock *block; 2963 void *host_addr; 2964 int ret = 0; 2965 int idx = 0; 2966 int count = 0; 2967 int i = 0; 2968 2969 CHECK_ERROR_STATE(); 2970 2971 do { 2972 trace_qemu_rdma_registration_handle_wait(); 2973 2974 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 2975 2976 if (ret < 0) { 2977 break; 2978 } 2979 2980 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 2981 error_report("rdma: Too many requests in this message (%d)." 2982 "Bailing.", head.repeat); 2983 ret = -EIO; 2984 break; 2985 } 2986 2987 switch (head.type) { 2988 case RDMA_CONTROL_COMPRESS: 2989 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 2990 network_to_compress(comp); 2991 2992 trace_qemu_rdma_registration_handle_compress(comp->length, 2993 comp->block_idx, 2994 comp->offset); 2995 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 2996 error_report("rdma: 'compress' bad block index %u (vs %d)", 2997 (unsigned int)comp->block_idx, 2998 rdma->local_ram_blocks.nb_blocks); 2999 ret = -EIO; 3000 goto out; 3001 } 3002 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3003 3004 host_addr = block->local_host_addr + 3005 (comp->offset - block->offset); 3006 3007 ram_handle_compressed(host_addr, comp->value, comp->length); 3008 break; 3009 3010 case RDMA_CONTROL_REGISTER_FINISHED: 3011 trace_qemu_rdma_registration_handle_finished(); 3012 goto out; 3013 3014 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3015 trace_qemu_rdma_registration_handle_ram_blocks(); 3016 3017 /* Sort our local RAM Block list so it's the same as the source, 3018 * we can do this since we've filled in a src_index in the list 3019 * as we received the RAMBlock list earlier. 3020 */ 3021 qsort(rdma->local_ram_blocks.block, 3022 rdma->local_ram_blocks.nb_blocks, 3023 sizeof(RDMALocalBlock), dest_ram_sort_func); 3024 if (rdma->pin_all) { 3025 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3026 if (ret) { 3027 error_report("rdma migration: error dest " 3028 "registering ram blocks"); 3029 goto out; 3030 } 3031 } 3032 3033 /* 3034 * Dest uses this to prepare to transmit the RAMBlock descriptions 3035 * to the source VM after connection setup. 3036 * Both sides use the "remote" structure to communicate and update 3037 * their "local" descriptions with what was sent. 
             */
            for (i = 0; i < local->nb_blocks; i++) {
                rdma->dest_blocks[i].remote_host_addr =
                    (uintptr_t)(local->block[i].local_host_addr);

                if (rdma->pin_all) {
                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
                }

                rdma->dest_blocks[i].offset = local->block[i].offset;
                rdma->dest_blocks[i].length = local->block[i].length;

                dest_block_to_network(&rdma->dest_blocks[i]);
                trace_qemu_rdma_registration_handle_ram_blocks_loop(
                    local->block[i].block_name,
                    local->block[i].offset,
                    local->block[i].length,
                    local->block[i].local_host_addr,
                    local->block[i].src_index);
            }

            blocks.len = rdma->local_ram_blocks.nb_blocks
                                                * sizeof(RDMADestBlock);

            ret = qemu_rdma_post_send_control(rdma,
                                        (uint8_t *) rdma->dest_blocks, &blocks);

            if (ret < 0) {
                error_report("rdma migration: error sending remote info");
                goto out;
            }

            break;
        case RDMA_CONTROL_REGISTER_REQUEST:
            trace_qemu_rdma_registration_handle_register(head.repeat);

            reg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                uint64_t chunk;
                uint8_t *chunk_start, *chunk_end;

                reg = &registers[count];
                network_to_register(reg);

                reg_result = &results[count];

                trace_qemu_rdma_registration_handle_register_loop(count,
                         reg->current_index, reg->key.current_addr, reg->chunks);

                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
                    error_report("rdma: 'register' bad block index %u (vs %d)",
                                 (unsigned int)reg->current_index,
                                 rdma->local_ram_blocks.nb_blocks);
                    ret = -ENOENT;
                    goto out;
                }
                block = &(rdma->local_ram_blocks.block[reg->current_index]);
                if (block->is_ram_block) {
                    if (block->offset > reg->key.current_addr) {
                        error_report("rdma: bad register address for block %s"
                            " offset: %" PRIx64 " current_addr: %" PRIx64,
                            block->block_name, block->offset,
                            reg->key.current_addr);
                        ret = -ERANGE;
                        goto out;
                    }
                    host_addr = (block->local_host_addr +
                                (reg->key.current_addr - block->offset));
                    chunk = ram_chunk_index(block->local_host_addr,
                                            (uint8_t *) host_addr);
                } else {
                    chunk = reg->key.chunk;
                    host_addr = block->local_host_addr +
                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
                    /* Check for particularly bad chunk value */
                    if (host_addr < (void *)block->local_host_addr) {
                        error_report("rdma: bad chunk for block %s"
                                     " chunk: %" PRIx64,
                                     block->block_name, reg->key.chunk);
                        ret = -ERANGE;
                        goto out;
                    }
                }
                chunk_start = ram_chunk_start(block, chunk);
                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
                if (qemu_rdma_register_and_get_keys(rdma, block,
                                    (uintptr_t)host_addr, NULL, &reg_result->rkey,
                                    chunk, chunk_start, chunk_end)) {
                    error_report("cannot get rkey");
                    ret = -EINVAL;
                    goto out;
                }

                reg_result->host_addr = (uintptr_t)block->local_host_addr;

                trace_qemu_rdma_registration_handle_register_rkey(
                                                           reg_result->rkey);

                result_to_network(reg_result);
            }

            ret = qemu_rdma_post_send_control(rdma,
                            (uint8_t *) results, &reg_resp);

            if (ret < 0) {
                error_report("Failed to send control buffer");
                goto out;
            }
            break;
        case RDMA_CONTROL_UNREGISTER_REQUEST:
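            /*
             * Sketch of the unregister path: for every repeated RDMARegister
             * entry we deregister that chunk's MR with ibv_dereg_mr(), drop
             * the cached pmr pointer, and finally acknowledge the whole batch
             * with a single RDMA_CONTROL_UNREGISTER_FINISHED message.
             */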
            trace_qemu_rdma_registration_handle_unregister(head.repeat);
            unreg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                reg = &registers[count];
                network_to_register(reg);

                trace_qemu_rdma_registration_handle_unregister_loop(count,
                           reg->current_index, reg->key.chunk);

                block = &(rdma->local_ram_blocks.block[reg->current_index]);

                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
                block->pmr[reg->key.chunk] = NULL;

                if (ret != 0) {
                    perror("rdma unregistration chunk failed");
                    ret = -ret;
                    goto out;
                }

                rdma->total_registrations--;

                trace_qemu_rdma_registration_handle_unregister_success(
                                                       reg->key.chunk);
            }

            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);

            if (ret < 0) {
                error_report("Failed to send control buffer");
                goto out;
            }
            break;
        case RDMA_CONTROL_REGISTER_RESULT:
            error_report("Invalid RESULT message at dest.");
            ret = -EIO;
            goto out;
        default:
            error_report("Unknown control message %s", control_desc[head.type]);
            ret = -EIO;
            goto out;
        }
    } while (1);
out:
    if (ret < 0) {
        rdma->error_state = ret;
    }
    return ret;
}

/* Destination:
 * Called via a ram_control_load_hook during the initial RAM load section which
 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks
 * on the source.
 * We've already built our local RAMBlock list, but not yet sent the list to
 * the source.
 */
static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
{
    RDMAContext *rdma = rfile->rdma;
    int curr;
    int found = -1;

    /* Find the matching RAMBlock in our local list */
    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
            found = curr;
            break;
        }
    }

    if (found == -1) {
        error_report("RAMBlock '%s' not found on destination", name);
        return -ENOENT;
    }

    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
    trace_rdma_block_notification_handle(name, rdma->next_src_index);
    rdma->next_src_index++;

    return 0;
}

static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
{
    switch (flags) {
    case RAM_CONTROL_BLOCK_REG:
        return rdma_block_notification_handle(opaque, data);

    case RAM_CONTROL_HOOK:
        return qemu_rdma_registration_handle(f, opaque);

    default:
        /* Shouldn't be called with any other values */
        abort();
    }
}

static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                        uint64_t flags, void *data)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;

    CHECK_ERROR_STATE();

    trace_qemu_rdma_registration_start(flags);
    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
    qemu_fflush(f);

    return 0;
}
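
/*
 * Note (summary of the flow above): qemu_rdma_registration_start() only puts
 * the RAM_SAVE_FLAG_HOOK marker into the stream; when the destination's RAM
 * load code reaches that marker it ends up (via the generic load hook) in
 * rdma_load_hook(), which dispatches to qemu_rdma_registration_handle() and
 * services COMPRESS/REGISTER/UNREGISTER requests until the source ends the
 * iteration with RDMA_CONTROL_REGISTER_FINISHED, sent from
 * qemu_rdma_registration_stop() below.
 */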

/*
 * Inform dest that dynamic registrations are done for now.
 * First, flush writes, if any.
 */
static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                                       uint64_t flags, void *data)
{
    Error *local_err = NULL, **errp = &local_err;
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    RDMAControlHeader head = { .len = 0, .repeat = 1 };
    int ret = 0;

    CHECK_ERROR_STATE();

    qemu_fflush(f);
    ret = qemu_rdma_drain_cq(f, rdma);

    if (ret < 0) {
        goto err;
    }

    if (flags == RAM_CONTROL_SETUP) {
        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
        RDMALocalBlocks *local = &rdma->local_ram_blocks;
        int reg_result_idx, i, nb_dest_blocks;

        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
        trace_qemu_rdma_registration_stop_ram();

        /*
         * Make sure that we parallelize the pinning on both sides.
         * For very large guests, doing this serially takes a really
         * long time, so we have to 'interleave' the pinning locally
         * with the control messages by performing the pinning on this
         * side before we receive the control response from the other
         * side that the pinning has completed.
         */
        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
                    &reg_result_idx, rdma->pin_all ?
                    qemu_rdma_reg_whole_ram_blocks : NULL);
        if (ret < 0) {
            ERROR(errp, "receiving remote info!");
            return ret;
        }

        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);

        /*
         * The protocol uses two different sets of rkeys (mutually exclusive):
         * 1. One key to represent the virtual address of the entire ram block.
         *    (dynamic chunk registration disabled - pin everything with one rkey.)
         * 2. One to represent individual chunks within a ram block.
         *    (dynamic chunk registration enabled - pin individual chunks.)
         *
         * Once the capability is successfully negotiated, the destination
         * transmits the keys to use (or sends them later) including the
         * virtual addresses and then propagates the remote ram block
         * descriptions to its local copy.
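         *
         * For example: with pin-all negotiated, a whole RAMBlock is written
         * using the single rkey from RDMADestBlock.remote_rkey; without it,
         * each chunk obtains its own rkey on demand through the
         * REGISTER_REQUEST/REGISTER_RESULT exchange in qemu_rdma_write_one().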
3324 */ 3325 3326 if (local->nb_blocks != nb_dest_blocks) { 3327 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) " 3328 "Your QEMU command line parameters are probably " 3329 "not identical on both the source and destination.", 3330 local->nb_blocks, nb_dest_blocks); 3331 rdma->error_state = -EINVAL; 3332 return -EINVAL; 3333 } 3334 3335 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3336 memcpy(rdma->dest_blocks, 3337 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3338 for (i = 0; i < nb_dest_blocks; i++) { 3339 network_to_dest_block(&rdma->dest_blocks[i]); 3340 3341 /* We require that the blocks are in the same order */ 3342 if (rdma->dest_blocks[i].length != local->block[i].length) { 3343 ERROR(errp, "Block %s/%d has a different length %" PRIu64 3344 "vs %" PRIu64, local->block[i].block_name, i, 3345 local->block[i].length, 3346 rdma->dest_blocks[i].length); 3347 rdma->error_state = -EINVAL; 3348 return -EINVAL; 3349 } 3350 local->block[i].remote_host_addr = 3351 rdma->dest_blocks[i].remote_host_addr; 3352 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3353 } 3354 } 3355 3356 trace_qemu_rdma_registration_stop(flags); 3357 3358 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3359 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3360 3361 if (ret < 0) { 3362 goto err; 3363 } 3364 3365 return 0; 3366 err: 3367 rdma->error_state = ret; 3368 return ret; 3369 } 3370 3371 static int qemu_rdma_get_fd(void *opaque) 3372 { 3373 QEMUFileRDMA *rfile = opaque; 3374 RDMAContext *rdma = rfile->rdma; 3375 3376 return rdma->comp_channel->fd; 3377 } 3378 3379 static const QEMUFileOps rdma_read_ops = { 3380 .get_buffer = qemu_rdma_get_buffer, 3381 .get_fd = qemu_rdma_get_fd, 3382 .close = qemu_rdma_close, 3383 .hook_ram_load = rdma_load_hook, 3384 }; 3385 3386 static const QEMUFileOps rdma_write_ops = { 3387 .put_buffer = qemu_rdma_put_buffer, 3388 .close = qemu_rdma_close, 3389 .before_ram_iterate = qemu_rdma_registration_start, 3390 .after_ram_iterate = qemu_rdma_registration_stop, 3391 .save_page = qemu_rdma_save_page, 3392 }; 3393 3394 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3395 { 3396 QEMUFileRDMA *r; 3397 3398 if (qemu_file_mode_is_not_valid(mode)) { 3399 return NULL; 3400 } 3401 3402 r = g_new0(QEMUFileRDMA, 1); 3403 r->rdma = rdma; 3404 3405 if (mode[0] == 'w') { 3406 r->file = qemu_fopen_ops(r, &rdma_write_ops); 3407 } else { 3408 r->file = qemu_fopen_ops(r, &rdma_read_ops); 3409 } 3410 3411 return r->file; 3412 } 3413 3414 static void rdma_accept_incoming_migration(void *opaque) 3415 { 3416 RDMAContext *rdma = opaque; 3417 int ret; 3418 QEMUFile *f; 3419 Error *local_err = NULL, **errp = &local_err; 3420 3421 trace_qemu_rdma_accept_incoming_migration(); 3422 ret = qemu_rdma_accept(rdma); 3423 3424 if (ret) { 3425 ERROR(errp, "RDMA Migration initialization failed!"); 3426 return; 3427 } 3428 3429 trace_qemu_rdma_accept_incoming_migration_accepted(); 3430 3431 f = qemu_fopen_rdma(rdma, "rb"); 3432 if (f == NULL) { 3433 ERROR(errp, "could not qemu_fopen_rdma!"); 3434 qemu_rdma_cleanup(rdma); 3435 return; 3436 } 3437 3438 rdma->migration_started_on_destination = 1; 3439 process_incoming_migration(f); 3440 } 3441 3442 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3443 { 3444 int ret; 3445 RDMAContext *rdma; 3446 Error *local_err = NULL; 3447 3448 trace_rdma_start_incoming_migration(); 3449 rdma = qemu_rdma_data_init(host_port, &local_err); 3450 3451 if (rdma == NULL) { 3452 goto err; 3453 } 
3454 3455 ret = qemu_rdma_dest_init(rdma, &local_err); 3456 3457 if (ret) { 3458 goto err; 3459 } 3460 3461 trace_rdma_start_incoming_migration_after_dest_init(); 3462 3463 ret = rdma_listen(rdma->listen_id, 5); 3464 3465 if (ret) { 3466 ERROR(errp, "listening on socket!"); 3467 goto err; 3468 } 3469 3470 trace_rdma_start_incoming_migration_after_rdma_listen(); 3471 3472 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3473 NULL, (void *)(intptr_t)rdma); 3474 return; 3475 err: 3476 error_propagate(errp, local_err); 3477 g_free(rdma); 3478 } 3479 3480 void rdma_start_outgoing_migration(void *opaque, 3481 const char *host_port, Error **errp) 3482 { 3483 MigrationState *s = opaque; 3484 Error *local_err = NULL, **temp = &local_err; 3485 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err); 3486 int ret = 0; 3487 3488 if (rdma == NULL) { 3489 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret); 3490 goto err; 3491 } 3492 3493 ret = qemu_rdma_source_init(rdma, &local_err, 3494 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3495 3496 if (ret) { 3497 goto err; 3498 } 3499 3500 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3501 ret = qemu_rdma_connect(rdma, &local_err); 3502 3503 if (ret) { 3504 goto err; 3505 } 3506 3507 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3508 3509 s->file = qemu_fopen_rdma(rdma, "wb"); 3510 migrate_fd_connect(s); 3511 return; 3512 err: 3513 error_propagate(errp, local_err); 3514 g_free(rdma); 3515 migrate_fd_error(s); 3516 } 3517