/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */
#include "qemu-common.h"
#include "migration/migration.h"
#include "migration/qemu-file.h"
#include "exec/cpu-common.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "block/coroutine.h"
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <string.h>
#include <rdma/rdma_cma.h>
#include "trace.h"

/*
 * Print an error on both the Monitor and the Log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0);

/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
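
/*
 * For illustration only (these values are not used anywhere in this file):
 * an RDMA write to chunk 17 of ram block 3 would be tagged with
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE
 *           | (3UL  << RDMA_WRID_BLOCK_SHIFT)
 *           | (17UL << RDMA_WRID_CHUNK_SHIFT);
 *
 * which is exactly what qemu_rdma_make_wrid() below computes, and which
 * qemu_rdma_poll() later decodes with the masks above to find the bitmap
 * bit to clear when the completion arrives.
 */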

/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands).
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};

static const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};

/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}
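
/*
 * For illustration only: a source requesting rdma-pin-all would set
 * cap.flags = RDMA_CAPABILITY_PIN_ALL (0x01) and run the struct through
 * caps_to_network() before transmission; the peer applies network_to_caps()
 * and is expected to compare the flags against known_capabilities above so
 * that unknown capability bits can be rejected during connection setup
 * (the connection-setup code itself is not part of this excerpt).
 */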

/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 */
typedef struct RDMALocalBlock {
    char          *block_name;
    uint8_t       *local_host_addr;  /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;
    uint64_t       length;
    struct ibv_mr **pmr;             /* MRs for chunk-level registration */
    struct ibv_mr *mr;               /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;      /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;      /* rkeys for non-chunk-level registration */
    int            index;            /* which block are we */
    unsigned int   src_index;        /* (Only used on dest) */
    bool           is_ram_block;
    int            nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;

/*
 * Also represents a RAMblock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;

static uint64_t htonll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.lv[0] = htonl(v >> 32);
    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    return u.llv;
}

static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.llv = v;
    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
}

static void dest_block_to_network(RDMADestBlock *db)
{
    db->remote_host_addr = htonll(db->remote_host_addr);
    db->offset = htonll(db->offset);
    db->length = htonll(db->length);
    db->remote_rkey = htonl(db->remote_rkey);
}

static void network_to_dest_block(RDMADestBlock *db)
{
    db->remote_host_addr = ntohll(db->remote_host_addr);
    db->offset = ntohll(db->offset);
    db->length = ntohll(db->length);
    db->remote_rkey = ntohl(db->remote_rkey);
}

/*
 * Virtual address of the above structures used for transmitting
 * the RAMBlock descriptions at connection-time.
 * This structure is *not* transmitted.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;
    bool init;                  /* main memory init complete */
    RDMALocalBlock *block;
} RDMALocalBlocks;

/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure being allocated right now,
 * this is the place where one would start if you wanted to consider
 * having more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;

    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *comp_channel;  /* completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *cq;                      /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * If migration started on the *destination*, then use the coroutine
     * yield function.
     * The source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;
} RDMAContext;

/*
 * Interface to the rest of the migration call stack.
 */
typedef struct QEMUFileRDMA {
    RDMAContext *rdma;
    size_t len;
    void *file;
} QEMUFileRDMA;

/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

static void control_to_network(RDMAControlHeader *control)
{
    control->type = htonl(control->type);
    control->len = htonl(control->len);
    control->repeat = htonl(control->repeat);
}

static void network_to_control(RDMAControlHeader *control)
{
    control->type = ntohl(control->type);
    control->len = ntohl(control->len);
    control->repeat = ntohl(control->repeat);
}
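
/*
 * A note on framing (an illustrative summary of the code below, not a
 * separate mechanism): every IB SEND on the control channel carries this
 * 16-byte header, converted to network byte order by control_to_network(),
 * followed by head->len bytes of payload. The 'repeat' field allows several
 * commands of the same type to be packed into one message.
 */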

/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;        /* how many sequential chunks to register */
} RDMARegister;

static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
{
    RDMALocalBlock *local_block;
    local_block = &rdma->local_ram_blocks.block[reg->current_index];

    if (local_block->is_ram_block) {
        /*
         * current_addr as passed in is an address in the local ram_addr_t
         * space, we need to translate this for the destination
         */
        reg->key.current_addr -= local_block->offset;
        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
    }
    reg->key.current_addr = htonll(reg->key.current_addr);
    reg->current_index = htonl(reg->current_index);
    reg->chunks = htonll(reg->chunks);
}

static void network_to_register(RDMARegister *reg)
{
    reg->key.current_addr = ntohll(reg->key.current_addr);
    reg->current_index = ntohl(reg->current_index);
    reg->chunks = ntohll(reg->chunks);
}

typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;

static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
{
    comp->value = htonl(comp->value);
    /*
     * comp->offset as passed in is an address in the local ram_addr_t
     * space, we need to translate this for the destination
     */
    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
    comp->block_idx = htonl(comp->block_idx);
    comp->offset = htonll(comp->offset);
    comp->length = htonll(comp->length);
}

static void network_to_compress(RDMACompress *comp)
{
    comp->value = ntohl(comp->value);
    comp->block_idx = ntohl(comp->block_idx);
    comp->offset = ntohll(comp->offset);
    comp->length = ntohll(comp->length);
}
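
/*
 * For orientation (a summary of qemu_rdma_write_one() further below, not a
 * separate mechanism): the source sends RDMA_CONTROL_REGISTER_REQUEST with an
 * RDMARegister payload, the dest registers the chunk and answers with
 * RDMA_CONTROL_REGISTER_RESULT carrying an RDMARegisterResult, and the source
 * caches the returned rkey in block->remote_keys[chunk] for later writes.
 * RDMA_CONTROL_COMPRESS short-circuits this path for all-zero chunks.
 */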

/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;

static void result_to_network(RDMARegisterResult *result)
{
    result->rkey = htonl(result->rkey);
    result->host_addr = htonll(result->host_addr);
}

static void network_to_result(RDMARegisterResult *result)
{
    result->rkey = ntohl(result->rkey);
    result->host_addr = ntohll(result->host_addr);
}

const char *print_wrid(int wrid);
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma));

static inline uint64_t ram_chunk_index(const uint8_t *start,
                                       const uint8_t *host)
{
    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
}

static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
                                       uint64_t i)
{
    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
                                  (i << RDMA_REG_CHUNK_SHIFT));
}

static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
                                     uint64_t i)
{
    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
                                      (1UL << RDMA_REG_CHUNK_SHIFT);

    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
    }

    return result;
}

static int rdma_add_block(RDMAContext *rdma, const char *block_name,
                          void *host_addr,
                          ram_addr_t block_offset, uint64_t length)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *block;
    RDMALocalBlock *old = local->block;

    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));

    if (local->nb_blocks) {
        int x;

        if (rdma->blockmap) {
            for (x = 0; x < local->nb_blocks; x++) {
                g_hash_table_remove(rdma->blockmap,
                                    (void *)(uintptr_t)old[x].offset);
                g_hash_table_insert(rdma->blockmap,
                                    (void *)(uintptr_t)old[x].offset,
                                    &local->block[x]);
            }
        }
        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
        g_free(old);
    }

    block = &local->block[local->nb_blocks];

    block->block_name = g_strdup(block_name);
    block->local_host_addr = host_addr;
    block->offset = block_offset;
    block->length = length;
    block->index = local->nb_blocks;
    block->src_index = ~0U; /* Filled in by the receipt of the block list */
    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
    block->transit_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
    block->unregister_bitmap = bitmap_new(block->nb_chunks);
    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));

    block->is_ram_block = local->init ? false : true;

    if (rdma->blockmap) {
        g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
    }

    trace_rdma_add_block(block_name, local->nb_blocks,
                         (uintptr_t) block->local_host_addr,
                         block->offset, block->length,
                         (uintptr_t) (block->local_host_addr + block->length),
                         BITS_TO_LONGS(block->nb_chunks) *
                             sizeof(unsigned long) * 8,
                         block->nb_chunks);

    local->nb_blocks++;

    return 0;
}

/*
 * Memory regions need to be registered with the device and queue pairs
 * set up in advance before the migration starts. This tells us where the
 * RAM blocks are so that we can register them individually.
 */
static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
    ram_addr_t block_offset, ram_addr_t length, void *opaque)
{
    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
}

/*
 * Identify the RAMBlocks and their quantity. They will be used to identify
 * chunk boundaries inside each RAMBlock and also be referenced during
 * dynamic page registration.
 */
static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    assert(rdma->blockmap == NULL);
    memset(local, 0, sizeof *local);
    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
    trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
    rdma->dest_blocks = (RDMADestBlock *) g_malloc0(sizeof(RDMADestBlock) *
                        rdma->local_ram_blocks.nb_blocks);
    local->init = true;
    return 0;
}

/*
 * Note: If used outside of cleanup, the caller must ensure that the
 * destination block structures are also updated.
 */
static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *old = local->block;
    int x;

    if (rdma->blockmap) {
        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
    }
    if (block->pmr) {
        int j;

        for (j = 0; j < block->nb_chunks; j++) {
            if (!block->pmr[j]) {
                continue;
            }
            ibv_dereg_mr(block->pmr[j]);
            rdma->total_registrations--;
        }
        g_free(block->pmr);
        block->pmr = NULL;
    }

    if (block->mr) {
        ibv_dereg_mr(block->mr);
        rdma->total_registrations--;
        block->mr = NULL;
    }

    g_free(block->transit_bitmap);
    block->transit_bitmap = NULL;

    g_free(block->unregister_bitmap);
    block->unregister_bitmap = NULL;

    g_free(block->remote_keys);
    block->remote_keys = NULL;

    g_free(block->block_name);
    block->block_name = NULL;

    if (rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_remove(rdma->blockmap,
                                (void *)(uintptr_t)old[x].offset);
        }
    }

    if (local->nb_blocks > 1) {

        local->block = g_malloc0(sizeof(RDMALocalBlock) *
                                    (local->nb_blocks - 1));

        if (block->index) {
            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
        }

        if (block->index < (local->nb_blocks - 1)) {
            memcpy(local->block + block->index, old + (block->index + 1),
                sizeof(RDMALocalBlock) *
                    (local->nb_blocks - (block->index + 1)));
        }
    } else {
        assert(block == local->block);
        local->block = NULL;
    }

    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
                            block->offset, block->length,
                            (uintptr_t)(block->local_host_addr + block->length),
                            BITS_TO_LONGS(block->nb_chunks) *
                                sizeof(unsigned long) * 8, block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    if (local->nb_blocks && rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }

    return 0;
}

/*
 * Put in the log file which RDMA device was opened and the details
 * associated with that device.
 */
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
    struct ibv_port_attr port;

    if (ibv_query_port(verbs, 1, &port)) {
        error_report("Failed to query port information");
        return;
    }

    printf("%s RDMA Device opened: kernel name %s "
           "uverbs device name %s, "
           "infiniband_verbs class device path %s, "
           "infiniband class device path %s, "
           "transport: (%d) %s\n",
                who,
                verbs->device->name,
                verbs->device->dev_name,
                verbs->device->dev_path,
                verbs->device->ibdev_path,
                port.link_layer,
                (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                    ? "Ethernet" : "Unknown"));
}

/*
 * Put in the log file the RDMA gid addressing information,
 * useful for folks who have trouble understanding the
 * RDMA device hierarchy in the kernel.
 */
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
{
    char sgid[33];
    char dgid[33];
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    trace_qemu_rdma_dump_gid(who, sgid, dgid);
}

/*
 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 * We will try the next addrinfo struct, and fail if there are
 * no other valid addresses to bind against.
 *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
 *
 * In this case, the source VM will throw an error for ALL types of
 * connections (both IPv4 and IPv6) if the destination machine does not have
 * a regular infiniband network available for use.
 *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
 *
 * Unfortunately, this puts the user in a fix:
 *
 *  If the source VM connects with an IPv4 address without knowing that the
 *  destination has bound to '[::]' the migration will unconditionally fail
 *  unless the management software is explicitly listening on the IPv4
 *  address while using a RoCE-based device.
 *
 *  If the source VM connects with an IPv6 address, then we're OK because we
 *  can throw an error on the source (and similarly on the destination).
 *
 *  But in mixed environments, this will be broken for a while until it is
 *  fixed inside linux.
 *
 * We do provide a *tiny* bit of help in this function: We can list all of the
 * devices in the system and check to see if all the devices are RoCE or
 * Infiniband.
 *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
 *
 * Patches are being reviewed on linux-rdma.
 */
static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
{
    struct ibv_port_attr port_attr;

    /* This bug only exists in linux, to our knowledge. */
#ifdef CONFIG_LINUX

    /*
     * Verbs are only NULL if management has bound to '[::]'.
     *
     * Let's iterate through all the devices and see if there are any pure IB
     * devices (non-ethernet).
     *
     * If not, then we can safely proceed with the migration.
     * Otherwise, there are no guarantees until the bug is fixed in linux.
     */
    if (!verbs) {
        int num_devices, x;
        struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
        bool roce_found = false;
        bool ib_found = false;

        for (x = 0; x < num_devices; x++) {
            verbs = ibv_open_device(dev_list[x]);
            if (!verbs) {
                if (errno == EPERM) {
                    continue;
                } else {
                    return -EINVAL;
                }
            }

            if (ibv_query_port(verbs, 1, &port_attr)) {
                ibv_close_device(verbs);
                ERROR(errp, "Could not query initial IB port");
                return -EINVAL;
            }

            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
                ib_found = true;
            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
                roce_found = true;
            }

            ibv_close_device(verbs);

        }

        if (roce_found) {
            if (ib_found) {
                fprintf(stderr, "WARN: migrations may fail:"
                                " IPv6 over RoCE / iWARP in linux"
                                " is broken. But since you appear to have a"
                                " mixed RoCE / IB environment, be sure to only"
                                " migrate over the IB fabric until the kernel "
                                " fixes the bug.\n");
            } else {
                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
                            " and your management software has specified '[::]'"
                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
                return -ENONET;
            }
        }

        return 0;
    }

    /*
     * If we have a verbs context, that means that something other than '[::]'
     * was used by the management software for binding, in which case we can
     * actually warn the user about a potentially broken kernel.
     */

    /* IB ports start with 1, not 0 */
    if (ibv_query_port(verbs, 1, &port_attr)) {
        ERROR(errp, "Could not query initial IB port");
        return -EINVAL;
    }

    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
                    "(but patches on linux-rdma in progress)");
        return -ENONET;
    }

#endif

    return 0;
}
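
/*
 * Connection setup on the source side, summarized as a reading aid for
 * qemu_rdma_resolve_host() below (this comment adds no behavior):
 *
 *   rdma_create_event_channel() -> rdma_create_id() -> rdma_getaddrinfo()
 *   -> rdma_resolve_addr()  (wait for RDMA_CM_EVENT_ADDR_RESOLVED)
 *   -> rdma_resolve_route() (wait for RDMA_CM_EVENT_ROUTE_RESOLVED)
 *
 * Each candidate address returned by rdma_getaddrinfo() is tried in turn,
 * and IPv6 candidates are additionally screened by
 * qemu_rdma_broken_ipv6_kernel() above.
 */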

/*
 * Figure out which RDMA device corresponds to the requested IP hostname
 * Also create the initial connection manager identifiers for opening
 * the connection.
 */
static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
{
    int ret;
    struct rdma_addrinfo *res;
    char port_str[16];
    struct rdma_cm_event *cm_event;
    char ip[40] = "unknown";
    struct rdma_addrinfo *e;

    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
        ERROR(errp, "RDMA hostname has not been set");
        return -EINVAL;
    }

    /* create CM channel */
    rdma->channel = rdma_create_event_channel();
    if (!rdma->channel) {
        ERROR(errp, "could not create CM channel");
        return -EINVAL;
    }

    /* create CM id */
    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
    if (ret) {
        ERROR(errp, "could not create channel id");
        goto err_resolve_create_id;
    }

    snprintf(port_str, 16, "%d", rdma->port);
    port_str[15] = '\0';

    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
    if (ret < 0) {
        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
        goto err_resolve_get_addr;
    }

    for (e = res; e != NULL; e = e->ai_next) {
        inet_ntop(e->ai_family,
            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
        trace_qemu_rdma_resolve_host_trying(rdma->host, ip);

        ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
                RDMA_RESOLVE_TIMEOUT_MS);
        if (!ret) {
            if (e->ai_family == AF_INET6) {
                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
                if (ret) {
                    continue;
                }
            }
            goto route;
        }
    }

    ERROR(errp, "could not resolve address %s", rdma->host);
    goto err_resolve_get_addr;

route:
    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_addr_resolved");
        goto err_resolve_get_addr;
    }

    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
        ERROR(errp, "result not equal to event_addr_resolved %s",
                rdma_event_str(cm_event->event));
        perror("rdma_resolve_addr");
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);

    /* resolve route */
    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
    if (ret) {
        ERROR(errp, "could not resolve rdma route");
        goto err_resolve_get_addr;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret) {
        ERROR(errp, "could not perform event_route_resolved");
        goto err_resolve_get_addr;
    }
    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
        ERROR(errp, "result not equal to event_route_resolved: %s",
                rdma_event_str(cm_event->event));
        rdma_ack_cm_event(cm_event);
        ret = -EINVAL;
        goto err_resolve_get_addr;
    }
    rdma_ack_cm_event(cm_event);
    rdma->verbs = rdma->cm_id->verbs;
    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
    return 0;

err_resolve_get_addr:
    rdma_destroy_id(rdma->cm_id);
    rdma->cm_id = NULL;
err_resolve_create_id:
    rdma_destroy_event_channel(rdma->channel);
    rdma->channel = NULL;
    return ret;
}

/*
 * Create protection domain and completion queues
 */
static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
{
    /* allocate pd */
    rdma->pd = ibv_alloc_pd(rdma->verbs);
    if (!rdma->pd) {
        error_report("failed to allocate protection domain");
        return -1;
    }

    /* create completion channel */
    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
    if (!rdma->comp_channel) {
        error_report("failed to allocate completion channel");
        goto err_alloc_pd_cq;
    }

    /*
     * Completion queue can be filled by both read and write work requests,
     * so must reflect the sum of both possible queue sizes.
     */
    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
            NULL, rdma->comp_channel, 0);
    if (!rdma->cq) {
        error_report("failed to allocate completion queue");
        goto err_alloc_pd_cq;
    }

    return 0;

err_alloc_pd_cq:
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
    }
    rdma->pd = NULL;
    rdma->comp_channel = NULL;
    return -1;

}

/*
 * Create queue pairs.
 */
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
{
    struct ibv_qp_init_attr attr = { 0 };
    int ret;

    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
    attr.cap.max_recv_wr = 3;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.send_cq = rdma->cq;
    attr.recv_cq = rdma->cq;
    attr.qp_type = IBV_QPT_RC;

    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
    if (ret) {
        return -1;
    }

    rdma->qp = rdma->cm_id->qp;
    return 0;
}

static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
{
    int i;
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    for (i = 0; i < local->nb_blocks; i++) {
        local->block[i].mr =
            ibv_reg_mr(rdma->pd,
                    local->block[i].local_host_addr,
                    local->block[i].length,
                    IBV_ACCESS_LOCAL_WRITE |
                    IBV_ACCESS_REMOTE_WRITE
                    );
        if (!local->block[i].mr) {
            perror("Failed to register local dest ram block!\n");
            break;
        }
        rdma->total_registrations++;
    }

    if (i >= local->nb_blocks) {
        return 0;
    }

    for (i--; i >= 0; i--) {
        ibv_dereg_mr(local->block[i].mr);
        rdma->total_registrations--;
    }

    return -1;

}

/*
 * Find the ram block that corresponds to the page requested to be
 * transmitted by QEMU.
 *
 * Once the block is found, also identify which 'chunk' within that
 * block the page belongs to.
 *
 * This search cannot fail or the migration will fail.
 */
static int qemu_rdma_search_ram_block(RDMAContext *rdma,
                                      uintptr_t block_offset,
                                      uint64_t offset,
                                      uint64_t length,
                                      uint64_t *block_index,
                                      uint64_t *chunk_index)
{
    uint64_t current_addr = block_offset + offset;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *) block_offset);
    assert(block);
    assert(current_addr >= block->offset);
    assert((current_addr + length) <= (block->offset + block->length));

    *block_index = block->index;
    *chunk_index = ram_chunk_index(block->local_host_addr,
                block->local_host_addr + (current_addr - block->offset));

    return 0;
}
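
/*
 * A worked example for the chunk arithmetic above (illustrative values only):
 * with RDMA_REG_CHUNK_SHIFT of 20 (1 MB chunks), a page that lives
 * 5 MB + 4 KB past the start of its RAMBlock falls into chunk index 5, and a
 * block of 2.5 MB is covered by nb_chunks = 3, the last chunk being truncated
 * to the block boundary by ram_chunk_end().
 */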

/*
 * Register a chunk with IB. If the chunk was already registered
 * previously, then skip.
 *
 * Also return the keys associated with the registration needed
 * to perform the actual RDMA operation.
 */
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
        RDMALocalBlock *block, uintptr_t host_addr,
        uint32_t *lkey, uint32_t *rkey, int chunk,
        uint8_t *chunk_start, uint8_t *chunk_end)
{
    if (block->mr) {
        if (lkey) {
            *lkey = block->mr->lkey;
        }
        if (rkey) {
            *rkey = block->mr->rkey;
        }
        return 0;
    }

    /* allocate memory to store chunk MRs */
    if (!block->pmr) {
        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
    }

    /*
     * If 'rkey', then we're the destination, so grant access to the source.
     *
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
     */
    if (!block->pmr[chunk]) {
        uint64_t len = chunk_end - chunk_start;

        trace_qemu_rdma_register_and_get_keys(len, chunk_start);

        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
                chunk_start, len,
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_WRITE) : 0));

        if (!block->pmr[chunk]) {
            perror("Failed to register chunk!");
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
                            " start %" PRIuPTR " end %" PRIuPTR
                            " host %" PRIuPTR
                            " local %" PRIuPTR " registrations: %d\n",
                            block->index, chunk, (uintptr_t)chunk_start,
                            (uintptr_t)chunk_end, host_addr,
                            (uintptr_t)block->local_host_addr,
                            rdma->total_registrations);
            return -1;
        }
        rdma->total_registrations++;
    }

    if (lkey) {
        *lkey = block->pmr[chunk]->lkey;
    }
    if (rkey) {
        *rkey = block->pmr[chunk]->rkey;
    }
    return 0;
}

/*
 * Register (at connection time) the memory used for control
 * channel messages.
 */
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (rdma->wr_data[idx].control_mr) {
        rdma->total_registrations++;
        return 0;
    }
    error_report("qemu_rdma_reg_control failed");
    return -1;
}

const char *print_wrid(int wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
    }
    return wrid_desc[wrid];
}

/*
 * RDMA requires memory registration (mlock/pinning), but this is not good for
 * overcommitment.
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU, it would
 * be nice to have in place the ability to UN-register/UN-pin particular
 * memory regions from the RDMA hardware when it is determined that those
 * regions of memory will likely not be accessed again in the near future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
 */
//#define RDMA_UNREGISTRATION_EXAMPLE
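
/*
 * What enabling RDMA_UNREGISTRATION_EXAMPLE actually does, as a reading aid
 * (see the #ifdef sites below for the authoritative behavior): each RDMA
 * write completion calls qemu_rdma_signal_unregister() to queue its chunk,
 * and qemu_rdma_unregister_waiting() later drains that queue, deregistering
 * the MR and telling the dest to unpin via RDMA_CONTROL_UNREGISTER_REQUEST.
 */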

/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
 *        - for bit clearing
 *        - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
 */
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
    while (rdma->unregistrations[rdma->unregister_current]) {
        int ret;
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
        uint64_t chunk =
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block =
            &(rdma->local_ram_blocks.block[index]);
        RDMARegister reg = { .current_index = index };
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                 };
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
                                   .repeat = 1,
                                 };

        trace_qemu_rdma_unregister_waiting_proc(chunk,
                                                rdma->unregister_current);

        rdma->unregistrations[rdma->unregister_current] = 0;
        rdma->unregister_current++;

        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
            rdma->unregister_current = 0;
        }


        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
        clear_bit(chunk, block->unregister_bitmap);

        if (test_bit(chunk, block->transit_bitmap)) {
            trace_qemu_rdma_unregister_waiting_inflight(chunk);
            continue;
        }

        trace_qemu_rdma_unregister_waiting_send(chunk);

        ret = ibv_dereg_mr(block->pmr[chunk]);
        block->pmr[chunk] = NULL;
        block->remote_keys[chunk] = 0;

        if (ret != 0) {
            perror("unregistration chunk failed");
            return -ret;
        }
        rdma->total_registrations--;

        reg.key.chunk = chunk;
        register_to_network(rdma, &reg);
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                &resp, NULL, NULL);
        if (ret < 0) {
            return ret;
        }

        trace_qemu_rdma_unregister_waiting_complete(chunk);
    }

    return 0;
}

static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
                                    uint64_t chunk)
{
    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;

    result |= (index << RDMA_WRID_BLOCK_SHIFT);
    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);

    return result;
}
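
/*
 * Note on the bookkeeping used above and below (descriptive only):
 * rdma->unregistrations[] is a small ring of RDMA_SIGNALED_SEND_MAX wrids;
 * qemu_rdma_signal_unregister() produces entries at unregister_next and
 * qemu_rdma_unregister_waiting() consumes them at unregister_current,
 * both wrapping back to zero when they reach the end of the array.
 */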

/*
 * Set bit for unregistration in the next iteration.
 * We cannot transmit right here, but will unpin later.
 */
static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
                                        uint64_t chunk, uint64_t wr_id)
{
    if (rdma->unregistrations[rdma->unregister_next] != 0) {
        error_report("rdma migration: queue is full");
    } else {
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
            trace_qemu_rdma_signal_unregister_append(chunk,
                                                     rdma->unregister_next);

            rdma->unregistrations[rdma->unregister_next++] =
                    qemu_rdma_make_wrid(wr_id, index, chunk);

            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
                rdma->unregister_next = 0;
            }
        } else {
            trace_qemu_rdma_signal_unregister_already(chunk);
        }
    }
}

/*
 * Consult the completion queue to see if a work request
 * (of any kind) has completed.
 * Return the work request ID that completed.
 */
static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
                               uint32_t *byte_len)
{
    int ret;
    struct ibv_wc wc;
    uint64_t wr_id;

    ret = ibv_poll_cq(rdma->cq, 1, &wc);

    if (!ret) {
        *wr_id_out = RDMA_WRID_NONE;
        return 0;
    }

    if (ret < 0) {
        error_report("ibv_poll_cq return %d", ret);
        return ret;
    }

    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;

    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
                        wc.status, ibv_wc_status_str(wc.status));
        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);

        return -1;
    }

    if (rdma->control_ready_expected &&
        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
        trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
        rdma->control_ready_expected = 0;
    }

    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        uint64_t chunk =
            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);

        trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
                                   index, chunk, block->local_host_addr,
                                   (void *)(uintptr_t)block->remote_host_addr);

        clear_bit(chunk, block->transit_bitmap);

        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;
        }

        if (!rdma->pin_all) {
            /*
             * FYI: If one wanted to signal a specific chunk to be unregistered
             * using LRU or workload-specific information, this is the function
             * you would call to do so. That chunk would then get asynchronously
             * unregistered later.
             */
#ifdef RDMA_UNREGISTRATION_EXAMPLE
            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
#endif
        }
    } else {
        trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
    }

    *wr_id_out = wc.wr_id;
    if (byte_len) {
        *byte_len = wc.byte_len;
    }

    return 0;
}

/*
 * Block until the next work request has completed.
 *
 * First poll to see if a work request has already completed,
 * otherwise block.
 *
 * If we encounter completed work requests for IDs other than
 * the one we're interested in, then that's generally an error.
 *
 * The only exception is actual RDMA Write completions. These
 * completions only need to be recorded, but do not actually
 * need further processing.
 */
static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
                                    uint32_t *byte_len)
{
    int num_cq_events = 0, ret = 0;
    struct ibv_cq *cq;
    void *cq_ctx;
    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;

    if (ibv_req_notify_cq(rdma->cq, 0)) {
        return -1;
    }
    /* poll cq first */
    while (wr_id != wrid_requested) {
        ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
        if (ret < 0) {
            return ret;
        }

        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

        if (wr_id == RDMA_WRID_NONE) {
            break;
        }
        if (wr_id != wrid_requested) {
            trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                       wrid_requested, print_wrid(wr_id), wr_id);
        }
    }

    if (wr_id == wrid_requested) {
        return 0;
    }

    while (1) {
        /*
         * Coroutine doesn't start until process_incoming_migration()
         * so don't yield unless we know we're running inside of a coroutine.
         */
        if (rdma->migration_started_on_destination) {
            yield_until_fd_readable(rdma->comp_channel->fd);
        }

        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
            perror("ibv_get_cq_event");
            goto err_block_for_wrid;
        }

        num_cq_events++;

        if (ibv_req_notify_cq(cq, 0)) {
            goto err_block_for_wrid;
        }

        while (wr_id != wrid_requested) {
            ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
            if (ret < 0) {
                goto err_block_for_wrid;
            }

            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;

            if (wr_id == RDMA_WRID_NONE) {
                break;
            }
            if (wr_id != wrid_requested) {
                trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
                                   wrid_requested, print_wrid(wr_id), wr_id);
            }
        }

        if (wr_id == wrid_requested) {
            goto success_block_for_wrid;
        }
    }

success_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return 0;

err_block_for_wrid:
    if (num_cq_events) {
        ibv_ack_cq_events(cq, num_cq_events);
    }
    return ret;
}

/*
 * Post a SEND message work request for the control channel
 * containing some data and block until the post completes.
 */
static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                       RDMAControlHeader *head)
{
    int ret = 0;
    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
    struct ibv_send_wr *bad_wr;
    struct ibv_sge sge = {
                           .addr = (uintptr_t)(wr->control),
                           .length = head->len + sizeof(RDMAControlHeader),
                           .lkey = wr->control_mr->lkey,
                         };
    struct ibv_send_wr send_wr = {
                                   .wr_id = RDMA_WRID_SEND_CONTROL,
                                   .opcode = IBV_WR_SEND,
                                   .send_flags = IBV_SEND_SIGNALED,
                                   .sg_list = &sge,
                                   .num_sge = 1,
                                };

    trace_qemu_rdma_post_send_control(control_desc[head->type]);

    /*
     * We don't actually need to do a memcpy() in here if we used
     * the "sge" properly, but since we're only sending control messages
     * (not RAM in a performance-critical path), then it's OK for now.
     *
     * The copy makes the RDMAControlHeader simpler to manipulate
     * for the time being.
     */
    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
    memcpy(wr->control, head, sizeof(RDMAControlHeader));
    control_to_network((void *) wr->control);

    if (buf) {
        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
    }


    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret > 0) {
        error_report("Failed to use post IB SEND for control");
        return -ret;
    }

    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
    if (ret < 0) {
        error_report("rdma migration: send polling control error");
    }

    return ret;
}

/*
 * Post a RECV work request in anticipation of some future receipt
 * of data on the control channel.
 */
static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
{
    struct ibv_recv_wr *bad_wr;
    struct ibv_sge sge = {
                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
                            .length = RDMA_CONTROL_MAX_BUFFER,
                            .lkey = rdma->wr_data[idx].control_mr->lkey,
                         };

    struct ibv_recv_wr recv_wr = {
                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
                                    .sg_list = &sge,
                                    .num_sge = 1,
                                 };


    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
        return -1;
    }

    return 0;
}

/*
 * Block and wait for a RECV control channel message to arrive.
 */
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                RDMAControlHeader *head, int expecting, int idx)
{
    uint32_t byte_len;
    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
                                       &byte_len);

    if (ret < 0) {
        error_report("rdma migration: recv polling control error!");
        return ret;
    }

    network_to_control((void *) rdma->wr_data[idx].control);
    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));

    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);

    if (expecting == RDMA_CONTROL_NONE) {
        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
                                             head->type);
    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
        error_report("Was expecting a %s (%d) control message"
                ", but got: %s (%d), length: %d",
                control_desc[expecting], expecting,
                control_desc[head->type], head->type, head->len);
        return -EIO;
    }
    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
        error_report("too long length: %d", head->len);
        return -EINVAL;
    }
    if (sizeof(*head) + head->len != byte_len) {
        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
        return -EINVAL;
    }

    return 0;
}

/*
 * When a RECV work request has completed, the work request's
 * buffer is pointed at the header.
 *
 * This will advance the pointer to the data portion
 * of the control message of the work request's buffer that
 * was populated after the work request finished.
 */
static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
                                  RDMAControlHeader *head)
{
    rdma->wr_data[idx].control_len = head->len;
    rdma->wr_data[idx].control_curr =
        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
}

/*
 * This is an 'atomic' high-level operation to deliver a single, unified
 * control-channel message.
 *
 * Additionally, if the user is expecting some kind of reply to this message,
 * they can request a 'resp' response message be filled in by posting an
 * additional work request on behalf of the user and waiting for an additional
 * completion.
 *
 * The extra (optional) response is used during registration to keep us from
 * having to perform an *additional* exchange of messages just to provide a
 * response, by instead piggy-backing on the acknowledgement.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma))
{
    int ret = 0;

    /*
     * Wait until the dest is ready before attempting to deliver the message
     * by waiting for a READY message.
     */
    if (rdma->control_ready_expected) {
        RDMAControlHeader resp;
        ret = qemu_rdma_exchange_get_response(rdma,
                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
        if (ret < 0) {
            return ret;
        }
    }

    /*
     * If the user is expecting a response, post a WR in anticipation of it.
     */
    if (resp) {
        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
        if (ret) {
            error_report("rdma migration: error posting"
                    " extra control recv for anticipated result!");
            return ret;
        }
    }

    /*
     * Post a WR to replace the one we just consumed for the READY message.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting first control recv!");
        return ret;
    }

    /*
     * Deliver the control message that was requested.
     */
    ret = qemu_rdma_post_send_control(rdma, data, head);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * If we're expecting a response, block and wait for it.
     */
    if (resp) {
        if (callback) {
            trace_qemu_rdma_exchange_send_issue_callback();
            ret = callback(rdma);
            if (ret < 0) {
                return ret;
            }
        }

        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
        ret = qemu_rdma_exchange_get_response(rdma, resp,
                                              resp->type, RDMA_WRID_DATA);

        if (ret < 0) {
            return ret;
        }

        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
        if (resp_idx) {
            *resp_idx = RDMA_WRID_DATA;
        }
        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
    }

    rdma->control_ready_expected = 1;

    return 0;
}

/*
 * This is an 'atomic' high-level operation to receive a single, unified
 * control-channel message.
 */
static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
                                int expecting)
{
    RDMAControlHeader ready = {
                                .len = 0,
                                .type = RDMA_CONTROL_READY,
                                .repeat = 1,
                              };
    int ret;

    /*
     * Inform the source that we're ready to receive a message.
     */
    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);

    if (ret < 0) {
        error_report("Failed to send control buffer!");
        return ret;
    }

    /*
     * Block and wait for the message.
     */
    ret = qemu_rdma_exchange_get_response(rdma, head,
                                          expecting, RDMA_WRID_READY);

    if (ret < 0) {
        return ret;
    }

    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);

    /*
     * Post a new RECV work request to replace the one we just consumed.
     */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
    if (ret) {
        error_report("rdma migration: error posting second control recv!");
        return ret;
    }

    return 0;
}

/*
 * Write an actual chunk of memory using RDMA.
 *
 * If we're using dynamic registration on the dest-side, we have to
 * send a registration command first.
 */
static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                               int current_index, uint64_t current_addr,
                               uint64_t length)
{
    struct ibv_sge sge;
    struct ibv_send_wr send_wr = { 0 };
    struct ibv_send_wr *bad_wr;
    int reg_result_idx, ret, count = 0;
    uint64_t chunk, chunks;
    uint8_t *chunk_start, *chunk_end;
    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
    RDMARegister reg;
    RDMARegisterResult *reg_result;
    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
    RDMAControlHeader head = { .len = sizeof(RDMARegister),
                               .type = RDMA_CONTROL_REGISTER_REQUEST,
                               .repeat = 1,
                             };

retry:
    sge.addr = (uintptr_t)(block->local_host_addr +
                            (current_addr - block->offset));
    sge.length = length;

    chunk = ram_chunk_index(block->local_host_addr,
                            (uint8_t *)(uintptr_t)sge.addr);
    chunk_start = ram_chunk_start(block, chunk);

    if (block->is_ram_block) {
        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    } else {
        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);

        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
            chunks--;
        }
    }

    trace_qemu_rdma_write_one_top(chunks + 1,
                                  (chunks + 1) *
                                  (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);

    chunk_end = ram_chunk_end(block, chunk + chunks);

    if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
        qemu_rdma_unregister_waiting(rdma);
#endif
    }

    while (test_bit(chunk, block->transit_bitmap)) {
        (void)count;
        trace_qemu_rdma_write_one_block(count++, current_index, chunk,
                sge.addr, length, rdma->nb_sent, block->nb_chunks);

        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);

        if (ret < 0) {
            error_report("Failed to Wait for previous write to complete "
                    "block %d chunk %" PRIu64
                    " current %" PRIu64 " len %" PRIu64 " %d",
                    current_index, chunk, sge.addr, length, rdma->nb_sent);
            return ret;
        }
    }

    if (!rdma->pin_all || !block->is_ram_block) {
        if (!block->remote_keys[chunk]) {
            /*
             * This chunk has not yet been registered, so first check to see
             * if the entire chunk is zero. If so, tell the other side to
             * memset() + madvise() the entire chunk without RDMA.
             */

            if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
                                                   length)
                   && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
                                                    length) == length) {
                RDMACompress comp = {
                                        .offset = current_addr,
                                        .value = 0,
                                        .block_idx = current_index,
                                        .length = length,
                                    };

                head.len = sizeof(comp);
                head.type = RDMA_CONTROL_COMPRESS;

                trace_qemu_rdma_write_one_zero(chunk, sge.length,
                                               current_index, current_addr);

                compress_to_network(rdma, &comp);
                ret = qemu_rdma_exchange_send(rdma, &head,
                                (uint8_t *) &comp, NULL, NULL, NULL);

                if (ret < 0) {
                    return -EIO;
                }

                acct_update_position(f, sge.length, true);

                return 1;
            }

            /*
             * Otherwise, tell other side to register.
             */
            reg.current_index = current_index;
            if (block->is_ram_block) {
                reg.key.current_addr = current_addr;
            } else {
                reg.key.chunk = chunk;
            }
            reg.chunks = chunks;

            trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
                                              current_addr);

            register_to_network(rdma, &reg);
            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                    &resp, &reg_result_idx, NULL);
            if (ret < 0) {
                return ret;
            }

            /* try to overlap this single registration with the one we sent. */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey");
                return -EINVAL;
            }

            reg_result = (RDMARegisterResult *)
                    rdma->wr_data[reg_result_idx].control_curr;

            network_to_result(reg_result);

            trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
                                                 reg_result->rkey, chunk);

            block->remote_keys[chunk] = reg_result->rkey;
            block->remote_host_addr = reg_result->host_addr;
        } else {
            /* already registered before */
            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                &sge.lkey, NULL, chunk,
                                                chunk_start, chunk_end)) {
                error_report("cannot get lkey!");
                return -EINVAL;
            }
        }

        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
    } else {
        send_wr.wr.rdma.rkey = block->remote_rkey;

        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                            &sge.lkey, NULL, chunk,
                                            chunk_start, chunk_end)) {
            error_report("cannot get lkey!");
            return -EINVAL;
        }
    }

    /*
     * Encode the ram block index and chunk within this wrid.
     * We will use this information at the time of completion
     * to figure out which bitmap to check against and then which
     * chunk in the bitmap to look for.
     */
    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
                                        current_index, chunk);

    send_wr.opcode = IBV_WR_RDMA_WRITE;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
                                (current_addr - block->offset);

    trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
                                   sge.length);

    /*
     * ibv_post_send() does not return negative error numbers,
     * per the specification they are positive - no idea why.
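     *
     * So the handling below is, roughly:
     *
     *   ret == 0      -> posted successfully
     *   ret == ENOMEM -> send queue full: wait for a completion and retry
     *   ret  > 0      -> any other errno-style value: fail the write with -ret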
     */
    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret == ENOMEM) {
        trace_qemu_rdma_write_one_queue_full();
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_report("rdma migration: failed to make "
                         "room in full send queue! %d", ret);
            return ret;
        }

        goto retry;

    } else if (ret > 0) {
        perror("rdma migration: post rdma write failed");
        return -ret;
    }

    set_bit(chunk, block->transit_bitmap);
    acct_update_position(f, sge.length, false);
    rdma->total_writes++;

    return 0;
}

/*
 * Push out any unwritten RDMA operations.
 *
 * We support sending out multiple chunks at the same time.
 * Not all of them need to get signaled in the completion queue.
 */
static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (!rdma->current_length) {
        return 0;
    }

    ret = qemu_rdma_write_one(f, rdma,
            rdma->current_index, rdma->current_addr, rdma->current_length);

    if (ret < 0) {
        return ret;
    }

    if (ret == 0) {
        rdma->nb_sent++;
        trace_qemu_rdma_write_flush(rdma->nb_sent);
    }

    rdma->current_length = 0;
    rdma->current_addr = 0;

    return 0;
}

static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
                    uint64_t offset, uint64_t len)
{
    RDMALocalBlock *block;
    uint8_t *host_addr;
    uint8_t *chunk_end;

    if (rdma->current_index < 0) {
        return 0;
    }

    if (rdma->current_chunk < 0) {
        return 0;
    }

    block = &(rdma->local_ram_blocks.block[rdma->current_index]);
    host_addr = block->local_host_addr + (offset - block->offset);
    chunk_end = ram_chunk_end(block, rdma->current_chunk);

    if (rdma->current_length == 0) {
        return 0;
    }

    /*
     * Only merge into chunk sequentially.
     */
    if (offset != (rdma->current_addr + rdma->current_length)) {
        return 0;
    }

    if (offset < block->offset) {
        return 0;
    }

    if ((offset + len) > (block->offset + block->length)) {
        return 0;
    }

    if ((host_addr + len) > chunk_end) {
        return 0;
    }

    return 1;
}

/*
 * We're not actually writing here, but doing three things:
 *
 * 1. Identify the chunk the buffer belongs to.
 * 2. If the chunk is full or the buffer doesn't belong to the current
 *    chunk, then start a new chunk and flush() the old chunk.
 * 3. To keep the hardware busy, we also group chunks into batches
 *    and only require that a batch gets acknowledged in the completion
 *    queue instead of each individual chunk.
 */
static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
                           uint64_t block_offset, uint64_t offset,
                           uint64_t len)
{
    uint64_t current_addr = block_offset + offset;
    uint64_t index = rdma->current_index;
    uint64_t chunk = rdma->current_chunk;
    int ret;

    /* If we cannot merge it, we flush the current buffer first.
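     * (For example: two pages queued back-to-back at offsets 0x0000 and
     * 0x1000 of the same RAMBlock fall into the same chunk and coalesce into
     * a single 8 KB RDMA write, bounded by the chunk boundary and
     * RDMA_MERGE_MAX; a page from another block, a non-contiguous offset, or
     * a page crossing the chunk end forces a flush and starts a new buffer,
     * per qemu_rdma_buffer_mergable() above.)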
*/ 2166 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2167 ret = qemu_rdma_write_flush(f, rdma); 2168 if (ret) { 2169 return ret; 2170 } 2171 rdma->current_length = 0; 2172 rdma->current_addr = current_addr; 2173 2174 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2175 offset, len, &index, &chunk); 2176 if (ret) { 2177 error_report("ram block search failed"); 2178 return ret; 2179 } 2180 rdma->current_index = index; 2181 rdma->current_chunk = chunk; 2182 } 2183 2184 /* merge it */ 2185 rdma->current_length += len; 2186 2187 /* flush it if buffer is too large */ 2188 if (rdma->current_length >= RDMA_MERGE_MAX) { 2189 return qemu_rdma_write_flush(f, rdma); 2190 } 2191 2192 return 0; 2193 } 2194 2195 static void qemu_rdma_cleanup(RDMAContext *rdma) 2196 { 2197 struct rdma_cm_event *cm_event; 2198 int ret, idx; 2199 2200 if (rdma->cm_id && rdma->connected) { 2201 if (rdma->error_state) { 2202 RDMAControlHeader head = { .len = 0, 2203 .type = RDMA_CONTROL_ERROR, 2204 .repeat = 1, 2205 }; 2206 error_report("Early error. Sending error."); 2207 qemu_rdma_post_send_control(rdma, NULL, &head); 2208 } 2209 2210 ret = rdma_disconnect(rdma->cm_id); 2211 if (!ret) { 2212 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2213 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2214 if (!ret) { 2215 rdma_ack_cm_event(cm_event); 2216 } 2217 } 2218 trace_qemu_rdma_cleanup_disconnect(); 2219 rdma->connected = false; 2220 } 2221 2222 g_free(rdma->dest_blocks); 2223 rdma->dest_blocks = NULL; 2224 2225 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2226 if (rdma->wr_data[idx].control_mr) { 2227 rdma->total_registrations--; 2228 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2229 } 2230 rdma->wr_data[idx].control_mr = NULL; 2231 } 2232 2233 if (rdma->local_ram_blocks.block) { 2234 while (rdma->local_ram_blocks.nb_blocks) { 2235 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2236 } 2237 } 2238 2239 if (rdma->qp) { 2240 rdma_destroy_qp(rdma->cm_id); 2241 rdma->qp = NULL; 2242 } 2243 if (rdma->cq) { 2244 ibv_destroy_cq(rdma->cq); 2245 rdma->cq = NULL; 2246 } 2247 if (rdma->comp_channel) { 2248 ibv_destroy_comp_channel(rdma->comp_channel); 2249 rdma->comp_channel = NULL; 2250 } 2251 if (rdma->pd) { 2252 ibv_dealloc_pd(rdma->pd); 2253 rdma->pd = NULL; 2254 } 2255 if (rdma->cm_id) { 2256 rdma_destroy_id(rdma->cm_id); 2257 rdma->cm_id = NULL; 2258 } 2259 if (rdma->listen_id) { 2260 rdma_destroy_id(rdma->listen_id); 2261 rdma->listen_id = NULL; 2262 } 2263 if (rdma->channel) { 2264 rdma_destroy_event_channel(rdma->channel); 2265 rdma->channel = NULL; 2266 } 2267 g_free(rdma->host); 2268 rdma->host = NULL; 2269 } 2270 2271 2272 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2273 { 2274 int ret, idx; 2275 Error *local_err = NULL, **temp = &local_err; 2276 2277 /* 2278 * Will be validated against destination's actual capabilities 2279 * after the connect() completes. 2280 */ 2281 rdma->pin_all = pin_all; 2282 2283 ret = qemu_rdma_resolve_host(rdma, temp); 2284 if (ret) { 2285 goto err_rdma_source_init; 2286 } 2287 2288 ret = qemu_rdma_alloc_pd_cq(rdma); 2289 if (ret) { 2290 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2291 " limits may be too low. 
Please check $ ulimit -a # and " 2292 "search for 'ulimit -l' in the output"); 2293 goto err_rdma_source_init; 2294 } 2295 2296 ret = qemu_rdma_alloc_qp(rdma); 2297 if (ret) { 2298 ERROR(temp, "rdma migration: error allocating qp!"); 2299 goto err_rdma_source_init; 2300 } 2301 2302 ret = qemu_rdma_init_ram_blocks(rdma); 2303 if (ret) { 2304 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2305 goto err_rdma_source_init; 2306 } 2307 2308 /* Build the hash that maps from offset to RAMBlock */ 2309 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2310 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2311 g_hash_table_insert(rdma->blockmap, 2312 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2313 &rdma->local_ram_blocks.block[idx]); 2314 } 2315 2316 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2317 ret = qemu_rdma_reg_control(rdma, idx); 2318 if (ret) { 2319 ERROR(temp, "rdma migration: error registering %d control!", 2320 idx); 2321 goto err_rdma_source_init; 2322 } 2323 } 2324 2325 return 0; 2326 2327 err_rdma_source_init: 2328 error_propagate(errp, local_err); 2329 qemu_rdma_cleanup(rdma); 2330 return -1; 2331 } 2332 2333 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2334 { 2335 RDMACapabilities cap = { 2336 .version = RDMA_CONTROL_VERSION_CURRENT, 2337 .flags = 0, 2338 }; 2339 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2340 .retry_count = 5, 2341 .private_data = &cap, 2342 .private_data_len = sizeof(cap), 2343 }; 2344 struct rdma_cm_event *cm_event; 2345 int ret; 2346 2347 /* 2348 * Only negotiate the capability with destination if the user 2349 * on the source first requested the capability. 2350 */ 2351 if (rdma->pin_all) { 2352 trace_qemu_rdma_connect_pin_all_requested(); 2353 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2354 } 2355 2356 caps_to_network(&cap); 2357 2358 ret = rdma_connect(rdma->cm_id, &conn_param); 2359 if (ret) { 2360 perror("rdma_connect"); 2361 ERROR(errp, "connecting to destination!"); 2362 goto err_rdma_source_connect; 2363 } 2364 2365 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2366 if (ret) { 2367 perror("rdma_get_cm_event after rdma_connect"); 2368 ERROR(errp, "connecting to destination!"); 2369 rdma_ack_cm_event(cm_event); 2370 goto err_rdma_source_connect; 2371 } 2372 2373 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2374 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2375 ERROR(errp, "connecting to destination!"); 2376 rdma_ack_cm_event(cm_event); 2377 goto err_rdma_source_connect; 2378 } 2379 rdma->connected = true; 2380 2381 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2382 network_to_caps(&cap); 2383 2384 /* 2385 * Verify that the *requested* capabilities are supported by the destination 2386 * and disable them otherwise. 2387 */ 2388 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2389 ERROR(errp, "Server cannot support pinning all memory. 
" 2390 "Will register memory dynamically."); 2391 rdma->pin_all = false; 2392 } 2393 2394 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2395 2396 rdma_ack_cm_event(cm_event); 2397 2398 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2399 if (ret) { 2400 ERROR(errp, "posting second control recv!"); 2401 goto err_rdma_source_connect; 2402 } 2403 2404 rdma->control_ready_expected = 1; 2405 rdma->nb_sent = 0; 2406 return 0; 2407 2408 err_rdma_source_connect: 2409 qemu_rdma_cleanup(rdma); 2410 return -1; 2411 } 2412 2413 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2414 { 2415 int ret, idx; 2416 struct rdma_cm_id *listen_id; 2417 char ip[40] = "unknown"; 2418 struct rdma_addrinfo *res, *e; 2419 char port_str[16]; 2420 2421 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2422 rdma->wr_data[idx].control_len = 0; 2423 rdma->wr_data[idx].control_curr = NULL; 2424 } 2425 2426 if (!rdma->host || !rdma->host[0]) { 2427 ERROR(errp, "RDMA host is not set!"); 2428 rdma->error_state = -EINVAL; 2429 return -1; 2430 } 2431 /* create CM channel */ 2432 rdma->channel = rdma_create_event_channel(); 2433 if (!rdma->channel) { 2434 ERROR(errp, "could not create rdma event channel"); 2435 rdma->error_state = -EINVAL; 2436 return -1; 2437 } 2438 2439 /* create CM id */ 2440 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2441 if (ret) { 2442 ERROR(errp, "could not create cm_id!"); 2443 goto err_dest_init_create_listen_id; 2444 } 2445 2446 snprintf(port_str, 16, "%d", rdma->port); 2447 port_str[15] = '\0'; 2448 2449 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2450 if (ret < 0) { 2451 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2452 goto err_dest_init_bind_addr; 2453 } 2454 2455 for (e = res; e != NULL; e = e->ai_next) { 2456 inet_ntop(e->ai_family, 2457 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2458 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2459 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2460 if (ret) { 2461 continue; 2462 } 2463 if (e->ai_family == AF_INET6) { 2464 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2465 if (ret) { 2466 continue; 2467 } 2468 } 2469 break; 2470 } 2471 2472 if (!e) { 2473 ERROR(errp, "Error: could not rdma_bind_addr!"); 2474 goto err_dest_init_bind_addr; 2475 } 2476 2477 rdma->listen_id = listen_id; 2478 qemu_rdma_dump_gid("dest_init", listen_id); 2479 return 0; 2480 2481 err_dest_init_bind_addr: 2482 rdma_destroy_id(listen_id); 2483 err_dest_init_create_listen_id: 2484 rdma_destroy_event_channel(rdma->channel); 2485 rdma->channel = NULL; 2486 rdma->error_state = ret; 2487 return ret; 2488 2489 } 2490 2491 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2492 { 2493 RDMAContext *rdma = NULL; 2494 InetSocketAddress *addr; 2495 2496 if (host_port) { 2497 rdma = g_malloc0(sizeof(RDMAContext)); 2498 rdma->current_index = -1; 2499 rdma->current_chunk = -1; 2500 2501 addr = inet_parse(host_port, NULL); 2502 if (addr != NULL) { 2503 rdma->port = atoi(addr->port); 2504 rdma->host = g_strdup(addr->host); 2505 } else { 2506 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2507 g_free(rdma); 2508 rdma = NULL; 2509 } 2510 2511 qapi_free_InetSocketAddress(addr); 2512 } 2513 2514 return rdma; 2515 } 2516 2517 /* 2518 * QEMUFile interface to the control channel. 2519 * SEND messages for control only. 2520 * VM's ram is handled with regular RDMA messages. 
2521 */ 2522 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, 2523 int64_t pos, int size) 2524 { 2525 QEMUFileRDMA *r = opaque; 2526 QEMUFile *f = r->file; 2527 RDMAContext *rdma = r->rdma; 2528 size_t remaining = size; 2529 uint8_t * data = (void *) buf; 2530 int ret; 2531 2532 CHECK_ERROR_STATE(); 2533 2534 /* 2535 * Push out any writes that 2536 * we're queued up for VM's ram. 2537 */ 2538 ret = qemu_rdma_write_flush(f, rdma); 2539 if (ret < 0) { 2540 rdma->error_state = ret; 2541 return ret; 2542 } 2543 2544 while (remaining) { 2545 RDMAControlHeader head; 2546 2547 r->len = MIN(remaining, RDMA_SEND_INCREMENT); 2548 remaining -= r->len; 2549 2550 head.len = r->len; 2551 head.type = RDMA_CONTROL_QEMU_FILE; 2552 2553 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2554 2555 if (ret < 0) { 2556 rdma->error_state = ret; 2557 return ret; 2558 } 2559 2560 data += r->len; 2561 } 2562 2563 return size; 2564 } 2565 2566 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2567 int size, int idx) 2568 { 2569 size_t len = 0; 2570 2571 if (rdma->wr_data[idx].control_len) { 2572 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2573 2574 len = MIN(size, rdma->wr_data[idx].control_len); 2575 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2576 rdma->wr_data[idx].control_curr += len; 2577 rdma->wr_data[idx].control_len -= len; 2578 } 2579 2580 return len; 2581 } 2582 2583 /* 2584 * QEMUFile interface to the control channel. 2585 * RDMA links don't use bytestreams, so we have to 2586 * return bytes to QEMUFile opportunistically. 2587 */ 2588 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf, 2589 int64_t pos, int size) 2590 { 2591 QEMUFileRDMA *r = opaque; 2592 RDMAContext *rdma = r->rdma; 2593 RDMAControlHeader head; 2594 int ret = 0; 2595 2596 CHECK_ERROR_STATE(); 2597 2598 /* 2599 * First, we hold on to the last SEND message we 2600 * were given and dish out the bytes until we run 2601 * out of bytes. 2602 */ 2603 r->len = qemu_rdma_fill(r->rdma, buf, size, 0); 2604 if (r->len) { 2605 return r->len; 2606 } 2607 2608 /* 2609 * Once we run out, we block and wait for another 2610 * SEND message to arrive. 2611 */ 2612 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2613 2614 if (ret < 0) { 2615 rdma->error_state = ret; 2616 return ret; 2617 } 2618 2619 /* 2620 * SEND was received with new bytes, now try again. 2621 */ 2622 return qemu_rdma_fill(r->rdma, buf, size, 0); 2623 } 2624 2625 /* 2626 * Block until all the outstanding chunks have been delivered by the hardware. 
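 *
 * This flushes the current merge buffer first and then waits on signaled
 * RDMA_WRID_RDMA_WRITE completions until rdma->nb_sent drops to zero,
 * finishing with any deferred unregistrations.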
 */
static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
{
    int ret;

    if (qemu_rdma_write_flush(f, rdma) < 0) {
        return -EIO;
    }

    while (rdma->nb_sent) {
        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
        if (ret < 0) {
            error_report("rdma migration: complete polling error!");
            return -EIO;
        }
    }

    qemu_rdma_unregister_waiting(rdma);

    return 0;
}

static int qemu_rdma_close(void *opaque)
{
    trace_qemu_rdma_close();
    QEMUFileRDMA *r = opaque;
    if (r->rdma) {
        qemu_rdma_cleanup(r->rdma);
        g_free(r->rdma);
    }
    g_free(r);
    return 0;
}

/*
 * Parameters:
 *    @offset == 0 :
 *        This means that 'block_offset' is a full virtual address that does not
 *        belong to a RAMBlock of the virtual machine and instead
 *        represents a private malloc'd memory area that the caller wishes to
 *        transfer.
 *
 *    @offset != 0 :
 *        Offset is an offset to be added to block_offset and used
 *        to also lookup the corresponding RAMBlock.
 *
 *    @size > 0 :
 *        Initiate a transfer of this size.
 *
 *    @size == 0 :
 *        A 'hint' or 'advice' that means that we wish to speculatively
 *        and asynchronously unregister this memory. In this case, there is no
 *        guarantee that the unregister will actually happen, for example,
 *        if the memory is being actively transmitted. Additionally, the memory
 *        may be re-registered at any future time if a write within the same
 *        chunk was requested again, even if you attempted to unregister it
 *        here.
 *
 *    @size < 0 : TODO, not yet supported
 *        Unregister the memory NOW. This means that the caller does not
 *        expect there to be any future RDMA transfers and we just want to clean
 *        things up. This is used in case the upper layer owns the memory and
 *        cannot wait for qemu_fclose() to occur.
 *
 *    @bytes_sent : User-specified pointer to indicate how many bytes were
 *                  sent. Usually, this will not be more than a few bytes of
 *                  the protocol because most transfers are sent asynchronously.
 */
static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                  ram_addr_t block_offset, ram_addr_t offset,
                                  size_t size, uint64_t *bytes_sent)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    int ret;

    CHECK_ERROR_STATE();

    qemu_fflush(f);

    if (size > 0) {
        /*
         * Add this page to the current 'chunk'. If the chunk
         * is full, or the page doesn't belong to the current chunk,
         * an actual RDMA write will occur and a new chunk will be formed.
         */
        ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
        if (ret < 0) {
            error_report("rdma migration: write error! %d", ret);
            goto err;
        }

        /*
         * We always return 1 byte because the RDMA
         * protocol is completely asynchronous. We do not yet know
         * whether an identified chunk is zero or not because we're
         * waiting for other pages to potentially be merged with
         * the current chunk. So, we have to call qemu_update_position()
         * later on when the actual write occurs.
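         *
         * (The real byte count is accounted when the chunk is actually
         * flushed: qemu_rdma_write_one() calls acct_update_position() with
         * the true length, both for zero/compressed chunks and for real
         * RDMA writes.)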
2726 */ 2727 if (bytes_sent) { 2728 *bytes_sent = 1; 2729 } 2730 } else { 2731 uint64_t index, chunk; 2732 2733 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2734 if (size < 0) { 2735 ret = qemu_rdma_drain_cq(f, rdma); 2736 if (ret < 0) { 2737 fprintf(stderr, "rdma: failed to synchronously drain" 2738 " completion queue before unregistration.\n"); 2739 goto err; 2740 } 2741 } 2742 */ 2743 2744 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2745 offset, size, &index, &chunk); 2746 2747 if (ret) { 2748 error_report("ram block search failed"); 2749 goto err; 2750 } 2751 2752 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2753 2754 /* 2755 * TODO: Synchronous, guaranteed unregistration (should not occur during 2756 * fast-path). Otherwise, unregisters will process on the next call to 2757 * qemu_rdma_drain_cq() 2758 if (size < 0) { 2759 qemu_rdma_unregister_waiting(rdma); 2760 } 2761 */ 2762 } 2763 2764 /* 2765 * Drain the Completion Queue if possible, but do not block, 2766 * just poll. 2767 * 2768 * If nothing to poll, the end of the iteration will do this 2769 * again to make sure we don't overflow the request queue. 2770 */ 2771 while (1) { 2772 uint64_t wr_id, wr_id_in; 2773 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2774 if (ret < 0) { 2775 error_report("rdma migration: polling error! %d", ret); 2776 goto err; 2777 } 2778 2779 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2780 2781 if (wr_id == RDMA_WRID_NONE) { 2782 break; 2783 } 2784 } 2785 2786 return RAM_SAVE_CONTROL_DELAYED; 2787 err: 2788 rdma->error_state = ret; 2789 return ret; 2790 } 2791 2792 static int qemu_rdma_accept(RDMAContext *rdma) 2793 { 2794 RDMACapabilities cap; 2795 struct rdma_conn_param conn_param = { 2796 .responder_resources = 2, 2797 .private_data = &cap, 2798 .private_data_len = sizeof(cap), 2799 }; 2800 struct rdma_cm_event *cm_event; 2801 struct ibv_context *verbs; 2802 int ret = -EINVAL; 2803 int idx; 2804 2805 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2806 if (ret) { 2807 goto err_rdma_dest_wait; 2808 } 2809 2810 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2811 rdma_ack_cm_event(cm_event); 2812 goto err_rdma_dest_wait; 2813 } 2814 2815 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2816 2817 network_to_caps(&cap); 2818 2819 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2820 error_report("Unknown source RDMA version: %d, bailing...", 2821 cap.version); 2822 rdma_ack_cm_event(cm_event); 2823 goto err_rdma_dest_wait; 2824 } 2825 2826 /* 2827 * Respond with only the capabilities this version of QEMU knows about. 2828 */ 2829 cap.flags &= known_capabilities; 2830 2831 /* 2832 * Enable the ones that we do know about. 2833 * Add other checks here as new ones are introduced. 
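 *
 * For example, a future (hypothetical) capability would be enabled with the
 * same pattern as pin-all below:
 *
 *     if (cap.flags & RDMA_CAPABILITY_SOMETHING_NEW) {
 *         rdma->something_new = true;
 *     }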
2834 */ 2835 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2836 rdma->pin_all = true; 2837 } 2838 2839 rdma->cm_id = cm_event->id; 2840 verbs = cm_event->id->verbs; 2841 2842 rdma_ack_cm_event(cm_event); 2843 2844 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 2845 2846 caps_to_network(&cap); 2847 2848 trace_qemu_rdma_accept_pin_verbsc(verbs); 2849 2850 if (!rdma->verbs) { 2851 rdma->verbs = verbs; 2852 } else if (rdma->verbs != verbs) { 2853 error_report("ibv context not matching %p, %p!", rdma->verbs, 2854 verbs); 2855 goto err_rdma_dest_wait; 2856 } 2857 2858 qemu_rdma_dump_id("dest_init", verbs); 2859 2860 ret = qemu_rdma_alloc_pd_cq(rdma); 2861 if (ret) { 2862 error_report("rdma migration: error allocating pd and cq!"); 2863 goto err_rdma_dest_wait; 2864 } 2865 2866 ret = qemu_rdma_alloc_qp(rdma); 2867 if (ret) { 2868 error_report("rdma migration: error allocating qp!"); 2869 goto err_rdma_dest_wait; 2870 } 2871 2872 ret = qemu_rdma_init_ram_blocks(rdma); 2873 if (ret) { 2874 error_report("rdma migration: error initializing ram blocks!"); 2875 goto err_rdma_dest_wait; 2876 } 2877 2878 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2879 ret = qemu_rdma_reg_control(rdma, idx); 2880 if (ret) { 2881 error_report("rdma: error registering %d control", idx); 2882 goto err_rdma_dest_wait; 2883 } 2884 } 2885 2886 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2887 2888 ret = rdma_accept(rdma->cm_id, &conn_param); 2889 if (ret) { 2890 error_report("rdma_accept returns %d", ret); 2891 goto err_rdma_dest_wait; 2892 } 2893 2894 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2895 if (ret) { 2896 error_report("rdma_accept get_cm_event failed %d", ret); 2897 goto err_rdma_dest_wait; 2898 } 2899 2900 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2901 error_report("rdma_accept not event established"); 2902 rdma_ack_cm_event(cm_event); 2903 goto err_rdma_dest_wait; 2904 } 2905 2906 rdma_ack_cm_event(cm_event); 2907 rdma->connected = true; 2908 2909 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2910 if (ret) { 2911 error_report("rdma migration: error posting second control recv"); 2912 goto err_rdma_dest_wait; 2913 } 2914 2915 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 2916 2917 return 0; 2918 2919 err_rdma_dest_wait: 2920 rdma->error_state = ret; 2921 qemu_rdma_cleanup(rdma); 2922 return ret; 2923 } 2924 2925 static int dest_ram_sort_func(const void *a, const void *b) 2926 { 2927 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 2928 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 2929 2930 return (a_index < b_index) ? -1 : (a_index != b_index); 2931 } 2932 2933 /* 2934 * During each iteration of the migration, we listen for instructions 2935 * by the source VM to perform dynamic page registrations before they 2936 * can perform RDMA operations. 2937 * 2938 * We respond with the 'rkey'. 2939 * 2940 * Keep doing this until the source tells us to stop. 
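 *
 * The handler below is essentially:
 *
 *     do {
 *         qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
 *         switch (head.type) {
 *         case RDMA_CONTROL_COMPRESS:            zero the pages locally
 *         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:  send our RAMBlock list
 *         case RDMA_CONTROL_REGISTER_REQUEST:    pin chunks, reply with rkeys
 *         case RDMA_CONTROL_UNREGISTER_REQUEST:  unpin chunks, acknowledge
 *         case RDMA_CONTROL_REGISTER_FINISHED:   return (iteration is done)
 *         }
 *     } while (1);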
2941 */ 2942 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 2943 { 2944 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 2945 .type = RDMA_CONTROL_REGISTER_RESULT, 2946 .repeat = 0, 2947 }; 2948 RDMAControlHeader unreg_resp = { .len = 0, 2949 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 2950 .repeat = 0, 2951 }; 2952 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 2953 .repeat = 1 }; 2954 QEMUFileRDMA *rfile = opaque; 2955 RDMAContext *rdma = rfile->rdma; 2956 RDMALocalBlocks *local = &rdma->local_ram_blocks; 2957 RDMAControlHeader head; 2958 RDMARegister *reg, *registers; 2959 RDMACompress *comp; 2960 RDMARegisterResult *reg_result; 2961 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 2962 RDMALocalBlock *block; 2963 void *host_addr; 2964 int ret = 0; 2965 int idx = 0; 2966 int count = 0; 2967 int i = 0; 2968 2969 CHECK_ERROR_STATE(); 2970 2971 do { 2972 trace_qemu_rdma_registration_handle_wait(); 2973 2974 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 2975 2976 if (ret < 0) { 2977 break; 2978 } 2979 2980 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 2981 error_report("rdma: Too many requests in this message (%d)." 2982 "Bailing.", head.repeat); 2983 ret = -EIO; 2984 break; 2985 } 2986 2987 switch (head.type) { 2988 case RDMA_CONTROL_COMPRESS: 2989 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 2990 network_to_compress(comp); 2991 2992 trace_qemu_rdma_registration_handle_compress(comp->length, 2993 comp->block_idx, 2994 comp->offset); 2995 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 2996 error_report("rdma: 'compress' bad block index %u (vs %d)", 2997 (unsigned int)comp->block_idx, 2998 rdma->local_ram_blocks.nb_blocks); 2999 ret = -EIO; 3000 goto out; 3001 } 3002 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3003 3004 host_addr = block->local_host_addr + 3005 (comp->offset - block->offset); 3006 3007 ram_handle_compressed(host_addr, comp->value, comp->length); 3008 break; 3009 3010 case RDMA_CONTROL_REGISTER_FINISHED: 3011 trace_qemu_rdma_registration_handle_finished(); 3012 goto out; 3013 3014 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3015 trace_qemu_rdma_registration_handle_ram_blocks(); 3016 3017 /* Sort our local RAM Block list so it's the same as the source, 3018 * we can do this since we've filled in a src_index in the list 3019 * as we received the RAMBlock list earlier. 3020 */ 3021 qsort(rdma->local_ram_blocks.block, 3022 rdma->local_ram_blocks.nb_blocks, 3023 sizeof(RDMALocalBlock), dest_ram_sort_func); 3024 if (rdma->pin_all) { 3025 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3026 if (ret) { 3027 error_report("rdma migration: error dest " 3028 "registering ram blocks"); 3029 goto out; 3030 } 3031 } 3032 3033 /* 3034 * Dest uses this to prepare to transmit the RAMBlock descriptions 3035 * to the source VM after connection setup. 3036 * Both sides use the "remote" structure to communicate and update 3037 * their "local" descriptions with what was sent. 
 */
            for (i = 0; i < local->nb_blocks; i++) {
                rdma->dest_blocks[i].remote_host_addr =
                    (uintptr_t)(local->block[i].local_host_addr);

                if (rdma->pin_all) {
                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
                }

                rdma->dest_blocks[i].offset = local->block[i].offset;
                rdma->dest_blocks[i].length = local->block[i].length;

                dest_block_to_network(&rdma->dest_blocks[i]);
                trace_qemu_rdma_registration_handle_ram_blocks_loop(
                    local->block[i].block_name,
                    local->block[i].offset,
                    local->block[i].length,
                    local->block[i].local_host_addr,
                    local->block[i].src_index);
            }

            blocks.len = rdma->local_ram_blocks.nb_blocks
                                                * sizeof(RDMADestBlock);


            ret = qemu_rdma_post_send_control(rdma,
                                        (uint8_t *) rdma->dest_blocks, &blocks);

            if (ret < 0) {
                error_report("rdma migration: error sending remote info");
                goto out;
            }

            break;
        case RDMA_CONTROL_REGISTER_REQUEST:
            trace_qemu_rdma_registration_handle_register(head.repeat);

            reg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                uint64_t chunk;
                uint8_t *chunk_start, *chunk_end;

                reg = &registers[count];
                network_to_register(reg);

                reg_result = &results[count];

                trace_qemu_rdma_registration_handle_register_loop(count,
                         reg->current_index, reg->key.current_addr, reg->chunks);

                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
                    error_report("rdma: 'register' bad block index %u (vs %d)",
                                 (unsigned int)reg->current_index,
                                 rdma->local_ram_blocks.nb_blocks);
                    ret = -ENOENT;
                    goto out;
                }
                block = &(rdma->local_ram_blocks.block[reg->current_index]);
                if (block->is_ram_block) {
                    if (block->offset > reg->key.current_addr) {
                        error_report("rdma: bad register address for block %s"
                            " offset: %" PRIx64 " current_addr: %" PRIx64,
                            block->block_name, block->offset,
                            reg->key.current_addr);
                        ret = -ERANGE;
                        goto out;
                    }
                    host_addr = (block->local_host_addr +
                                (reg->key.current_addr - block->offset));
                    chunk = ram_chunk_index(block->local_host_addr,
                                            (uint8_t *) host_addr);
                } else {
                    chunk = reg->key.chunk;
                    host_addr = block->local_host_addr +
                        (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
                    /* Check for particularly bad chunk value */
                    if (host_addr < (void *)block->local_host_addr) {
                        error_report("rdma: bad chunk for block %s"
                            " chunk: %" PRIx64,
                            block->block_name, reg->key.chunk);
                        ret = -ERANGE;
                        goto out;
                    }
                }
                chunk_start = ram_chunk_start(block, chunk);
                chunk_end = ram_chunk_end(block, chunk + reg->chunks);
                if (qemu_rdma_register_and_get_keys(rdma, block,
                            (uintptr_t)host_addr, NULL, &reg_result->rkey,
                            chunk, chunk_start, chunk_end)) {
                    error_report("cannot get rkey");
                    ret = -EINVAL;
                    goto out;
                }

                reg_result->host_addr = (uintptr_t)block->local_host_addr;

                trace_qemu_rdma_registration_handle_register_rkey(
                                                           reg_result->rkey);

                result_to_network(reg_result);
            }

            ret = qemu_rdma_post_send_control(rdma,
                            (uint8_t *) results, &reg_resp);

            if (ret < 0) {
                error_report("Failed to send control buffer");
                goto out;
            }
            break;
        case RDMA_CONTROL_UNREGISTER_REQUEST:
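            /*
             * The source decided a chunk is no longer needed: drop the memory
             * registration (ibv_dereg_mr) for each chunk listed in the
             * request and acknowledge with RDMA_CONTROL_UNREGISTER_FINISHED.
             */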
            trace_qemu_rdma_registration_handle_unregister(head.repeat);
            unreg_resp.repeat = head.repeat;
            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;

            for (count = 0; count < head.repeat; count++) {
                reg = &registers[count];
                network_to_register(reg);

                trace_qemu_rdma_registration_handle_unregister_loop(count,
                           reg->current_index, reg->key.chunk);

                block = &(rdma->local_ram_blocks.block[reg->current_index]);

                ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
                block->pmr[reg->key.chunk] = NULL;

                if (ret != 0) {
                    perror("rdma unregistration chunk failed");
                    ret = -ret;
                    goto out;
                }

                rdma->total_registrations--;

                trace_qemu_rdma_registration_handle_unregister_success(
                                                       reg->key.chunk);
            }

            ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);

            if (ret < 0) {
                error_report("Failed to send control buffer");
                goto out;
            }
            break;
        case RDMA_CONTROL_REGISTER_RESULT:
            error_report("Invalid RESULT message at dest.");
            ret = -EIO;
            goto out;
        default:
            error_report("Unknown control message %s", control_desc[head.type]);
            ret = -EIO;
            goto out;
        }
    } while (1);
out:
    if (ret < 0) {
        rdma->error_state = ret;
    }
    return ret;
}

/* Destination:
 * Called via a ram_control_load_hook during the initial RAM load section which
 * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
 * on the source.
 * We've already built our local RAMBlock list, but not yet sent the list to
 * the source.
 */
static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
{
    RDMAContext *rdma = rfile->rdma;
    int curr;
    int found = -1;

    /* Find the matching RAMBlock in our local list */
    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
            found = curr;
            break;
        }
    }

    if (found == -1) {
        error_report("RAMBlock '%s' not found on destination", name);
        return -ENOENT;
    }

    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
    trace_rdma_block_notification_handle(name, rdma->next_src_index);
    rdma->next_src_index++;

    return 0;
}

static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
{
    switch (flags) {
    case RAM_CONTROL_BLOCK_REG:
        return rdma_block_notification_handle(opaque, data);

    case RAM_CONTROL_HOOK:
        return qemu_rdma_registration_handle(f, opaque);

    default:
        /* Shouldn't be called with any other values */
        abort();
    }
}

static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
                                        uint64_t flags, void *data)
{
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;

    CHECK_ERROR_STATE();

    trace_qemu_rdma_registration_start(flags);
    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
    qemu_fflush(f);

    return 0;
}

/*
 * Inform dest that dynamic registrations are done for now.
 * First, flush writes, if any.
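 *
 * On RAM_CONTROL_SETUP this also performs the one-time RAMBlock handshake:
 * send RDMA_CONTROL_RAM_BLOCKS_REQUEST (optionally pinning all local blocks
 * in parallel) and import the destination's block list from the
 * RAM_BLOCKS_RESULT reply before the first RDMA write is issued.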
 */
static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                                       uint64_t flags, void *data)
{
    Error *local_err = NULL, **errp = &local_err;
    QEMUFileRDMA *rfile = opaque;
    RDMAContext *rdma = rfile->rdma;
    RDMAControlHeader head = { .len = 0, .repeat = 1 };
    int ret = 0;

    CHECK_ERROR_STATE();

    qemu_fflush(f);
    ret = qemu_rdma_drain_cq(f, rdma);

    if (ret < 0) {
        goto err;
    }

    if (flags == RAM_CONTROL_SETUP) {
        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
        RDMALocalBlocks *local = &rdma->local_ram_blocks;
        int reg_result_idx, i, nb_dest_blocks;

        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
        trace_qemu_rdma_registration_stop_ram();

        /*
         * Make sure that we parallelize the pinning on both sides.
         * For very large guests, doing this serially takes a really
         * long time, so we have to 'interleave' the pinning locally
         * with the control messages by performing the pinning on this
         * side before we receive the control response from the other
         * side that the pinning has completed.
         */
        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
                    &reg_result_idx, rdma->pin_all ?
                    qemu_rdma_reg_whole_ram_blocks : NULL);
        if (ret < 0) {
            ERROR(errp, "receiving remote info!");
            return ret;
        }

        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);

        /*
         * The protocol uses two different sets of rkeys (mutually exclusive):
         * 1. One key to represent the virtual address of the entire ram block.
         *    (dynamic chunk registration disabled - pin everything with one rkey.)
         * 2. One to represent individual chunks within a ram block.
         *    (dynamic chunk registration enabled - pin individual chunks.)
         *
         * Once the capability is successfully negotiated, the destination transmits
         * the keys to use (or sends them later) including the virtual addresses
         * and then propagates the remote ram block descriptions to its local copy.
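         *
         * In qemu_rdma_write_one() this shows up as: pin-all RAM blocks use
         * block->remote_rkey for the whole block, while dynamically
         * registered blocks look up block->remote_keys[chunk], which was
         * returned in an earlier REGISTER_RESULT message.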
3324 */ 3325 3326 if (local->nb_blocks != nb_dest_blocks) { 3327 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) " 3328 "Your QEMU command line parameters are probably " 3329 "not identical on both the source and destination.", 3330 local->nb_blocks, nb_dest_blocks); 3331 rdma->error_state = -EINVAL; 3332 return -EINVAL; 3333 } 3334 3335 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3336 memcpy(rdma->dest_blocks, 3337 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3338 for (i = 0; i < nb_dest_blocks; i++) { 3339 network_to_dest_block(&rdma->dest_blocks[i]); 3340 3341 /* We require that the blocks are in the same order */ 3342 if (rdma->dest_blocks[i].length != local->block[i].length) { 3343 ERROR(errp, "Block %s/%d has a different length %" PRIu64 3344 "vs %" PRIu64, local->block[i].block_name, i, 3345 local->block[i].length, 3346 rdma->dest_blocks[i].length); 3347 rdma->error_state = -EINVAL; 3348 return -EINVAL; 3349 } 3350 local->block[i].remote_host_addr = 3351 rdma->dest_blocks[i].remote_host_addr; 3352 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3353 } 3354 } 3355 3356 trace_qemu_rdma_registration_stop(flags); 3357 3358 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3359 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3360 3361 if (ret < 0) { 3362 goto err; 3363 } 3364 3365 return 0; 3366 err: 3367 rdma->error_state = ret; 3368 return ret; 3369 } 3370 3371 static int qemu_rdma_get_fd(void *opaque) 3372 { 3373 QEMUFileRDMA *rfile = opaque; 3374 RDMAContext *rdma = rfile->rdma; 3375 3376 return rdma->comp_channel->fd; 3377 } 3378 3379 static const QEMUFileOps rdma_read_ops = { 3380 .get_buffer = qemu_rdma_get_buffer, 3381 .get_fd = qemu_rdma_get_fd, 3382 .close = qemu_rdma_close, 3383 .hook_ram_load = rdma_load_hook, 3384 }; 3385 3386 static const QEMUFileOps rdma_write_ops = { 3387 .put_buffer = qemu_rdma_put_buffer, 3388 .close = qemu_rdma_close, 3389 .before_ram_iterate = qemu_rdma_registration_start, 3390 .after_ram_iterate = qemu_rdma_registration_stop, 3391 .save_page = qemu_rdma_save_page, 3392 }; 3393 3394 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3395 { 3396 QEMUFileRDMA *r; 3397 3398 if (qemu_file_mode_is_not_valid(mode)) { 3399 return NULL; 3400 } 3401 3402 r = g_malloc0(sizeof(QEMUFileRDMA)); 3403 r->rdma = rdma; 3404 3405 if (mode[0] == 'w') { 3406 r->file = qemu_fopen_ops(r, &rdma_write_ops); 3407 } else { 3408 r->file = qemu_fopen_ops(r, &rdma_read_ops); 3409 } 3410 3411 return r->file; 3412 } 3413 3414 static void rdma_accept_incoming_migration(void *opaque) 3415 { 3416 RDMAContext *rdma = opaque; 3417 int ret; 3418 QEMUFile *f; 3419 Error *local_err = NULL, **errp = &local_err; 3420 3421 trace_qemu_rdma_accept_incoming_migration(); 3422 ret = qemu_rdma_accept(rdma); 3423 3424 if (ret) { 3425 ERROR(errp, "RDMA Migration initialization failed!"); 3426 return; 3427 } 3428 3429 trace_qemu_rdma_accept_incoming_migration_accepted(); 3430 3431 f = qemu_fopen_rdma(rdma, "rb"); 3432 if (f == NULL) { 3433 ERROR(errp, "could not qemu_fopen_rdma!"); 3434 qemu_rdma_cleanup(rdma); 3435 return; 3436 } 3437 3438 rdma->migration_started_on_destination = 1; 3439 process_incoming_migration(f); 3440 } 3441 3442 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3443 { 3444 int ret; 3445 RDMAContext *rdma; 3446 Error *local_err = NULL; 3447 3448 trace_rdma_start_incoming_migration(); 3449 rdma = qemu_rdma_data_init(host_port, &local_err); 3450 3451 if (rdma == NULL) { 3452 goto err; 
3453 } 3454 3455 ret = qemu_rdma_dest_init(rdma, &local_err); 3456 3457 if (ret) { 3458 goto err; 3459 } 3460 3461 trace_rdma_start_incoming_migration_after_dest_init(); 3462 3463 ret = rdma_listen(rdma->listen_id, 5); 3464 3465 if (ret) { 3466 ERROR(errp, "listening on socket!"); 3467 goto err; 3468 } 3469 3470 trace_rdma_start_incoming_migration_after_rdma_listen(); 3471 3472 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3473 NULL, (void *)(intptr_t)rdma); 3474 return; 3475 err: 3476 error_propagate(errp, local_err); 3477 g_free(rdma); 3478 } 3479 3480 void rdma_start_outgoing_migration(void *opaque, 3481 const char *host_port, Error **errp) 3482 { 3483 MigrationState *s = opaque; 3484 Error *local_err = NULL, **temp = &local_err; 3485 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err); 3486 int ret = 0; 3487 3488 if (rdma == NULL) { 3489 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret); 3490 goto err; 3491 } 3492 3493 ret = qemu_rdma_source_init(rdma, &local_err, 3494 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3495 3496 if (ret) { 3497 goto err; 3498 } 3499 3500 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3501 ret = qemu_rdma_connect(rdma, &local_err); 3502 3503 if (ret) { 3504 goto err; 3505 } 3506 3507 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3508 3509 s->file = qemu_fopen_rdma(rdma, "wb"); 3510 migrate_fd_connect(s); 3511 return; 3512 err: 3513 error_propagate(errp, local_err); 3514 g_free(rdma); 3515 migrate_fd_error(s); 3516 } 3517