1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * 6 * Authors: 7 * Michael R. Hines <mrhines@us.ibm.com> 8 * Jiuxing Liu <jl@us.ibm.com> 9 * 10 * This work is licensed under the terms of the GNU GPL, version 2 or 11 * later. See the COPYING file in the top-level directory. 12 * 13 */ 14 #include "qemu-common.h" 15 #include "migration/migration.h" 16 #include "migration/qemu-file.h" 17 #include "exec/cpu-common.h" 18 #include "qemu/main-loop.h" 19 #include "qemu/sockets.h" 20 #include "qemu/bitmap.h" 21 #include "block/coroutine.h" 22 #include <stdio.h> 23 #include <sys/types.h> 24 #include <sys/socket.h> 25 #include <netdb.h> 26 #include <arpa/inet.h> 27 #include <string.h> 28 #include <rdma/rdma_cma.h> 29 #include "trace.h" 30 31 /* 32 * Print and error on both the Monitor and the Log file. 33 */ 34 #define ERROR(errp, fmt, ...) \ 35 do { \ 36 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 37 if (errp && (*(errp) == NULL)) { \ 38 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 39 } \ 40 } while (0) 41 42 #define RDMA_RESOLVE_TIMEOUT_MS 10000 43 44 /* Do not merge data if larger than this. */ 45 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 46 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 47 48 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 49 50 /* 51 * This is only for non-live state being migrated. 52 * Instead of RDMA_WRITE messages, we use RDMA_SEND 53 * messages for that state, which requires a different 54 * delivery design than main memory. 55 */ 56 #define RDMA_SEND_INCREMENT 32768 57 58 /* 59 * Maximum size infiniband SEND message 60 */ 61 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 62 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 63 64 #define RDMA_CONTROL_VERSION_CURRENT 1 65 /* 66 * Capabilities for negotiation. 67 */ 68 #define RDMA_CAPABILITY_PIN_ALL 0x01 69 70 /* 71 * Add the other flags above to this list of known capabilities 72 * as they are introduced. 73 */ 74 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 75 76 #define CHECK_ERROR_STATE() \ 77 do { \ 78 if (rdma->error_state) { \ 79 if (!rdma->error_reported) { \ 80 error_report("RDMA is in an error state waiting migration" \ 81 " to abort!"); \ 82 rdma->error_reported = 1; \ 83 } \ 84 return rdma->error_state; \ 85 } \ 86 } while (0); 87 88 /* 89 * A work request ID is 64-bits and we split up these bits 90 * into 3 parts: 91 * 92 * bits 0-15 : type of control message, 2^16 93 * bits 16-29: ram block index, 2^14 94 * bits 30-63: ram block chunk number, 2^34 95 * 96 * The last two bit ranges are only used for RDMA writes, 97 * in order to track their completion and potentially 98 * also track unregistration status of the message. 99 */ 100 #define RDMA_WRID_TYPE_SHIFT 0UL 101 #define RDMA_WRID_BLOCK_SHIFT 16UL 102 #define RDMA_WRID_CHUNK_SHIFT 30UL 103 104 #define RDMA_WRID_TYPE_MASK \ 105 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 106 107 #define RDMA_WRID_BLOCK_MASK \ 108 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 109 110 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 111 112 /* 113 * RDMA migration protocol: 114 * 1. RDMA Writes (data messages, i.e. RAM) 115 * 2. IB Send/Recv (control channel messages) 116 */ 117 enum { 118 RDMA_WRID_NONE = 0, 119 RDMA_WRID_RDMA_WRITE = 1, 120 RDMA_WRID_SEND_CONTROL = 2000, 121 RDMA_WRID_RECV_CONTROL = 4000, 122 }; 123 124 static const char *wrid_desc[] = { 125 [RDMA_WRID_NONE] = "NONE", 126 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 127 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 128 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 129 }; 130 131 /* 132 * Work request IDs for IB SEND messages only (not RDMA writes). 133 * This is used by the migration protocol to transmit 134 * control messages (such as device state and registration commands) 135 * 136 * We could use more WRs, but we have enough for now. 137 */ 138 enum { 139 RDMA_WRID_READY = 0, 140 RDMA_WRID_DATA, 141 RDMA_WRID_CONTROL, 142 RDMA_WRID_MAX, 143 }; 144 145 /* 146 * SEND/RECV IB Control Messages. 147 */ 148 enum { 149 RDMA_CONTROL_NONE = 0, 150 RDMA_CONTROL_ERROR, 151 RDMA_CONTROL_READY, /* ready to receive */ 152 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 153 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 154 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 155 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 156 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 157 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 158 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 159 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 160 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 161 }; 162 163 static const char *control_desc[] = { 164 [RDMA_CONTROL_NONE] = "NONE", 165 [RDMA_CONTROL_ERROR] = "ERROR", 166 [RDMA_CONTROL_READY] = "READY", 167 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 168 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 169 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 170 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 171 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 172 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 173 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 174 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 175 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 176 }; 177 178 /* 179 * Memory and MR structures used to represent an IB Send/Recv work request. 180 * This is *not* used for RDMA writes, only IB Send/Recv. 181 */ 182 typedef struct { 183 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 184 struct ibv_mr *control_mr; /* registration metadata */ 185 size_t control_len; /* length of the message */ 186 uint8_t *control_curr; /* start of unconsumed bytes */ 187 } RDMAWorkRequestData; 188 189 /* 190 * Negotiate RDMA capabilities during connection-setup time. 191 */ 192 typedef struct { 193 uint32_t version; 194 uint32_t flags; 195 } RDMACapabilities; 196 197 static void caps_to_network(RDMACapabilities *cap) 198 { 199 cap->version = htonl(cap->version); 200 cap->flags = htonl(cap->flags); 201 } 202 203 static void network_to_caps(RDMACapabilities *cap) 204 { 205 cap->version = ntohl(cap->version); 206 cap->flags = ntohl(cap->flags); 207 } 208 209 /* 210 * Representation of a RAMBlock from an RDMA perspective. 211 * This is not transmitted, only local. 212 * This and subsequent structures cannot be linked lists 213 * because we're using a single IB message to transmit 214 * the information. It's small anyway, so a list is overkill. 215 */ 216 typedef struct RDMALocalBlock { 217 uint8_t *local_host_addr; /* local virtual address */ 218 uint64_t remote_host_addr; /* remote virtual address */ 219 uint64_t offset; 220 uint64_t length; 221 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 222 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 223 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 224 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 225 int index; /* which block are we */ 226 bool is_ram_block; 227 int nb_chunks; 228 unsigned long *transit_bitmap; 229 unsigned long *unregister_bitmap; 230 } RDMALocalBlock; 231 232 /* 233 * Also represents a RAMblock, but only on the dest. 234 * This gets transmitted by the dest during connection-time 235 * to the source VM and then is used to populate the 236 * corresponding RDMALocalBlock with 237 * the information needed to perform the actual RDMA. 238 */ 239 typedef struct QEMU_PACKED RDMADestBlock { 240 uint64_t remote_host_addr; 241 uint64_t offset; 242 uint64_t length; 243 uint32_t remote_rkey; 244 uint32_t padding; 245 } RDMADestBlock; 246 247 static uint64_t htonll(uint64_t v) 248 { 249 union { uint32_t lv[2]; uint64_t llv; } u; 250 u.lv[0] = htonl(v >> 32); 251 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 252 return u.llv; 253 } 254 255 static uint64_t ntohll(uint64_t v) { 256 union { uint32_t lv[2]; uint64_t llv; } u; 257 u.llv = v; 258 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 259 } 260 261 static void dest_block_to_network(RDMADestBlock *db) 262 { 263 db->remote_host_addr = htonll(db->remote_host_addr); 264 db->offset = htonll(db->offset); 265 db->length = htonll(db->length); 266 db->remote_rkey = htonl(db->remote_rkey); 267 } 268 269 static void network_to_dest_block(RDMADestBlock *db) 270 { 271 db->remote_host_addr = ntohll(db->remote_host_addr); 272 db->offset = ntohll(db->offset); 273 db->length = ntohll(db->length); 274 db->remote_rkey = ntohl(db->remote_rkey); 275 } 276 277 /* 278 * Virtual address of the above structures used for transmitting 279 * the RAMBlock descriptions at connection-time. 280 * This structure is *not* transmitted. 281 */ 282 typedef struct RDMALocalBlocks { 283 int nb_blocks; 284 bool init; /* main memory init complete */ 285 RDMALocalBlock *block; 286 } RDMALocalBlocks; 287 288 /* 289 * Main data structure for RDMA state. 290 * While there is only one copy of this structure being allocated right now, 291 * this is the place where one would start if you wanted to consider 292 * having more than one RDMA connection open at the same time. 293 */ 294 typedef struct RDMAContext { 295 char *host; 296 int port; 297 298 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 299 300 /* 301 * This is used by *_exchange_send() to figure out whether or not 302 * the initial "READY" message has already been received or not. 303 * This is because other functions may potentially poll() and detect 304 * the READY message before send() does, in which case we need to 305 * know if it completed. 306 */ 307 int control_ready_expected; 308 309 /* number of outstanding writes */ 310 int nb_sent; 311 312 /* store info about current buffer so that we can 313 merge it with future sends */ 314 uint64_t current_addr; 315 uint64_t current_length; 316 /* index of ram block the current buffer belongs to */ 317 int current_index; 318 /* index of the chunk in the current ram block */ 319 int current_chunk; 320 321 bool pin_all; 322 323 /* 324 * infiniband-specific variables for opening the device 325 * and maintaining connection state and so forth. 326 * 327 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 328 * cm_id->verbs, cm_id->channel, and cm_id->qp. 329 */ 330 struct rdma_cm_id *cm_id; /* connection manager ID */ 331 struct rdma_cm_id *listen_id; 332 bool connected; 333 334 struct ibv_context *verbs; 335 struct rdma_event_channel *channel; 336 struct ibv_qp *qp; /* queue pair */ 337 struct ibv_comp_channel *comp_channel; /* completion channel */ 338 struct ibv_pd *pd; /* protection domain */ 339 struct ibv_cq *cq; /* completion queue */ 340 341 /* 342 * If a previous write failed (perhaps because of a failed 343 * memory registration, then do not attempt any future work 344 * and remember the error state. 345 */ 346 int error_state; 347 int error_reported; 348 349 /* 350 * Description of ram blocks used throughout the code. 351 */ 352 RDMALocalBlocks local_ram_blocks; 353 RDMADestBlock *dest_blocks; 354 355 /* 356 * Migration on *destination* started. 357 * Then use coroutine yield function. 358 * Source runs in a thread, so we don't care. 359 */ 360 int migration_started_on_destination; 361 362 int total_registrations; 363 int total_writes; 364 365 int unregister_current, unregister_next; 366 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 367 368 GHashTable *blockmap; 369 } RDMAContext; 370 371 /* 372 * Interface to the rest of the migration call stack. 373 */ 374 typedef struct QEMUFileRDMA { 375 RDMAContext *rdma; 376 size_t len; 377 void *file; 378 } QEMUFileRDMA; 379 380 /* 381 * Main structure for IB Send/Recv control messages. 382 * This gets prepended at the beginning of every Send/Recv. 383 */ 384 typedef struct QEMU_PACKED { 385 uint32_t len; /* Total length of data portion */ 386 uint32_t type; /* which control command to perform */ 387 uint32_t repeat; /* number of commands in data portion of same type */ 388 uint32_t padding; 389 } RDMAControlHeader; 390 391 static void control_to_network(RDMAControlHeader *control) 392 { 393 control->type = htonl(control->type); 394 control->len = htonl(control->len); 395 control->repeat = htonl(control->repeat); 396 } 397 398 static void network_to_control(RDMAControlHeader *control) 399 { 400 control->type = ntohl(control->type); 401 control->len = ntohl(control->len); 402 control->repeat = ntohl(control->repeat); 403 } 404 405 /* 406 * Register a single Chunk. 407 * Information sent by the source VM to inform the dest 408 * to register an single chunk of memory before we can perform 409 * the actual RDMA operation. 410 */ 411 typedef struct QEMU_PACKED { 412 union QEMU_PACKED { 413 uint64_t current_addr; /* offset into the ramblock of the chunk */ 414 uint64_t chunk; /* chunk to lookup if unregistering */ 415 } key; 416 uint32_t current_index; /* which ramblock the chunk belongs to */ 417 uint32_t padding; 418 uint64_t chunks; /* how many sequential chunks to register */ 419 } RDMARegister; 420 421 static void register_to_network(RDMARegister *reg) 422 { 423 reg->key.current_addr = htonll(reg->key.current_addr); 424 reg->current_index = htonl(reg->current_index); 425 reg->chunks = htonll(reg->chunks); 426 } 427 428 static void network_to_register(RDMARegister *reg) 429 { 430 reg->key.current_addr = ntohll(reg->key.current_addr); 431 reg->current_index = ntohl(reg->current_index); 432 reg->chunks = ntohll(reg->chunks); 433 } 434 435 typedef struct QEMU_PACKED { 436 uint32_t value; /* if zero, we will madvise() */ 437 uint32_t block_idx; /* which ram block index */ 438 uint64_t offset; /* where in the remote ramblock this chunk */ 439 uint64_t length; /* length of the chunk */ 440 } RDMACompress; 441 442 static void compress_to_network(RDMACompress *comp) 443 { 444 comp->value = htonl(comp->value); 445 comp->block_idx = htonl(comp->block_idx); 446 comp->offset = htonll(comp->offset); 447 comp->length = htonll(comp->length); 448 } 449 450 static void network_to_compress(RDMACompress *comp) 451 { 452 comp->value = ntohl(comp->value); 453 comp->block_idx = ntohl(comp->block_idx); 454 comp->offset = ntohll(comp->offset); 455 comp->length = ntohll(comp->length); 456 } 457 458 /* 459 * The result of the dest's memory registration produces an "rkey" 460 * which the source VM must reference in order to perform 461 * the RDMA operation. 462 */ 463 typedef struct QEMU_PACKED { 464 uint32_t rkey; 465 uint32_t padding; 466 uint64_t host_addr; 467 } RDMARegisterResult; 468 469 static void result_to_network(RDMARegisterResult *result) 470 { 471 result->rkey = htonl(result->rkey); 472 result->host_addr = htonll(result->host_addr); 473 }; 474 475 static void network_to_result(RDMARegisterResult *result) 476 { 477 result->rkey = ntohl(result->rkey); 478 result->host_addr = ntohll(result->host_addr); 479 }; 480 481 const char *print_wrid(int wrid); 482 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 483 uint8_t *data, RDMAControlHeader *resp, 484 int *resp_idx, 485 int (*callback)(RDMAContext *rdma)); 486 487 static inline uint64_t ram_chunk_index(const uint8_t *start, 488 const uint8_t *host) 489 { 490 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 491 } 492 493 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 494 uint64_t i) 495 { 496 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 497 (i << RDMA_REG_CHUNK_SHIFT)); 498 } 499 500 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 501 uint64_t i) 502 { 503 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 504 (1UL << RDMA_REG_CHUNK_SHIFT); 505 506 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 507 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 508 } 509 510 return result; 511 } 512 513 static int rdma_add_block(RDMAContext *rdma, void *host_addr, 514 ram_addr_t block_offset, uint64_t length) 515 { 516 RDMALocalBlocks *local = &rdma->local_ram_blocks; 517 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 518 (void *)(uintptr_t)block_offset); 519 RDMALocalBlock *old = local->block; 520 521 assert(block == NULL); 522 523 local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1)); 524 525 if (local->nb_blocks) { 526 int x; 527 528 for (x = 0; x < local->nb_blocks; x++) { 529 g_hash_table_remove(rdma->blockmap, 530 (void *)(uintptr_t)old[x].offset); 531 g_hash_table_insert(rdma->blockmap, 532 (void *)(uintptr_t)old[x].offset, 533 &local->block[x]); 534 } 535 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 536 g_free(old); 537 } 538 539 block = &local->block[local->nb_blocks]; 540 541 block->local_host_addr = host_addr; 542 block->offset = block_offset; 543 block->length = length; 544 block->index = local->nb_blocks; 545 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 546 block->transit_bitmap = bitmap_new(block->nb_chunks); 547 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 548 block->unregister_bitmap = bitmap_new(block->nb_chunks); 549 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 550 block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t)); 551 552 block->is_ram_block = local->init ? false : true; 553 554 g_hash_table_insert(rdma->blockmap, (void *) block_offset, block); 555 556 trace_rdma_add_block(local->nb_blocks, (uintptr_t) block->local_host_addr, 557 block->offset, block->length, 558 (uintptr_t) (block->local_host_addr + block->length), 559 BITS_TO_LONGS(block->nb_chunks) * 560 sizeof(unsigned long) * 8, 561 block->nb_chunks); 562 563 local->nb_blocks++; 564 565 return 0; 566 } 567 568 /* 569 * Memory regions need to be registered with the device and queue pairs setup 570 * in advanced before the migration starts. This tells us where the RAM blocks 571 * are so that we can register them individually. 572 */ 573 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr, 574 ram_addr_t block_offset, ram_addr_t length, void *opaque) 575 { 576 return rdma_add_block(opaque, host_addr, block_offset, length); 577 } 578 579 /* 580 * Identify the RAMBlocks and their quantity. They will be references to 581 * identify chunk boundaries inside each RAMBlock and also be referenced 582 * during dynamic page registration. 583 */ 584 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 585 { 586 RDMALocalBlocks *local = &rdma->local_ram_blocks; 587 588 assert(rdma->blockmap == NULL); 589 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 590 memset(local, 0, sizeof *local); 591 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma); 592 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 593 rdma->dest_blocks = (RDMADestBlock *) g_malloc0(sizeof(RDMADestBlock) * 594 rdma->local_ram_blocks.nb_blocks); 595 local->init = true; 596 return 0; 597 } 598 599 static int rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset) 600 { 601 RDMALocalBlocks *local = &rdma->local_ram_blocks; 602 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 603 (void *) block_offset); 604 RDMALocalBlock *old = local->block; 605 int x; 606 607 assert(block); 608 609 if (block->pmr) { 610 int j; 611 612 for (j = 0; j < block->nb_chunks; j++) { 613 if (!block->pmr[j]) { 614 continue; 615 } 616 ibv_dereg_mr(block->pmr[j]); 617 rdma->total_registrations--; 618 } 619 g_free(block->pmr); 620 block->pmr = NULL; 621 } 622 623 if (block->mr) { 624 ibv_dereg_mr(block->mr); 625 rdma->total_registrations--; 626 block->mr = NULL; 627 } 628 629 g_free(block->transit_bitmap); 630 block->transit_bitmap = NULL; 631 632 g_free(block->unregister_bitmap); 633 block->unregister_bitmap = NULL; 634 635 g_free(block->remote_keys); 636 block->remote_keys = NULL; 637 638 for (x = 0; x < local->nb_blocks; x++) { 639 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)old[x].offset); 640 } 641 642 if (local->nb_blocks > 1) { 643 644 local->block = g_malloc0(sizeof(RDMALocalBlock) * 645 (local->nb_blocks - 1)); 646 647 if (block->index) { 648 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 649 } 650 651 if (block->index < (local->nb_blocks - 1)) { 652 memcpy(local->block + block->index, old + (block->index + 1), 653 sizeof(RDMALocalBlock) * 654 (local->nb_blocks - (block->index + 1))); 655 } 656 } else { 657 assert(block == local->block); 658 local->block = NULL; 659 } 660 661 trace_rdma_delete_block(local->nb_blocks, 662 (uintptr_t)block->local_host_addr, 663 block->offset, block->length, 664 (uintptr_t)(block->local_host_addr + block->length), 665 BITS_TO_LONGS(block->nb_chunks) * 666 sizeof(unsigned long) * 8, block->nb_chunks); 667 668 g_free(old); 669 670 local->nb_blocks--; 671 672 if (local->nb_blocks) { 673 for (x = 0; x < local->nb_blocks; x++) { 674 g_hash_table_insert(rdma->blockmap, 675 (void *)(uintptr_t)local->block[x].offset, 676 &local->block[x]); 677 } 678 } 679 680 return 0; 681 } 682 683 /* 684 * Put in the log file which RDMA device was opened and the details 685 * associated with that device. 686 */ 687 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 688 { 689 struct ibv_port_attr port; 690 691 if (ibv_query_port(verbs, 1, &port)) { 692 error_report("Failed to query port information"); 693 return; 694 } 695 696 printf("%s RDMA Device opened: kernel name %s " 697 "uverbs device name %s, " 698 "infiniband_verbs class device path %s, " 699 "infiniband class device path %s, " 700 "transport: (%d) %s\n", 701 who, 702 verbs->device->name, 703 verbs->device->dev_name, 704 verbs->device->dev_path, 705 verbs->device->ibdev_path, 706 port.link_layer, 707 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 708 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 709 ? "Ethernet" : "Unknown")); 710 } 711 712 /* 713 * Put in the log file the RDMA gid addressing information, 714 * useful for folks who have trouble understanding the 715 * RDMA device hierarchy in the kernel. 716 */ 717 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 718 { 719 char sgid[33]; 720 char dgid[33]; 721 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 722 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 723 trace_qemu_rdma_dump_gid(who, sgid, dgid); 724 } 725 726 /* 727 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 728 * We will try the next addrinfo struct, and fail if there are 729 * no other valid addresses to bind against. 730 * 731 * If user is listening on '[::]', then we will not have a opened a device 732 * yet and have no way of verifying if the device is RoCE or not. 733 * 734 * In this case, the source VM will throw an error for ALL types of 735 * connections (both IPv4 and IPv6) if the destination machine does not have 736 * a regular infiniband network available for use. 737 * 738 * The only way to guarantee that an error is thrown for broken kernels is 739 * for the management software to choose a *specific* interface at bind time 740 * and validate what time of hardware it is. 741 * 742 * Unfortunately, this puts the user in a fix: 743 * 744 * If the source VM connects with an IPv4 address without knowing that the 745 * destination has bound to '[::]' the migration will unconditionally fail 746 * unless the management software is explicitly listening on the the IPv4 747 * address while using a RoCE-based device. 748 * 749 * If the source VM connects with an IPv6 address, then we're OK because we can 750 * throw an error on the source (and similarly on the destination). 751 * 752 * But in mixed environments, this will be broken for a while until it is fixed 753 * inside linux. 754 * 755 * We do provide a *tiny* bit of help in this function: We can list all of the 756 * devices in the system and check to see if all the devices are RoCE or 757 * Infiniband. 758 * 759 * If we detect that we have a *pure* RoCE environment, then we can safely 760 * thrown an error even if the management software has specified '[::]' as the 761 * bind address. 762 * 763 * However, if there is are multiple hetergeneous devices, then we cannot make 764 * this assumption and the user just has to be sure they know what they are 765 * doing. 766 * 767 * Patches are being reviewed on linux-rdma. 768 */ 769 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs) 770 { 771 struct ibv_port_attr port_attr; 772 773 /* This bug only exists in linux, to our knowledge. */ 774 #ifdef CONFIG_LINUX 775 776 /* 777 * Verbs are only NULL if management has bound to '[::]'. 778 * 779 * Let's iterate through all the devices and see if there any pure IB 780 * devices (non-ethernet). 781 * 782 * If not, then we can safely proceed with the migration. 783 * Otherwise, there are no guarantees until the bug is fixed in linux. 784 */ 785 if (!verbs) { 786 int num_devices, x; 787 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices); 788 bool roce_found = false; 789 bool ib_found = false; 790 791 for (x = 0; x < num_devices; x++) { 792 verbs = ibv_open_device(dev_list[x]); 793 if (!verbs) { 794 if (errno == EPERM) { 795 continue; 796 } else { 797 return -EINVAL; 798 } 799 } 800 801 if (ibv_query_port(verbs, 1, &port_attr)) { 802 ibv_close_device(verbs); 803 ERROR(errp, "Could not query initial IB port"); 804 return -EINVAL; 805 } 806 807 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 808 ib_found = true; 809 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 810 roce_found = true; 811 } 812 813 ibv_close_device(verbs); 814 815 } 816 817 if (roce_found) { 818 if (ib_found) { 819 fprintf(stderr, "WARN: migrations may fail:" 820 " IPv6 over RoCE / iWARP in linux" 821 " is broken. But since you appear to have a" 822 " mixed RoCE / IB environment, be sure to only" 823 " migrate over the IB fabric until the kernel " 824 " fixes the bug.\n"); 825 } else { 826 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 827 " and your management software has specified '[::]'" 828 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 829 return -ENONET; 830 } 831 } 832 833 return 0; 834 } 835 836 /* 837 * If we have a verbs context, that means that some other than '[::]' was 838 * used by the management software for binding. In which case we can 839 * actually warn the user about a potentially broken kernel. 840 */ 841 842 /* IB ports start with 1, not 0 */ 843 if (ibv_query_port(verbs, 1, &port_attr)) { 844 ERROR(errp, "Could not query initial IB port"); 845 return -EINVAL; 846 } 847 848 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 849 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 850 "(but patches on linux-rdma in progress)"); 851 return -ENONET; 852 } 853 854 #endif 855 856 return 0; 857 } 858 859 /* 860 * Figure out which RDMA device corresponds to the requested IP hostname 861 * Also create the initial connection manager identifiers for opening 862 * the connection. 863 */ 864 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 865 { 866 int ret; 867 struct rdma_addrinfo *res; 868 char port_str[16]; 869 struct rdma_cm_event *cm_event; 870 char ip[40] = "unknown"; 871 struct rdma_addrinfo *e; 872 873 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 874 ERROR(errp, "RDMA hostname has not been set"); 875 return -EINVAL; 876 } 877 878 /* create CM channel */ 879 rdma->channel = rdma_create_event_channel(); 880 if (!rdma->channel) { 881 ERROR(errp, "could not create CM channel"); 882 return -EINVAL; 883 } 884 885 /* create CM id */ 886 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 887 if (ret) { 888 ERROR(errp, "could not create channel id"); 889 goto err_resolve_create_id; 890 } 891 892 snprintf(port_str, 16, "%d", rdma->port); 893 port_str[15] = '\0'; 894 895 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 896 if (ret < 0) { 897 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 898 goto err_resolve_get_addr; 899 } 900 901 for (e = res; e != NULL; e = e->ai_next) { 902 inet_ntop(e->ai_family, 903 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 904 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 905 906 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 907 RDMA_RESOLVE_TIMEOUT_MS); 908 if (!ret) { 909 if (e->ai_family == AF_INET6) { 910 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs); 911 if (ret) { 912 continue; 913 } 914 } 915 goto route; 916 } 917 } 918 919 ERROR(errp, "could not resolve address %s", rdma->host); 920 goto err_resolve_get_addr; 921 922 route: 923 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 924 925 ret = rdma_get_cm_event(rdma->channel, &cm_event); 926 if (ret) { 927 ERROR(errp, "could not perform event_addr_resolved"); 928 goto err_resolve_get_addr; 929 } 930 931 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 932 ERROR(errp, "result not equal to event_addr_resolved %s", 933 rdma_event_str(cm_event->event)); 934 perror("rdma_resolve_addr"); 935 rdma_ack_cm_event(cm_event); 936 ret = -EINVAL; 937 goto err_resolve_get_addr; 938 } 939 rdma_ack_cm_event(cm_event); 940 941 /* resolve route */ 942 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 943 if (ret) { 944 ERROR(errp, "could not resolve rdma route"); 945 goto err_resolve_get_addr; 946 } 947 948 ret = rdma_get_cm_event(rdma->channel, &cm_event); 949 if (ret) { 950 ERROR(errp, "could not perform event_route_resolved"); 951 goto err_resolve_get_addr; 952 } 953 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 954 ERROR(errp, "result not equal to event_route_resolved: %s", 955 rdma_event_str(cm_event->event)); 956 rdma_ack_cm_event(cm_event); 957 ret = -EINVAL; 958 goto err_resolve_get_addr; 959 } 960 rdma_ack_cm_event(cm_event); 961 rdma->verbs = rdma->cm_id->verbs; 962 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 963 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 964 return 0; 965 966 err_resolve_get_addr: 967 rdma_destroy_id(rdma->cm_id); 968 rdma->cm_id = NULL; 969 err_resolve_create_id: 970 rdma_destroy_event_channel(rdma->channel); 971 rdma->channel = NULL; 972 return ret; 973 } 974 975 /* 976 * Create protection domain and completion queues 977 */ 978 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 979 { 980 /* allocate pd */ 981 rdma->pd = ibv_alloc_pd(rdma->verbs); 982 if (!rdma->pd) { 983 error_report("failed to allocate protection domain"); 984 return -1; 985 } 986 987 /* create completion channel */ 988 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 989 if (!rdma->comp_channel) { 990 error_report("failed to allocate completion channel"); 991 goto err_alloc_pd_cq; 992 } 993 994 /* 995 * Completion queue can be filled by both read and write work requests, 996 * so must reflect the sum of both possible queue sizes. 997 */ 998 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 999 NULL, rdma->comp_channel, 0); 1000 if (!rdma->cq) { 1001 error_report("failed to allocate completion queue"); 1002 goto err_alloc_pd_cq; 1003 } 1004 1005 return 0; 1006 1007 err_alloc_pd_cq: 1008 if (rdma->pd) { 1009 ibv_dealloc_pd(rdma->pd); 1010 } 1011 if (rdma->comp_channel) { 1012 ibv_destroy_comp_channel(rdma->comp_channel); 1013 } 1014 rdma->pd = NULL; 1015 rdma->comp_channel = NULL; 1016 return -1; 1017 1018 } 1019 1020 /* 1021 * Create queue pairs. 1022 */ 1023 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1024 { 1025 struct ibv_qp_init_attr attr = { 0 }; 1026 int ret; 1027 1028 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1029 attr.cap.max_recv_wr = 3; 1030 attr.cap.max_send_sge = 1; 1031 attr.cap.max_recv_sge = 1; 1032 attr.send_cq = rdma->cq; 1033 attr.recv_cq = rdma->cq; 1034 attr.qp_type = IBV_QPT_RC; 1035 1036 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1037 if (ret) { 1038 return -1; 1039 } 1040 1041 rdma->qp = rdma->cm_id->qp; 1042 return 0; 1043 } 1044 1045 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1046 { 1047 int i; 1048 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1049 1050 for (i = 0; i < local->nb_blocks; i++) { 1051 local->block[i].mr = 1052 ibv_reg_mr(rdma->pd, 1053 local->block[i].local_host_addr, 1054 local->block[i].length, 1055 IBV_ACCESS_LOCAL_WRITE | 1056 IBV_ACCESS_REMOTE_WRITE 1057 ); 1058 if (!local->block[i].mr) { 1059 perror("Failed to register local dest ram block!\n"); 1060 break; 1061 } 1062 rdma->total_registrations++; 1063 } 1064 1065 if (i >= local->nb_blocks) { 1066 return 0; 1067 } 1068 1069 for (i--; i >= 0; i--) { 1070 ibv_dereg_mr(local->block[i].mr); 1071 rdma->total_registrations--; 1072 } 1073 1074 return -1; 1075 1076 } 1077 1078 /* 1079 * Find the ram block that corresponds to the page requested to be 1080 * transmitted by QEMU. 1081 * 1082 * Once the block is found, also identify which 'chunk' within that 1083 * block that the page belongs to. 1084 * 1085 * This search cannot fail or the migration will fail. 1086 */ 1087 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1088 uintptr_t block_offset, 1089 uint64_t offset, 1090 uint64_t length, 1091 uint64_t *block_index, 1092 uint64_t *chunk_index) 1093 { 1094 uint64_t current_addr = block_offset + offset; 1095 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1096 (void *) block_offset); 1097 assert(block); 1098 assert(current_addr >= block->offset); 1099 assert((current_addr + length) <= (block->offset + block->length)); 1100 1101 *block_index = block->index; 1102 *chunk_index = ram_chunk_index(block->local_host_addr, 1103 block->local_host_addr + (current_addr - block->offset)); 1104 1105 return 0; 1106 } 1107 1108 /* 1109 * Register a chunk with IB. If the chunk was already registered 1110 * previously, then skip. 1111 * 1112 * Also return the keys associated with the registration needed 1113 * to perform the actual RDMA operation. 1114 */ 1115 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1116 RDMALocalBlock *block, uintptr_t host_addr, 1117 uint32_t *lkey, uint32_t *rkey, int chunk, 1118 uint8_t *chunk_start, uint8_t *chunk_end) 1119 { 1120 if (block->mr) { 1121 if (lkey) { 1122 *lkey = block->mr->lkey; 1123 } 1124 if (rkey) { 1125 *rkey = block->mr->rkey; 1126 } 1127 return 0; 1128 } 1129 1130 /* allocate memory to store chunk MRs */ 1131 if (!block->pmr) { 1132 block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *)); 1133 } 1134 1135 /* 1136 * If 'rkey', then we're the destination, so grant access to the source. 1137 * 1138 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1139 */ 1140 if (!block->pmr[chunk]) { 1141 uint64_t len = chunk_end - chunk_start; 1142 1143 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1144 1145 block->pmr[chunk] = ibv_reg_mr(rdma->pd, 1146 chunk_start, len, 1147 (rkey ? (IBV_ACCESS_LOCAL_WRITE | 1148 IBV_ACCESS_REMOTE_WRITE) : 0)); 1149 1150 if (!block->pmr[chunk]) { 1151 perror("Failed to register chunk!"); 1152 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1153 " start %" PRIuPTR " end %" PRIuPTR 1154 " host %" PRIuPTR 1155 " local %" PRIuPTR " registrations: %d\n", 1156 block->index, chunk, (uintptr_t)chunk_start, 1157 (uintptr_t)chunk_end, host_addr, 1158 (uintptr_t)block->local_host_addr, 1159 rdma->total_registrations); 1160 return -1; 1161 } 1162 rdma->total_registrations++; 1163 } 1164 1165 if (lkey) { 1166 *lkey = block->pmr[chunk]->lkey; 1167 } 1168 if (rkey) { 1169 *rkey = block->pmr[chunk]->rkey; 1170 } 1171 return 0; 1172 } 1173 1174 /* 1175 * Register (at connection time) the memory used for control 1176 * channel messages. 1177 */ 1178 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1179 { 1180 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1181 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1182 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1183 if (rdma->wr_data[idx].control_mr) { 1184 rdma->total_registrations++; 1185 return 0; 1186 } 1187 error_report("qemu_rdma_reg_control failed"); 1188 return -1; 1189 } 1190 1191 const char *print_wrid(int wrid) 1192 { 1193 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1194 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1195 } 1196 return wrid_desc[wrid]; 1197 } 1198 1199 /* 1200 * RDMA requires memory registration (mlock/pinning), but this is not good for 1201 * overcommitment. 1202 * 1203 * In preparation for the future where LRU information or workload-specific 1204 * writable writable working set memory access behavior is available to QEMU 1205 * it would be nice to have in place the ability to UN-register/UN-pin 1206 * particular memory regions from the RDMA hardware when it is determine that 1207 * those regions of memory will likely not be accessed again in the near future. 1208 * 1209 * While we do not yet have such information right now, the following 1210 * compile-time option allows us to perform a non-optimized version of this 1211 * behavior. 1212 * 1213 * By uncommenting this option, you will cause *all* RDMA transfers to be 1214 * unregistered immediately after the transfer completes on both sides of the 1215 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode. 1216 * 1217 * This will have a terrible impact on migration performance, so until future 1218 * workload information or LRU information is available, do not attempt to use 1219 * this feature except for basic testing. 1220 */ 1221 //#define RDMA_UNREGISTRATION_EXAMPLE 1222 1223 /* 1224 * Perform a non-optimized memory unregistration after every transfer 1225 * for demonsration purposes, only if pin-all is not requested. 1226 * 1227 * Potential optimizations: 1228 * 1. Start a new thread to run this function continuously 1229 - for bit clearing 1230 - and for receipt of unregister messages 1231 * 2. Use an LRU. 1232 * 3. Use workload hints. 1233 */ 1234 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1235 { 1236 while (rdma->unregistrations[rdma->unregister_current]) { 1237 int ret; 1238 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1239 uint64_t chunk = 1240 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1241 uint64_t index = 1242 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1243 RDMALocalBlock *block = 1244 &(rdma->local_ram_blocks.block[index]); 1245 RDMARegister reg = { .current_index = index }; 1246 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1247 }; 1248 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1249 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1250 .repeat = 1, 1251 }; 1252 1253 trace_qemu_rdma_unregister_waiting_proc(chunk, 1254 rdma->unregister_current); 1255 1256 rdma->unregistrations[rdma->unregister_current] = 0; 1257 rdma->unregister_current++; 1258 1259 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1260 rdma->unregister_current = 0; 1261 } 1262 1263 1264 /* 1265 * Unregistration is speculative (because migration is single-threaded 1266 * and we cannot break the protocol's inifinband message ordering). 1267 * Thus, if the memory is currently being used for transmission, 1268 * then abort the attempt to unregister and try again 1269 * later the next time a completion is received for this memory. 1270 */ 1271 clear_bit(chunk, block->unregister_bitmap); 1272 1273 if (test_bit(chunk, block->transit_bitmap)) { 1274 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1275 continue; 1276 } 1277 1278 trace_qemu_rdma_unregister_waiting_send(chunk); 1279 1280 ret = ibv_dereg_mr(block->pmr[chunk]); 1281 block->pmr[chunk] = NULL; 1282 block->remote_keys[chunk] = 0; 1283 1284 if (ret != 0) { 1285 perror("unregistration chunk failed"); 1286 return -ret; 1287 } 1288 rdma->total_registrations--; 1289 1290 reg.key.chunk = chunk; 1291 register_to_network(®); 1292 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1293 &resp, NULL, NULL); 1294 if (ret < 0) { 1295 return ret; 1296 } 1297 1298 trace_qemu_rdma_unregister_waiting_complete(chunk); 1299 } 1300 1301 return 0; 1302 } 1303 1304 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1305 uint64_t chunk) 1306 { 1307 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1308 1309 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1310 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1311 1312 return result; 1313 } 1314 1315 /* 1316 * Set bit for unregistration in the next iteration. 1317 * We cannot transmit right here, but will unpin later. 1318 */ 1319 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1320 uint64_t chunk, uint64_t wr_id) 1321 { 1322 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1323 error_report("rdma migration: queue is full"); 1324 } else { 1325 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1326 1327 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1328 trace_qemu_rdma_signal_unregister_append(chunk, 1329 rdma->unregister_next); 1330 1331 rdma->unregistrations[rdma->unregister_next++] = 1332 qemu_rdma_make_wrid(wr_id, index, chunk); 1333 1334 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1335 rdma->unregister_next = 0; 1336 } 1337 } else { 1338 trace_qemu_rdma_signal_unregister_already(chunk); 1339 } 1340 } 1341 } 1342 1343 /* 1344 * Consult the connection manager to see a work request 1345 * (of any kind) has completed. 1346 * Return the work request ID that completed. 1347 */ 1348 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1349 uint32_t *byte_len) 1350 { 1351 int ret; 1352 struct ibv_wc wc; 1353 uint64_t wr_id; 1354 1355 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1356 1357 if (!ret) { 1358 *wr_id_out = RDMA_WRID_NONE; 1359 return 0; 1360 } 1361 1362 if (ret < 0) { 1363 error_report("ibv_poll_cq return %d", ret); 1364 return ret; 1365 } 1366 1367 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1368 1369 if (wc.status != IBV_WC_SUCCESS) { 1370 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1371 wc.status, ibv_wc_status_str(wc.status)); 1372 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1373 1374 return -1; 1375 } 1376 1377 if (rdma->control_ready_expected && 1378 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1379 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1380 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1381 rdma->control_ready_expected = 0; 1382 } 1383 1384 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1385 uint64_t chunk = 1386 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1387 uint64_t index = 1388 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1389 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1390 1391 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1392 index, chunk, block->local_host_addr, 1393 (void *)(uintptr_t)block->remote_host_addr); 1394 1395 clear_bit(chunk, block->transit_bitmap); 1396 1397 if (rdma->nb_sent > 0) { 1398 rdma->nb_sent--; 1399 } 1400 1401 if (!rdma->pin_all) { 1402 /* 1403 * FYI: If one wanted to signal a specific chunk to be unregistered 1404 * using LRU or workload-specific information, this is the function 1405 * you would call to do so. That chunk would then get asynchronously 1406 * unregistered later. 1407 */ 1408 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1409 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1410 #endif 1411 } 1412 } else { 1413 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1414 } 1415 1416 *wr_id_out = wc.wr_id; 1417 if (byte_len) { 1418 *byte_len = wc.byte_len; 1419 } 1420 1421 return 0; 1422 } 1423 1424 /* 1425 * Block until the next work request has completed. 1426 * 1427 * First poll to see if a work request has already completed, 1428 * otherwise block. 1429 * 1430 * If we encounter completed work requests for IDs other than 1431 * the one we're interested in, then that's generally an error. 1432 * 1433 * The only exception is actual RDMA Write completions. These 1434 * completions only need to be recorded, but do not actually 1435 * need further processing. 1436 */ 1437 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1438 uint32_t *byte_len) 1439 { 1440 int num_cq_events = 0, ret = 0; 1441 struct ibv_cq *cq; 1442 void *cq_ctx; 1443 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1444 1445 if (ibv_req_notify_cq(rdma->cq, 0)) { 1446 return -1; 1447 } 1448 /* poll cq first */ 1449 while (wr_id != wrid_requested) { 1450 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1451 if (ret < 0) { 1452 return ret; 1453 } 1454 1455 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1456 1457 if (wr_id == RDMA_WRID_NONE) { 1458 break; 1459 } 1460 if (wr_id != wrid_requested) { 1461 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1462 wrid_requested, print_wrid(wr_id), wr_id); 1463 } 1464 } 1465 1466 if (wr_id == wrid_requested) { 1467 return 0; 1468 } 1469 1470 while (1) { 1471 /* 1472 * Coroutine doesn't start until process_incoming_migration() 1473 * so don't yield unless we know we're running inside of a coroutine. 1474 */ 1475 if (rdma->migration_started_on_destination) { 1476 yield_until_fd_readable(rdma->comp_channel->fd); 1477 } 1478 1479 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) { 1480 perror("ibv_get_cq_event"); 1481 goto err_block_for_wrid; 1482 } 1483 1484 num_cq_events++; 1485 1486 if (ibv_req_notify_cq(cq, 0)) { 1487 goto err_block_for_wrid; 1488 } 1489 1490 while (wr_id != wrid_requested) { 1491 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1492 if (ret < 0) { 1493 goto err_block_for_wrid; 1494 } 1495 1496 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1497 1498 if (wr_id == RDMA_WRID_NONE) { 1499 break; 1500 } 1501 if (wr_id != wrid_requested) { 1502 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1503 wrid_requested, print_wrid(wr_id), wr_id); 1504 } 1505 } 1506 1507 if (wr_id == wrid_requested) { 1508 goto success_block_for_wrid; 1509 } 1510 } 1511 1512 success_block_for_wrid: 1513 if (num_cq_events) { 1514 ibv_ack_cq_events(cq, num_cq_events); 1515 } 1516 return 0; 1517 1518 err_block_for_wrid: 1519 if (num_cq_events) { 1520 ibv_ack_cq_events(cq, num_cq_events); 1521 } 1522 return ret; 1523 } 1524 1525 /* 1526 * Post a SEND message work request for the control channel 1527 * containing some data and block until the post completes. 1528 */ 1529 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1530 RDMAControlHeader *head) 1531 { 1532 int ret = 0; 1533 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1534 struct ibv_send_wr *bad_wr; 1535 struct ibv_sge sge = { 1536 .addr = (uintptr_t)(wr->control), 1537 .length = head->len + sizeof(RDMAControlHeader), 1538 .lkey = wr->control_mr->lkey, 1539 }; 1540 struct ibv_send_wr send_wr = { 1541 .wr_id = RDMA_WRID_SEND_CONTROL, 1542 .opcode = IBV_WR_SEND, 1543 .send_flags = IBV_SEND_SIGNALED, 1544 .sg_list = &sge, 1545 .num_sge = 1, 1546 }; 1547 1548 trace_qemu_rdma_post_send_control(control_desc[head->type]); 1549 1550 /* 1551 * We don't actually need to do a memcpy() in here if we used 1552 * the "sge" properly, but since we're only sending control messages 1553 * (not RAM in a performance-critical path), then its OK for now. 1554 * 1555 * The copy makes the RDMAControlHeader simpler to manipulate 1556 * for the time being. 1557 */ 1558 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1559 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1560 control_to_network((void *) wr->control); 1561 1562 if (buf) { 1563 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1564 } 1565 1566 1567 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1568 1569 if (ret > 0) { 1570 error_report("Failed to use post IB SEND for control"); 1571 return -ret; 1572 } 1573 1574 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1575 if (ret < 0) { 1576 error_report("rdma migration: send polling control error"); 1577 } 1578 1579 return ret; 1580 } 1581 1582 /* 1583 * Post a RECV work request in anticipation of some future receipt 1584 * of data on the control channel. 1585 */ 1586 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1587 { 1588 struct ibv_recv_wr *bad_wr; 1589 struct ibv_sge sge = { 1590 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1591 .length = RDMA_CONTROL_MAX_BUFFER, 1592 .lkey = rdma->wr_data[idx].control_mr->lkey, 1593 }; 1594 1595 struct ibv_recv_wr recv_wr = { 1596 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1597 .sg_list = &sge, 1598 .num_sge = 1, 1599 }; 1600 1601 1602 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1603 return -1; 1604 } 1605 1606 return 0; 1607 } 1608 1609 /* 1610 * Block and wait for a RECV control channel message to arrive. 1611 */ 1612 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1613 RDMAControlHeader *head, int expecting, int idx) 1614 { 1615 uint32_t byte_len; 1616 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1617 &byte_len); 1618 1619 if (ret < 0) { 1620 error_report("rdma migration: recv polling control error!"); 1621 return ret; 1622 } 1623 1624 network_to_control((void *) rdma->wr_data[idx].control); 1625 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1626 1627 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]); 1628 1629 if (expecting == RDMA_CONTROL_NONE) { 1630 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type], 1631 head->type); 1632 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1633 error_report("Was expecting a %s (%d) control message" 1634 ", but got: %s (%d), length: %d", 1635 control_desc[expecting], expecting, 1636 control_desc[head->type], head->type, head->len); 1637 return -EIO; 1638 } 1639 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1640 error_report("too long length: %d", head->len); 1641 return -EINVAL; 1642 } 1643 if (sizeof(*head) + head->len != byte_len) { 1644 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1645 return -EINVAL; 1646 } 1647 1648 return 0; 1649 } 1650 1651 /* 1652 * When a RECV work request has completed, the work request's 1653 * buffer is pointed at the header. 1654 * 1655 * This will advance the pointer to the data portion 1656 * of the control message of the work request's buffer that 1657 * was populated after the work request finished. 1658 */ 1659 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1660 RDMAControlHeader *head) 1661 { 1662 rdma->wr_data[idx].control_len = head->len; 1663 rdma->wr_data[idx].control_curr = 1664 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1665 } 1666 1667 /* 1668 * This is an 'atomic' high-level operation to deliver a single, unified 1669 * control-channel message. 1670 * 1671 * Additionally, if the user is expecting some kind of reply to this message, 1672 * they can request a 'resp' response message be filled in by posting an 1673 * additional work request on behalf of the user and waiting for an additional 1674 * completion. 1675 * 1676 * The extra (optional) response is used during registration to us from having 1677 * to perform an *additional* exchange of message just to provide a response by 1678 * instead piggy-backing on the acknowledgement. 1679 */ 1680 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1681 uint8_t *data, RDMAControlHeader *resp, 1682 int *resp_idx, 1683 int (*callback)(RDMAContext *rdma)) 1684 { 1685 int ret = 0; 1686 1687 /* 1688 * Wait until the dest is ready before attempting to deliver the message 1689 * by waiting for a READY message. 1690 */ 1691 if (rdma->control_ready_expected) { 1692 RDMAControlHeader resp; 1693 ret = qemu_rdma_exchange_get_response(rdma, 1694 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1695 if (ret < 0) { 1696 return ret; 1697 } 1698 } 1699 1700 /* 1701 * If the user is expecting a response, post a WR in anticipation of it. 1702 */ 1703 if (resp) { 1704 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1705 if (ret) { 1706 error_report("rdma migration: error posting" 1707 " extra control recv for anticipated result!"); 1708 return ret; 1709 } 1710 } 1711 1712 /* 1713 * Post a WR to replace the one we just consumed for the READY message. 1714 */ 1715 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1716 if (ret) { 1717 error_report("rdma migration: error posting first control recv!"); 1718 return ret; 1719 } 1720 1721 /* 1722 * Deliver the control message that was requested. 1723 */ 1724 ret = qemu_rdma_post_send_control(rdma, data, head); 1725 1726 if (ret < 0) { 1727 error_report("Failed to send control buffer!"); 1728 return ret; 1729 } 1730 1731 /* 1732 * If we're expecting a response, block and wait for it. 1733 */ 1734 if (resp) { 1735 if (callback) { 1736 trace_qemu_rdma_exchange_send_issue_callback(); 1737 ret = callback(rdma); 1738 if (ret < 0) { 1739 return ret; 1740 } 1741 } 1742 1743 trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]); 1744 ret = qemu_rdma_exchange_get_response(rdma, resp, 1745 resp->type, RDMA_WRID_DATA); 1746 1747 if (ret < 0) { 1748 return ret; 1749 } 1750 1751 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1752 if (resp_idx) { 1753 *resp_idx = RDMA_WRID_DATA; 1754 } 1755 trace_qemu_rdma_exchange_send_received(control_desc[resp->type]); 1756 } 1757 1758 rdma->control_ready_expected = 1; 1759 1760 return 0; 1761 } 1762 1763 /* 1764 * This is an 'atomic' high-level operation to receive a single, unified 1765 * control-channel message. 1766 */ 1767 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1768 int expecting) 1769 { 1770 RDMAControlHeader ready = { 1771 .len = 0, 1772 .type = RDMA_CONTROL_READY, 1773 .repeat = 1, 1774 }; 1775 int ret; 1776 1777 /* 1778 * Inform the source that we're ready to receive a message. 1779 */ 1780 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1781 1782 if (ret < 0) { 1783 error_report("Failed to send control buffer!"); 1784 return ret; 1785 } 1786 1787 /* 1788 * Block and wait for the message. 1789 */ 1790 ret = qemu_rdma_exchange_get_response(rdma, head, 1791 expecting, RDMA_WRID_READY); 1792 1793 if (ret < 0) { 1794 return ret; 1795 } 1796 1797 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1798 1799 /* 1800 * Post a new RECV work request to replace the one we just consumed. 1801 */ 1802 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1803 if (ret) { 1804 error_report("rdma migration: error posting second control recv!"); 1805 return ret; 1806 } 1807 1808 return 0; 1809 } 1810 1811 /* 1812 * Write an actual chunk of memory using RDMA. 1813 * 1814 * If we're using dynamic registration on the dest-side, we have to 1815 * send a registration command first. 1816 */ 1817 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 1818 int current_index, uint64_t current_addr, 1819 uint64_t length) 1820 { 1821 struct ibv_sge sge; 1822 struct ibv_send_wr send_wr = { 0 }; 1823 struct ibv_send_wr *bad_wr; 1824 int reg_result_idx, ret, count = 0; 1825 uint64_t chunk, chunks; 1826 uint8_t *chunk_start, *chunk_end; 1827 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1828 RDMARegister reg; 1829 RDMARegisterResult *reg_result; 1830 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1831 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1832 .type = RDMA_CONTROL_REGISTER_REQUEST, 1833 .repeat = 1, 1834 }; 1835 1836 retry: 1837 sge.addr = (uintptr_t)(block->local_host_addr + 1838 (current_addr - block->offset)); 1839 sge.length = length; 1840 1841 chunk = ram_chunk_index(block->local_host_addr, 1842 (uint8_t *)(uintptr_t)sge.addr); 1843 chunk_start = ram_chunk_start(block, chunk); 1844 1845 if (block->is_ram_block) { 1846 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 1847 1848 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1849 chunks--; 1850 } 1851 } else { 1852 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 1853 1854 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1855 chunks--; 1856 } 1857 } 1858 1859 trace_qemu_rdma_write_one_top(chunks + 1, 1860 (chunks + 1) * 1861 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 1862 1863 chunk_end = ram_chunk_end(block, chunk + chunks); 1864 1865 if (!rdma->pin_all) { 1866 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1867 qemu_rdma_unregister_waiting(rdma); 1868 #endif 1869 } 1870 1871 while (test_bit(chunk, block->transit_bitmap)) { 1872 (void)count; 1873 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 1874 sge.addr, length, rdma->nb_sent, block->nb_chunks); 1875 1876 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 1877 1878 if (ret < 0) { 1879 error_report("Failed to Wait for previous write to complete " 1880 "block %d chunk %" PRIu64 1881 " current %" PRIu64 " len %" PRIu64 " %d", 1882 current_index, chunk, sge.addr, length, rdma->nb_sent); 1883 return ret; 1884 } 1885 } 1886 1887 if (!rdma->pin_all || !block->is_ram_block) { 1888 if (!block->remote_keys[chunk]) { 1889 /* 1890 * This chunk has not yet been registered, so first check to see 1891 * if the entire chunk is zero. If so, tell the other size to 1892 * memset() + madvise() the entire chunk without RDMA. 1893 */ 1894 1895 if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1896 length) 1897 && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1898 length) == length) { 1899 RDMACompress comp = { 1900 .offset = current_addr, 1901 .value = 0, 1902 .block_idx = current_index, 1903 .length = length, 1904 }; 1905 1906 head.len = sizeof(comp); 1907 head.type = RDMA_CONTROL_COMPRESS; 1908 1909 trace_qemu_rdma_write_one_zero(chunk, sge.length, 1910 current_index, current_addr); 1911 1912 compress_to_network(&comp); 1913 ret = qemu_rdma_exchange_send(rdma, &head, 1914 (uint8_t *) &comp, NULL, NULL, NULL); 1915 1916 if (ret < 0) { 1917 return -EIO; 1918 } 1919 1920 acct_update_position(f, sge.length, true); 1921 1922 return 1; 1923 } 1924 1925 /* 1926 * Otherwise, tell other side to register. 1927 */ 1928 reg.current_index = current_index; 1929 if (block->is_ram_block) { 1930 reg.key.current_addr = current_addr; 1931 } else { 1932 reg.key.chunk = chunk; 1933 } 1934 reg.chunks = chunks; 1935 1936 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 1937 current_addr); 1938 1939 register_to_network(®); 1940 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1941 &resp, ®_result_idx, NULL); 1942 if (ret < 0) { 1943 return ret; 1944 } 1945 1946 /* try to overlap this single registration with the one we sent. */ 1947 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1948 &sge.lkey, NULL, chunk, 1949 chunk_start, chunk_end)) { 1950 error_report("cannot get lkey"); 1951 return -EINVAL; 1952 } 1953 1954 reg_result = (RDMARegisterResult *) 1955 rdma->wr_data[reg_result_idx].control_curr; 1956 1957 network_to_result(reg_result); 1958 1959 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 1960 reg_result->rkey, chunk); 1961 1962 block->remote_keys[chunk] = reg_result->rkey; 1963 block->remote_host_addr = reg_result->host_addr; 1964 } else { 1965 /* already registered before */ 1966 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1967 &sge.lkey, NULL, chunk, 1968 chunk_start, chunk_end)) { 1969 error_report("cannot get lkey!"); 1970 return -EINVAL; 1971 } 1972 } 1973 1974 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 1975 } else { 1976 send_wr.wr.rdma.rkey = block->remote_rkey; 1977 1978 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1979 &sge.lkey, NULL, chunk, 1980 chunk_start, chunk_end)) { 1981 error_report("cannot get lkey!"); 1982 return -EINVAL; 1983 } 1984 } 1985 1986 /* 1987 * Encode the ram block index and chunk within this wrid. 1988 * We will use this information at the time of completion 1989 * to figure out which bitmap to check against and then which 1990 * chunk in the bitmap to look for. 1991 */ 1992 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 1993 current_index, chunk); 1994 1995 send_wr.opcode = IBV_WR_RDMA_WRITE; 1996 send_wr.send_flags = IBV_SEND_SIGNALED; 1997 send_wr.sg_list = &sge; 1998 send_wr.num_sge = 1; 1999 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2000 (current_addr - block->offset); 2001 2002 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2003 sge.length); 2004 2005 /* 2006 * ibv_post_send() does not return negative error numbers, 2007 * per the specification they are positive - no idea why. 2008 */ 2009 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2010 2011 if (ret == ENOMEM) { 2012 trace_qemu_rdma_write_one_queue_full(); 2013 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2014 if (ret < 0) { 2015 error_report("rdma migration: failed to make " 2016 "room in full send queue! %d", ret); 2017 return ret; 2018 } 2019 2020 goto retry; 2021 2022 } else if (ret > 0) { 2023 perror("rdma migration: post rdma write failed"); 2024 return -ret; 2025 } 2026 2027 set_bit(chunk, block->transit_bitmap); 2028 acct_update_position(f, sge.length, false); 2029 rdma->total_writes++; 2030 2031 return 0; 2032 } 2033 2034 /* 2035 * Push out any unwritten RDMA operations. 2036 * 2037 * We support sending out multiple chunks at the same time. 2038 * Not all of them need to get signaled in the completion queue. 2039 */ 2040 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2041 { 2042 int ret; 2043 2044 if (!rdma->current_length) { 2045 return 0; 2046 } 2047 2048 ret = qemu_rdma_write_one(f, rdma, 2049 rdma->current_index, rdma->current_addr, rdma->current_length); 2050 2051 if (ret < 0) { 2052 return ret; 2053 } 2054 2055 if (ret == 0) { 2056 rdma->nb_sent++; 2057 trace_qemu_rdma_write_flush(rdma->nb_sent); 2058 } 2059 2060 rdma->current_length = 0; 2061 rdma->current_addr = 0; 2062 2063 return 0; 2064 } 2065 2066 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2067 uint64_t offset, uint64_t len) 2068 { 2069 RDMALocalBlock *block; 2070 uint8_t *host_addr; 2071 uint8_t *chunk_end; 2072 2073 if (rdma->current_index < 0) { 2074 return 0; 2075 } 2076 2077 if (rdma->current_chunk < 0) { 2078 return 0; 2079 } 2080 2081 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2082 host_addr = block->local_host_addr + (offset - block->offset); 2083 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2084 2085 if (rdma->current_length == 0) { 2086 return 0; 2087 } 2088 2089 /* 2090 * Only merge into chunk sequentially. 2091 */ 2092 if (offset != (rdma->current_addr + rdma->current_length)) { 2093 return 0; 2094 } 2095 2096 if (offset < block->offset) { 2097 return 0; 2098 } 2099 2100 if ((offset + len) > (block->offset + block->length)) { 2101 return 0; 2102 } 2103 2104 if ((host_addr + len) > chunk_end) { 2105 return 0; 2106 } 2107 2108 return 1; 2109 } 2110 2111 /* 2112 * We're not actually writing here, but doing three things: 2113 * 2114 * 1. Identify the chunk the buffer belongs to. 2115 * 2. If the chunk is full or the buffer doesn't belong to the current 2116 * chunk, then start a new chunk and flush() the old chunk. 2117 * 3. To keep the hardware busy, we also group chunks into batches 2118 * and only require that a batch gets acknowledged in the completion 2119 * qeueue instead of each individual chunk. 2120 */ 2121 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2122 uint64_t block_offset, uint64_t offset, 2123 uint64_t len) 2124 { 2125 uint64_t current_addr = block_offset + offset; 2126 uint64_t index = rdma->current_index; 2127 uint64_t chunk = rdma->current_chunk; 2128 int ret; 2129 2130 /* If we cannot merge it, we flush the current buffer first. */ 2131 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2132 ret = qemu_rdma_write_flush(f, rdma); 2133 if (ret) { 2134 return ret; 2135 } 2136 rdma->current_length = 0; 2137 rdma->current_addr = current_addr; 2138 2139 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2140 offset, len, &index, &chunk); 2141 if (ret) { 2142 error_report("ram block search failed"); 2143 return ret; 2144 } 2145 rdma->current_index = index; 2146 rdma->current_chunk = chunk; 2147 } 2148 2149 /* merge it */ 2150 rdma->current_length += len; 2151 2152 /* flush it if buffer is too large */ 2153 if (rdma->current_length >= RDMA_MERGE_MAX) { 2154 return qemu_rdma_write_flush(f, rdma); 2155 } 2156 2157 return 0; 2158 } 2159 2160 static void qemu_rdma_cleanup(RDMAContext *rdma) 2161 { 2162 struct rdma_cm_event *cm_event; 2163 int ret, idx; 2164 2165 if (rdma->cm_id && rdma->connected) { 2166 if (rdma->error_state) { 2167 RDMAControlHeader head = { .len = 0, 2168 .type = RDMA_CONTROL_ERROR, 2169 .repeat = 1, 2170 }; 2171 error_report("Early error. Sending error."); 2172 qemu_rdma_post_send_control(rdma, NULL, &head); 2173 } 2174 2175 ret = rdma_disconnect(rdma->cm_id); 2176 if (!ret) { 2177 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2178 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2179 if (!ret) { 2180 rdma_ack_cm_event(cm_event); 2181 } 2182 } 2183 trace_qemu_rdma_cleanup_disconnect(); 2184 rdma->connected = false; 2185 } 2186 2187 g_free(rdma->dest_blocks); 2188 rdma->dest_blocks = NULL; 2189 2190 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2191 if (rdma->wr_data[idx].control_mr) { 2192 rdma->total_registrations--; 2193 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2194 } 2195 rdma->wr_data[idx].control_mr = NULL; 2196 } 2197 2198 if (rdma->local_ram_blocks.block) { 2199 while (rdma->local_ram_blocks.nb_blocks) { 2200 rdma_delete_block(rdma, rdma->local_ram_blocks.block->offset); 2201 } 2202 } 2203 2204 if (rdma->qp) { 2205 rdma_destroy_qp(rdma->cm_id); 2206 rdma->qp = NULL; 2207 } 2208 if (rdma->cq) { 2209 ibv_destroy_cq(rdma->cq); 2210 rdma->cq = NULL; 2211 } 2212 if (rdma->comp_channel) { 2213 ibv_destroy_comp_channel(rdma->comp_channel); 2214 rdma->comp_channel = NULL; 2215 } 2216 if (rdma->pd) { 2217 ibv_dealloc_pd(rdma->pd); 2218 rdma->pd = NULL; 2219 } 2220 if (rdma->cm_id) { 2221 rdma_destroy_id(rdma->cm_id); 2222 rdma->cm_id = NULL; 2223 } 2224 if (rdma->listen_id) { 2225 rdma_destroy_id(rdma->listen_id); 2226 rdma->listen_id = NULL; 2227 } 2228 if (rdma->channel) { 2229 rdma_destroy_event_channel(rdma->channel); 2230 rdma->channel = NULL; 2231 } 2232 g_free(rdma->host); 2233 rdma->host = NULL; 2234 } 2235 2236 2237 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2238 { 2239 int ret, idx; 2240 Error *local_err = NULL, **temp = &local_err; 2241 2242 /* 2243 * Will be validated against destination's actual capabilities 2244 * after the connect() completes. 2245 */ 2246 rdma->pin_all = pin_all; 2247 2248 ret = qemu_rdma_resolve_host(rdma, temp); 2249 if (ret) { 2250 goto err_rdma_source_init; 2251 } 2252 2253 ret = qemu_rdma_alloc_pd_cq(rdma); 2254 if (ret) { 2255 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2256 " limits may be too low. Please check $ ulimit -a # and " 2257 "search for 'ulimit -l' in the output"); 2258 goto err_rdma_source_init; 2259 } 2260 2261 ret = qemu_rdma_alloc_qp(rdma); 2262 if (ret) { 2263 ERROR(temp, "rdma migration: error allocating qp!"); 2264 goto err_rdma_source_init; 2265 } 2266 2267 ret = qemu_rdma_init_ram_blocks(rdma); 2268 if (ret) { 2269 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2270 goto err_rdma_source_init; 2271 } 2272 2273 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2274 ret = qemu_rdma_reg_control(rdma, idx); 2275 if (ret) { 2276 ERROR(temp, "rdma migration: error registering %d control!", 2277 idx); 2278 goto err_rdma_source_init; 2279 } 2280 } 2281 2282 return 0; 2283 2284 err_rdma_source_init: 2285 error_propagate(errp, local_err); 2286 qemu_rdma_cleanup(rdma); 2287 return -1; 2288 } 2289 2290 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2291 { 2292 RDMACapabilities cap = { 2293 .version = RDMA_CONTROL_VERSION_CURRENT, 2294 .flags = 0, 2295 }; 2296 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2297 .retry_count = 5, 2298 .private_data = &cap, 2299 .private_data_len = sizeof(cap), 2300 }; 2301 struct rdma_cm_event *cm_event; 2302 int ret; 2303 2304 /* 2305 * Only negotiate the capability with destination if the user 2306 * on the source first requested the capability. 2307 */ 2308 if (rdma->pin_all) { 2309 trace_qemu_rdma_connect_pin_all_requested(); 2310 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2311 } 2312 2313 caps_to_network(&cap); 2314 2315 ret = rdma_connect(rdma->cm_id, &conn_param); 2316 if (ret) { 2317 perror("rdma_connect"); 2318 ERROR(errp, "connecting to destination!"); 2319 goto err_rdma_source_connect; 2320 } 2321 2322 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2323 if (ret) { 2324 perror("rdma_get_cm_event after rdma_connect"); 2325 ERROR(errp, "connecting to destination!"); 2326 rdma_ack_cm_event(cm_event); 2327 goto err_rdma_source_connect; 2328 } 2329 2330 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2331 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2332 ERROR(errp, "connecting to destination!"); 2333 rdma_ack_cm_event(cm_event); 2334 goto err_rdma_source_connect; 2335 } 2336 rdma->connected = true; 2337 2338 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2339 network_to_caps(&cap); 2340 2341 /* 2342 * Verify that the *requested* capabilities are supported by the destination 2343 * and disable them otherwise. 2344 */ 2345 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2346 ERROR(errp, "Server cannot support pinning all memory. " 2347 "Will register memory dynamically."); 2348 rdma->pin_all = false; 2349 } 2350 2351 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2352 2353 rdma_ack_cm_event(cm_event); 2354 2355 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2356 if (ret) { 2357 ERROR(errp, "posting second control recv!"); 2358 goto err_rdma_source_connect; 2359 } 2360 2361 rdma->control_ready_expected = 1; 2362 rdma->nb_sent = 0; 2363 return 0; 2364 2365 err_rdma_source_connect: 2366 qemu_rdma_cleanup(rdma); 2367 return -1; 2368 } 2369 2370 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2371 { 2372 int ret, idx; 2373 struct rdma_cm_id *listen_id; 2374 char ip[40] = "unknown"; 2375 struct rdma_addrinfo *res, *e; 2376 char port_str[16]; 2377 2378 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2379 rdma->wr_data[idx].control_len = 0; 2380 rdma->wr_data[idx].control_curr = NULL; 2381 } 2382 2383 if (!rdma->host || !rdma->host[0]) { 2384 ERROR(errp, "RDMA host is not set!"); 2385 rdma->error_state = -EINVAL; 2386 return -1; 2387 } 2388 /* create CM channel */ 2389 rdma->channel = rdma_create_event_channel(); 2390 if (!rdma->channel) { 2391 ERROR(errp, "could not create rdma event channel"); 2392 rdma->error_state = -EINVAL; 2393 return -1; 2394 } 2395 2396 /* create CM id */ 2397 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2398 if (ret) { 2399 ERROR(errp, "could not create cm_id!"); 2400 goto err_dest_init_create_listen_id; 2401 } 2402 2403 snprintf(port_str, 16, "%d", rdma->port); 2404 port_str[15] = '\0'; 2405 2406 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2407 if (ret < 0) { 2408 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2409 goto err_dest_init_bind_addr; 2410 } 2411 2412 for (e = res; e != NULL; e = e->ai_next) { 2413 inet_ntop(e->ai_family, 2414 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2415 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2416 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2417 if (ret) { 2418 continue; 2419 } 2420 if (e->ai_family == AF_INET6) { 2421 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2422 if (ret) { 2423 continue; 2424 } 2425 } 2426 break; 2427 } 2428 2429 if (!e) { 2430 ERROR(errp, "Error: could not rdma_bind_addr!"); 2431 goto err_dest_init_bind_addr; 2432 } 2433 2434 rdma->listen_id = listen_id; 2435 qemu_rdma_dump_gid("dest_init", listen_id); 2436 return 0; 2437 2438 err_dest_init_bind_addr: 2439 rdma_destroy_id(listen_id); 2440 err_dest_init_create_listen_id: 2441 rdma_destroy_event_channel(rdma->channel); 2442 rdma->channel = NULL; 2443 rdma->error_state = ret; 2444 return ret; 2445 2446 } 2447 2448 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2449 { 2450 RDMAContext *rdma = NULL; 2451 InetSocketAddress *addr; 2452 2453 if (host_port) { 2454 rdma = g_malloc0(sizeof(RDMAContext)); 2455 rdma->current_index = -1; 2456 rdma->current_chunk = -1; 2457 2458 addr = inet_parse(host_port, NULL); 2459 if (addr != NULL) { 2460 rdma->port = atoi(addr->port); 2461 rdma->host = g_strdup(addr->host); 2462 } else { 2463 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2464 g_free(rdma); 2465 rdma = NULL; 2466 } 2467 2468 qapi_free_InetSocketAddress(addr); 2469 } 2470 2471 return rdma; 2472 } 2473 2474 /* 2475 * QEMUFile interface to the control channel. 2476 * SEND messages for control only. 2477 * VM's ram is handled with regular RDMA messages. 2478 */ 2479 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, 2480 int64_t pos, int size) 2481 { 2482 QEMUFileRDMA *r = opaque; 2483 QEMUFile *f = r->file; 2484 RDMAContext *rdma = r->rdma; 2485 size_t remaining = size; 2486 uint8_t * data = (void *) buf; 2487 int ret; 2488 2489 CHECK_ERROR_STATE(); 2490 2491 /* 2492 * Push out any writes that 2493 * we're queued up for VM's ram. 2494 */ 2495 ret = qemu_rdma_write_flush(f, rdma); 2496 if (ret < 0) { 2497 rdma->error_state = ret; 2498 return ret; 2499 } 2500 2501 while (remaining) { 2502 RDMAControlHeader head; 2503 2504 r->len = MIN(remaining, RDMA_SEND_INCREMENT); 2505 remaining -= r->len; 2506 2507 head.len = r->len; 2508 head.type = RDMA_CONTROL_QEMU_FILE; 2509 2510 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2511 2512 if (ret < 0) { 2513 rdma->error_state = ret; 2514 return ret; 2515 } 2516 2517 data += r->len; 2518 } 2519 2520 return size; 2521 } 2522 2523 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2524 int size, int idx) 2525 { 2526 size_t len = 0; 2527 2528 if (rdma->wr_data[idx].control_len) { 2529 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2530 2531 len = MIN(size, rdma->wr_data[idx].control_len); 2532 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2533 rdma->wr_data[idx].control_curr += len; 2534 rdma->wr_data[idx].control_len -= len; 2535 } 2536 2537 return len; 2538 } 2539 2540 /* 2541 * QEMUFile interface to the control channel. 2542 * RDMA links don't use bytestreams, so we have to 2543 * return bytes to QEMUFile opportunistically. 2544 */ 2545 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf, 2546 int64_t pos, int size) 2547 { 2548 QEMUFileRDMA *r = opaque; 2549 RDMAContext *rdma = r->rdma; 2550 RDMAControlHeader head; 2551 int ret = 0; 2552 2553 CHECK_ERROR_STATE(); 2554 2555 /* 2556 * First, we hold on to the last SEND message we 2557 * were given and dish out the bytes until we run 2558 * out of bytes. 2559 */ 2560 r->len = qemu_rdma_fill(r->rdma, buf, size, 0); 2561 if (r->len) { 2562 return r->len; 2563 } 2564 2565 /* 2566 * Once we run out, we block and wait for another 2567 * SEND message to arrive. 2568 */ 2569 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2570 2571 if (ret < 0) { 2572 rdma->error_state = ret; 2573 return ret; 2574 } 2575 2576 /* 2577 * SEND was received with new bytes, now try again. 2578 */ 2579 return qemu_rdma_fill(r->rdma, buf, size, 0); 2580 } 2581 2582 /* 2583 * Block until all the outstanding chunks have been delivered by the hardware. 2584 */ 2585 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2586 { 2587 int ret; 2588 2589 if (qemu_rdma_write_flush(f, rdma) < 0) { 2590 return -EIO; 2591 } 2592 2593 while (rdma->nb_sent) { 2594 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2595 if (ret < 0) { 2596 error_report("rdma migration: complete polling error!"); 2597 return -EIO; 2598 } 2599 } 2600 2601 qemu_rdma_unregister_waiting(rdma); 2602 2603 return 0; 2604 } 2605 2606 static int qemu_rdma_close(void *opaque) 2607 { 2608 trace_qemu_rdma_close(); 2609 QEMUFileRDMA *r = opaque; 2610 if (r->rdma) { 2611 qemu_rdma_cleanup(r->rdma); 2612 g_free(r->rdma); 2613 } 2614 g_free(r); 2615 return 0; 2616 } 2617 2618 /* 2619 * Parameters: 2620 * @offset == 0 : 2621 * This means that 'block_offset' is a full virtual address that does not 2622 * belong to a RAMBlock of the virtual machine and instead 2623 * represents a private malloc'd memory area that the caller wishes to 2624 * transfer. 2625 * 2626 * @offset != 0 : 2627 * Offset is an offset to be added to block_offset and used 2628 * to also lookup the corresponding RAMBlock. 2629 * 2630 * @size > 0 : 2631 * Initiate an transfer this size. 2632 * 2633 * @size == 0 : 2634 * A 'hint' or 'advice' that means that we wish to speculatively 2635 * and asynchronously unregister this memory. In this case, there is no 2636 * guarantee that the unregister will actually happen, for example, 2637 * if the memory is being actively transmitted. Additionally, the memory 2638 * may be re-registered at any future time if a write within the same 2639 * chunk was requested again, even if you attempted to unregister it 2640 * here. 2641 * 2642 * @size < 0 : TODO, not yet supported 2643 * Unregister the memory NOW. This means that the caller does not 2644 * expect there to be any future RDMA transfers and we just want to clean 2645 * things up. This is used in case the upper layer owns the memory and 2646 * cannot wait for qemu_fclose() to occur. 2647 * 2648 * @bytes_sent : User-specificed pointer to indicate how many bytes were 2649 * sent. Usually, this will not be more than a few bytes of 2650 * the protocol because most transfers are sent asynchronously. 2651 */ 2652 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 2653 ram_addr_t block_offset, ram_addr_t offset, 2654 size_t size, uint64_t *bytes_sent) 2655 { 2656 QEMUFileRDMA *rfile = opaque; 2657 RDMAContext *rdma = rfile->rdma; 2658 int ret; 2659 2660 CHECK_ERROR_STATE(); 2661 2662 qemu_fflush(f); 2663 2664 if (size > 0) { 2665 /* 2666 * Add this page to the current 'chunk'. If the chunk 2667 * is full, or the page doen't belong to the current chunk, 2668 * an actual RDMA write will occur and a new chunk will be formed. 2669 */ 2670 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 2671 if (ret < 0) { 2672 error_report("rdma migration: write error! %d", ret); 2673 goto err; 2674 } 2675 2676 /* 2677 * We always return 1 bytes because the RDMA 2678 * protocol is completely asynchronous. We do not yet know 2679 * whether an identified chunk is zero or not because we're 2680 * waiting for other pages to potentially be merged with 2681 * the current chunk. So, we have to call qemu_update_position() 2682 * later on when the actual write occurs. 2683 */ 2684 if (bytes_sent) { 2685 *bytes_sent = 1; 2686 } 2687 } else { 2688 uint64_t index, chunk; 2689 2690 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2691 if (size < 0) { 2692 ret = qemu_rdma_drain_cq(f, rdma); 2693 if (ret < 0) { 2694 fprintf(stderr, "rdma: failed to synchronously drain" 2695 " completion queue before unregistration.\n"); 2696 goto err; 2697 } 2698 } 2699 */ 2700 2701 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2702 offset, size, &index, &chunk); 2703 2704 if (ret) { 2705 error_report("ram block search failed"); 2706 goto err; 2707 } 2708 2709 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2710 2711 /* 2712 * TODO: Synchronous, guaranteed unregistration (should not occur during 2713 * fast-path). Otherwise, unregisters will process on the next call to 2714 * qemu_rdma_drain_cq() 2715 if (size < 0) { 2716 qemu_rdma_unregister_waiting(rdma); 2717 } 2718 */ 2719 } 2720 2721 /* 2722 * Drain the Completion Queue if possible, but do not block, 2723 * just poll. 2724 * 2725 * If nothing to poll, the end of the iteration will do this 2726 * again to make sure we don't overflow the request queue. 2727 */ 2728 while (1) { 2729 uint64_t wr_id, wr_id_in; 2730 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2731 if (ret < 0) { 2732 error_report("rdma migration: polling error! %d", ret); 2733 goto err; 2734 } 2735 2736 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2737 2738 if (wr_id == RDMA_WRID_NONE) { 2739 break; 2740 } 2741 } 2742 2743 return RAM_SAVE_CONTROL_DELAYED; 2744 err: 2745 rdma->error_state = ret; 2746 return ret; 2747 } 2748 2749 static int qemu_rdma_accept(RDMAContext *rdma) 2750 { 2751 RDMACapabilities cap; 2752 struct rdma_conn_param conn_param = { 2753 .responder_resources = 2, 2754 .private_data = &cap, 2755 .private_data_len = sizeof(cap), 2756 }; 2757 struct rdma_cm_event *cm_event; 2758 struct ibv_context *verbs; 2759 int ret = -EINVAL; 2760 int idx; 2761 2762 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2763 if (ret) { 2764 goto err_rdma_dest_wait; 2765 } 2766 2767 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2768 rdma_ack_cm_event(cm_event); 2769 goto err_rdma_dest_wait; 2770 } 2771 2772 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2773 2774 network_to_caps(&cap); 2775 2776 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2777 error_report("Unknown source RDMA version: %d, bailing...", 2778 cap.version); 2779 rdma_ack_cm_event(cm_event); 2780 goto err_rdma_dest_wait; 2781 } 2782 2783 /* 2784 * Respond with only the capabilities this version of QEMU knows about. 2785 */ 2786 cap.flags &= known_capabilities; 2787 2788 /* 2789 * Enable the ones that we do know about. 2790 * Add other checks here as new ones are introduced. 2791 */ 2792 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2793 rdma->pin_all = true; 2794 } 2795 2796 rdma->cm_id = cm_event->id; 2797 verbs = cm_event->id->verbs; 2798 2799 rdma_ack_cm_event(cm_event); 2800 2801 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 2802 2803 caps_to_network(&cap); 2804 2805 trace_qemu_rdma_accept_pin_verbsc(verbs); 2806 2807 if (!rdma->verbs) { 2808 rdma->verbs = verbs; 2809 } else if (rdma->verbs != verbs) { 2810 error_report("ibv context not matching %p, %p!", rdma->verbs, 2811 verbs); 2812 goto err_rdma_dest_wait; 2813 } 2814 2815 qemu_rdma_dump_id("dest_init", verbs); 2816 2817 ret = qemu_rdma_alloc_pd_cq(rdma); 2818 if (ret) { 2819 error_report("rdma migration: error allocating pd and cq!"); 2820 goto err_rdma_dest_wait; 2821 } 2822 2823 ret = qemu_rdma_alloc_qp(rdma); 2824 if (ret) { 2825 error_report("rdma migration: error allocating qp!"); 2826 goto err_rdma_dest_wait; 2827 } 2828 2829 ret = qemu_rdma_init_ram_blocks(rdma); 2830 if (ret) { 2831 error_report("rdma migration: error initializing ram blocks!"); 2832 goto err_rdma_dest_wait; 2833 } 2834 2835 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2836 ret = qemu_rdma_reg_control(rdma, idx); 2837 if (ret) { 2838 error_report("rdma: error registering %d control", idx); 2839 goto err_rdma_dest_wait; 2840 } 2841 } 2842 2843 qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL); 2844 2845 ret = rdma_accept(rdma->cm_id, &conn_param); 2846 if (ret) { 2847 error_report("rdma_accept returns %d", ret); 2848 goto err_rdma_dest_wait; 2849 } 2850 2851 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2852 if (ret) { 2853 error_report("rdma_accept get_cm_event failed %d", ret); 2854 goto err_rdma_dest_wait; 2855 } 2856 2857 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2858 error_report("rdma_accept not event established"); 2859 rdma_ack_cm_event(cm_event); 2860 goto err_rdma_dest_wait; 2861 } 2862 2863 rdma_ack_cm_event(cm_event); 2864 rdma->connected = true; 2865 2866 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2867 if (ret) { 2868 error_report("rdma migration: error posting second control recv"); 2869 goto err_rdma_dest_wait; 2870 } 2871 2872 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 2873 2874 return 0; 2875 2876 err_rdma_dest_wait: 2877 rdma->error_state = ret; 2878 qemu_rdma_cleanup(rdma); 2879 return ret; 2880 } 2881 2882 /* 2883 * During each iteration of the migration, we listen for instructions 2884 * by the source VM to perform dynamic page registrations before they 2885 * can perform RDMA operations. 2886 * 2887 * We respond with the 'rkey'. 2888 * 2889 * Keep doing this until the source tells us to stop. 2890 */ 2891 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque, 2892 uint64_t flags) 2893 { 2894 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 2895 .type = RDMA_CONTROL_REGISTER_RESULT, 2896 .repeat = 0, 2897 }; 2898 RDMAControlHeader unreg_resp = { .len = 0, 2899 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 2900 .repeat = 0, 2901 }; 2902 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 2903 .repeat = 1 }; 2904 QEMUFileRDMA *rfile = opaque; 2905 RDMAContext *rdma = rfile->rdma; 2906 RDMALocalBlocks *local = &rdma->local_ram_blocks; 2907 RDMAControlHeader head; 2908 RDMARegister *reg, *registers; 2909 RDMACompress *comp; 2910 RDMARegisterResult *reg_result; 2911 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 2912 RDMALocalBlock *block; 2913 void *host_addr; 2914 int ret = 0; 2915 int idx = 0; 2916 int count = 0; 2917 int i = 0; 2918 2919 CHECK_ERROR_STATE(); 2920 2921 do { 2922 trace_qemu_rdma_registration_handle_wait(flags); 2923 2924 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 2925 2926 if (ret < 0) { 2927 break; 2928 } 2929 2930 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 2931 error_report("rdma: Too many requests in this message (%d)." 2932 "Bailing.", head.repeat); 2933 ret = -EIO; 2934 break; 2935 } 2936 2937 switch (head.type) { 2938 case RDMA_CONTROL_COMPRESS: 2939 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 2940 network_to_compress(comp); 2941 2942 trace_qemu_rdma_registration_handle_compress(comp->length, 2943 comp->block_idx, 2944 comp->offset); 2945 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 2946 2947 host_addr = block->local_host_addr + 2948 (comp->offset - block->offset); 2949 2950 ram_handle_compressed(host_addr, comp->value, comp->length); 2951 break; 2952 2953 case RDMA_CONTROL_REGISTER_FINISHED: 2954 trace_qemu_rdma_registration_handle_finished(); 2955 goto out; 2956 2957 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 2958 trace_qemu_rdma_registration_handle_ram_blocks(); 2959 2960 if (rdma->pin_all) { 2961 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 2962 if (ret) { 2963 error_report("rdma migration: error dest " 2964 "registering ram blocks"); 2965 goto out; 2966 } 2967 } 2968 2969 /* 2970 * Dest uses this to prepare to transmit the RAMBlock descriptions 2971 * to the source VM after connection setup. 2972 * Both sides use the "remote" structure to communicate and update 2973 * their "local" descriptions with what was sent. 2974 */ 2975 for (i = 0; i < local->nb_blocks; i++) { 2976 rdma->dest_blocks[i].remote_host_addr = 2977 (uintptr_t)(local->block[i].local_host_addr); 2978 2979 if (rdma->pin_all) { 2980 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 2981 } 2982 2983 rdma->dest_blocks[i].offset = local->block[i].offset; 2984 rdma->dest_blocks[i].length = local->block[i].length; 2985 2986 dest_block_to_network(&rdma->dest_blocks[i]); 2987 } 2988 2989 blocks.len = rdma->local_ram_blocks.nb_blocks 2990 * sizeof(RDMADestBlock); 2991 2992 2993 ret = qemu_rdma_post_send_control(rdma, 2994 (uint8_t *) rdma->dest_blocks, &blocks); 2995 2996 if (ret < 0) { 2997 error_report("rdma migration: error sending remote info"); 2998 goto out; 2999 } 3000 3001 break; 3002 case RDMA_CONTROL_REGISTER_REQUEST: 3003 trace_qemu_rdma_registration_handle_register(head.repeat); 3004 3005 reg_resp.repeat = head.repeat; 3006 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3007 3008 for (count = 0; count < head.repeat; count++) { 3009 uint64_t chunk; 3010 uint8_t *chunk_start, *chunk_end; 3011 3012 reg = ®isters[count]; 3013 network_to_register(reg); 3014 3015 reg_result = &results[count]; 3016 3017 trace_qemu_rdma_registration_handle_register_loop(count, 3018 reg->current_index, reg->key.current_addr, reg->chunks); 3019 3020 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3021 if (block->is_ram_block) { 3022 host_addr = (block->local_host_addr + 3023 (reg->key.current_addr - block->offset)); 3024 chunk = ram_chunk_index(block->local_host_addr, 3025 (uint8_t *) host_addr); 3026 } else { 3027 chunk = reg->key.chunk; 3028 host_addr = block->local_host_addr + 3029 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3030 } 3031 chunk_start = ram_chunk_start(block, chunk); 3032 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3033 if (qemu_rdma_register_and_get_keys(rdma, block, 3034 (uintptr_t)host_addr, NULL, ®_result->rkey, 3035 chunk, chunk_start, chunk_end)) { 3036 error_report("cannot get rkey"); 3037 ret = -EINVAL; 3038 goto out; 3039 } 3040 3041 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3042 3043 trace_qemu_rdma_registration_handle_register_rkey( 3044 reg_result->rkey); 3045 3046 result_to_network(reg_result); 3047 } 3048 3049 ret = qemu_rdma_post_send_control(rdma, 3050 (uint8_t *) results, ®_resp); 3051 3052 if (ret < 0) { 3053 error_report("Failed to send control buffer"); 3054 goto out; 3055 } 3056 break; 3057 case RDMA_CONTROL_UNREGISTER_REQUEST: 3058 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3059 unreg_resp.repeat = head.repeat; 3060 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3061 3062 for (count = 0; count < head.repeat; count++) { 3063 reg = ®isters[count]; 3064 network_to_register(reg); 3065 3066 trace_qemu_rdma_registration_handle_unregister_loop(count, 3067 reg->current_index, reg->key.chunk); 3068 3069 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3070 3071 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3072 block->pmr[reg->key.chunk] = NULL; 3073 3074 if (ret != 0) { 3075 perror("rdma unregistration chunk failed"); 3076 ret = -ret; 3077 goto out; 3078 } 3079 3080 rdma->total_registrations--; 3081 3082 trace_qemu_rdma_registration_handle_unregister_success( 3083 reg->key.chunk); 3084 } 3085 3086 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3087 3088 if (ret < 0) { 3089 error_report("Failed to send control buffer"); 3090 goto out; 3091 } 3092 break; 3093 case RDMA_CONTROL_REGISTER_RESULT: 3094 error_report("Invalid RESULT message at dest."); 3095 ret = -EIO; 3096 goto out; 3097 default: 3098 error_report("Unknown control message %s", control_desc[head.type]); 3099 ret = -EIO; 3100 goto out; 3101 } 3102 } while (1); 3103 out: 3104 if (ret < 0) { 3105 rdma->error_state = ret; 3106 } 3107 return ret; 3108 } 3109 3110 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3111 uint64_t flags) 3112 { 3113 QEMUFileRDMA *rfile = opaque; 3114 RDMAContext *rdma = rfile->rdma; 3115 3116 CHECK_ERROR_STATE(); 3117 3118 trace_qemu_rdma_registration_start(flags); 3119 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3120 qemu_fflush(f); 3121 3122 return 0; 3123 } 3124 3125 /* 3126 * Inform dest that dynamic registrations are done for now. 3127 * First, flush writes, if any. 3128 */ 3129 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3130 uint64_t flags) 3131 { 3132 Error *local_err = NULL, **errp = &local_err; 3133 QEMUFileRDMA *rfile = opaque; 3134 RDMAContext *rdma = rfile->rdma; 3135 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3136 int ret = 0; 3137 3138 CHECK_ERROR_STATE(); 3139 3140 qemu_fflush(f); 3141 ret = qemu_rdma_drain_cq(f, rdma); 3142 3143 if (ret < 0) { 3144 goto err; 3145 } 3146 3147 if (flags == RAM_CONTROL_SETUP) { 3148 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3149 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3150 int reg_result_idx, i, j, nb_dest_blocks; 3151 3152 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3153 trace_qemu_rdma_registration_stop_ram(); 3154 3155 /* 3156 * Make sure that we parallelize the pinning on both sides. 3157 * For very large guests, doing this serially takes a really 3158 * long time, so we have to 'interleave' the pinning locally 3159 * with the control messages by performing the pinning on this 3160 * side before we receive the control response from the other 3161 * side that the pinning has completed. 3162 */ 3163 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3164 ®_result_idx, rdma->pin_all ? 3165 qemu_rdma_reg_whole_ram_blocks : NULL); 3166 if (ret < 0) { 3167 ERROR(errp, "receiving remote info!"); 3168 return ret; 3169 } 3170 3171 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3172 3173 /* 3174 * The protocol uses two different sets of rkeys (mutually exclusive): 3175 * 1. One key to represent the virtual address of the entire ram block. 3176 * (dynamic chunk registration disabled - pin everything with one rkey.) 3177 * 2. One to represent individual chunks within a ram block. 3178 * (dynamic chunk registration enabled - pin individual chunks.) 3179 * 3180 * Once the capability is successfully negotiated, the destination transmits 3181 * the keys to use (or sends them later) including the virtual addresses 3182 * and then propagates the remote ram block descriptions to his local copy. 3183 */ 3184 3185 if (local->nb_blocks != nb_dest_blocks) { 3186 ERROR(errp, "ram blocks mismatch #1! " 3187 "Your QEMU command line parameters are probably " 3188 "not identical on both the source and destination."); 3189 return -EINVAL; 3190 } 3191 3192 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3193 memcpy(rdma->dest_blocks, 3194 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3195 for (i = 0; i < nb_dest_blocks; i++) { 3196 network_to_dest_block(&rdma->dest_blocks[i]); 3197 3198 /* search local ram blocks */ 3199 for (j = 0; j < local->nb_blocks; j++) { 3200 if (rdma->dest_blocks[i].offset != local->block[j].offset) { 3201 continue; 3202 } 3203 3204 if (rdma->dest_blocks[i].length != local->block[j].length) { 3205 ERROR(errp, "ram blocks mismatch #2! " 3206 "Your QEMU command line parameters are probably " 3207 "not identical on both the source and destination."); 3208 return -EINVAL; 3209 } 3210 local->block[j].remote_host_addr = 3211 rdma->dest_blocks[i].remote_host_addr; 3212 local->block[j].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3213 break; 3214 } 3215 3216 if (j >= local->nb_blocks) { 3217 ERROR(errp, "ram blocks mismatch #3! " 3218 "Your QEMU command line parameters are probably " 3219 "not identical on both the source and destination."); 3220 return -EINVAL; 3221 } 3222 } 3223 } 3224 3225 trace_qemu_rdma_registration_stop(flags); 3226 3227 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3228 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3229 3230 if (ret < 0) { 3231 goto err; 3232 } 3233 3234 return 0; 3235 err: 3236 rdma->error_state = ret; 3237 return ret; 3238 } 3239 3240 static int qemu_rdma_get_fd(void *opaque) 3241 { 3242 QEMUFileRDMA *rfile = opaque; 3243 RDMAContext *rdma = rfile->rdma; 3244 3245 return rdma->comp_channel->fd; 3246 } 3247 3248 static const QEMUFileOps rdma_read_ops = { 3249 .get_buffer = qemu_rdma_get_buffer, 3250 .get_fd = qemu_rdma_get_fd, 3251 .close = qemu_rdma_close, 3252 .hook_ram_load = qemu_rdma_registration_handle, 3253 }; 3254 3255 static const QEMUFileOps rdma_write_ops = { 3256 .put_buffer = qemu_rdma_put_buffer, 3257 .close = qemu_rdma_close, 3258 .before_ram_iterate = qemu_rdma_registration_start, 3259 .after_ram_iterate = qemu_rdma_registration_stop, 3260 .save_page = qemu_rdma_save_page, 3261 }; 3262 3263 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3264 { 3265 QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA)); 3266 3267 if (qemu_file_mode_is_not_valid(mode)) { 3268 return NULL; 3269 } 3270 3271 r->rdma = rdma; 3272 3273 if (mode[0] == 'w') { 3274 r->file = qemu_fopen_ops(r, &rdma_write_ops); 3275 } else { 3276 r->file = qemu_fopen_ops(r, &rdma_read_ops); 3277 } 3278 3279 return r->file; 3280 } 3281 3282 static void rdma_accept_incoming_migration(void *opaque) 3283 { 3284 RDMAContext *rdma = opaque; 3285 int ret; 3286 QEMUFile *f; 3287 Error *local_err = NULL, **errp = &local_err; 3288 3289 trace_qemu_dma_accept_incoming_migration(); 3290 ret = qemu_rdma_accept(rdma); 3291 3292 if (ret) { 3293 ERROR(errp, "RDMA Migration initialization failed!"); 3294 return; 3295 } 3296 3297 trace_qemu_dma_accept_incoming_migration_accepted(); 3298 3299 f = qemu_fopen_rdma(rdma, "rb"); 3300 if (f == NULL) { 3301 ERROR(errp, "could not qemu_fopen_rdma!"); 3302 qemu_rdma_cleanup(rdma); 3303 return; 3304 } 3305 3306 rdma->migration_started_on_destination = 1; 3307 process_incoming_migration(f); 3308 } 3309 3310 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3311 { 3312 int ret; 3313 RDMAContext *rdma; 3314 Error *local_err = NULL; 3315 3316 trace_rdma_start_incoming_migration(); 3317 rdma = qemu_rdma_data_init(host_port, &local_err); 3318 3319 if (rdma == NULL) { 3320 goto err; 3321 } 3322 3323 ret = qemu_rdma_dest_init(rdma, &local_err); 3324 3325 if (ret) { 3326 goto err; 3327 } 3328 3329 trace_rdma_start_incoming_migration_after_dest_init(); 3330 3331 ret = rdma_listen(rdma->listen_id, 5); 3332 3333 if (ret) { 3334 ERROR(errp, "listening on socket!"); 3335 goto err; 3336 } 3337 3338 trace_rdma_start_incoming_migration_after_rdma_listen(); 3339 3340 qemu_set_fd_handler2(rdma->channel->fd, NULL, 3341 rdma_accept_incoming_migration, NULL, 3342 (void *)(intptr_t) rdma); 3343 return; 3344 err: 3345 error_propagate(errp, local_err); 3346 g_free(rdma); 3347 } 3348 3349 void rdma_start_outgoing_migration(void *opaque, 3350 const char *host_port, Error **errp) 3351 { 3352 MigrationState *s = opaque; 3353 Error *local_err = NULL, **temp = &local_err; 3354 RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err); 3355 int ret = 0; 3356 3357 if (rdma == NULL) { 3358 ERROR(temp, "Failed to initialize RDMA data structures! %d", ret); 3359 goto err; 3360 } 3361 3362 ret = qemu_rdma_source_init(rdma, &local_err, 3363 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3364 3365 if (ret) { 3366 goto err; 3367 } 3368 3369 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3370 ret = qemu_rdma_connect(rdma, &local_err); 3371 3372 if (ret) { 3373 goto err; 3374 } 3375 3376 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3377 3378 s->file = qemu_fopen_rdma(rdma, "wb"); 3379 migrate_fd_connect(s); 3380 return; 3381 err: 3382 error_propagate(errp, local_err); 3383 g_free(rdma); 3384 migrate_fd_error(s); 3385 } 3386