/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 * Copyright Red Hat, Inc. 2015-2016
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *  Daniel P. Berrange <berrange@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "qemu/cutils.h"
#include "rdma.h"
#include "migration.h"
#include "qemu-file.h"
#include "ram.h"
#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "qemu/coroutine.h"
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <rdma/rdma_cma.h>
#include "trace.h"

/*
 * Print an error on both the Monitor and the Log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size of an InfiniBand SEND message.
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting for " \
                             "migration to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)

/*
 * A work request ID is 64 bits wide and we split up these bits
 * into 3 parts:
 *
 *   bits 0-15 : type of control message, 2^16
 *   bits 16-29: ram block index, 2^14
 *   bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)

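/*
 * Illustration (added commentary, not part of the original code): with the
 * masks above, the work request ID for an RDMA write of chunk 7 in ram
 * block 3 would be packed and unpacked as follows (qemu_rdma_make_wrid()
 * and qemu_rdma_poll() below do this for real):
 *
 *   uint64_t wrid = RDMA_WRID_RDMA_WRITE |
 *                   (3ULL << RDMA_WRID_BLOCK_SHIFT) |
 *                   (7ULL << RDMA_WRID_CHUNK_SHIFT);
 *
 *   wrid & RDMA_WRID_TYPE_MASK                             == 1 (RDMA write)
 *   (wrid & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT == 3 (block index)
 *   (wrid & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT == 7 (chunk number)
 */
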
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands)
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};

static const char *control_desc[] = {
    [RDMA_CONTROL_NONE] = "NONE",
    [RDMA_CONTROL_ERROR] = "ERROR",
    [RDMA_CONTROL_READY] = "READY",
    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};

/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 */
typedef struct {
    uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct ibv_mr *control_mr;                /* registration metadata */
    size_t control_len;                       /* length of the message */
    uint8_t *control_curr;                    /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}

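/*
 * Illustration (added commentary, not part of the original code): the
 * capability flags above are negotiated at connection-setup time. A sender
 * fills the structure in host byte order and converts it just before
 * handing it to the connection manager, roughly:
 *
 *   RDMACapabilities cap = { .version = RDMA_CONTROL_VERSION_CURRENT,
 *                            .flags = pin_all ? RDMA_CAPABILITY_PIN_ALL : 0 };
 *   caps_to_network(&cap);    // wire format is big-endian
 *
 * The receiver applies network_to_caps() and would reject versions it does
 * not speak or flag bits outside known_capabilities.
 */
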
/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 */
typedef struct RDMALocalBlock {
    char *block_name;
    uint8_t *local_host_addr;    /* local virtual address */
    uint64_t remote_host_addr;   /* remote virtual address */
    uint64_t offset;
    uint64_t length;
    struct ibv_mr **pmr;         /* MRs for chunk-level registration */
    struct ibv_mr *mr;           /* MR for non-chunk-level registration */
    uint32_t *remote_keys;       /* rkeys for chunk-level registration */
    uint32_t remote_rkey;        /* rkeys for non-chunk-level registration */
    int index;                   /* which block are we */
    unsigned int src_index;      /* (Only used on dest) */
    bool is_ram_block;
    int nb_chunks;
    unsigned long *transit_bitmap;
    unsigned long *unregister_bitmap;
} RDMALocalBlock;

/*
 * Also represents a RAMblock, but only on the dest.
 * This gets transmitted by the dest during connection-time
 * to the source VM and then is used to populate the
 * corresponding RDMALocalBlock with
 * the information needed to perform the actual RDMA.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;

static uint64_t htonll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.lv[0] = htonl(v >> 32);
    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
    return u.llv;
}

static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u;
    u.llv = v;
    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t)ntohl(u.lv[1]);
}

static void dest_block_to_network(RDMADestBlock *db)
{
    db->remote_host_addr = htonll(db->remote_host_addr);
    db->offset = htonll(db->offset);
    db->length = htonll(db->length);
    db->remote_rkey = htonl(db->remote_rkey);
}

static void network_to_dest_block(RDMADestBlock *db)
{
    db->remote_host_addr = ntohll(db->remote_host_addr);
    db->offset = ntohll(db->offset);
    db->length = ntohll(db->length);
    db->remote_rkey = ntohl(db->remote_rkey);
}

/*
 * Virtual address of the above structures used for transmitting
 * the RAMBlock descriptions at connection-time.
 * This structure is *not* transmitted.
 */
typedef struct RDMALocalBlocks {
    int nb_blocks;
    bool init;             /* main memory init complete */
    RDMALocalBlock *block;
} RDMALocalBlocks;

/*
 * Main data structure for RDMA state.
 * While there is only one copy of this structure being allocated right now,
 * this is the place where one would start if you wanted to consider
 * having more than one RDMA connection open at the same time.
 */
typedef struct RDMAContext {
    char *host;
    int port;

    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;              /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context *verbs;
    struct rdma_event_channel *channel;
    struct ibv_qp *qp;                     /* queue pair */
    struct ibv_comp_channel *comp_channel; /* completion channel */
    struct ibv_pd *pd;                     /* protection domain */
    struct ibv_cq *cq;                     /* completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    int error_state;
    int error_reported;
    int received_error;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    GHashTable *blockmap;
} RDMAContext;

#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
#define QIO_CHANNEL_RDMA(obj) \
    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)

typedef struct QIOChannelRDMA QIOChannelRDMA;


struct QIOChannelRDMA {
    QIOChannel parent;
    RDMAContext *rdma;
    QEMUFile *file;
    size_t len;
    bool blocking; /* XXX we don't actually honour this yet */
};

/*
 * Main structure for IB Send/Recv control messages.
 * This gets prepended at the beginning of every Send/Recv.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;

static void control_to_network(RDMAControlHeader *control)
{
    control->type = htonl(control->type);
    control->len = htonl(control->len);
    control->repeat = htonl(control->repeat);
}

static void network_to_control(RDMAControlHeader *control)
{
    control->type = ntohl(control->type);
    control->len = ntohl(control->len);
    control->repeat = ntohl(control->repeat);
}

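/*
 * Illustration (added commentary, not part of the original code): every
 * message on the control channel is an RDMAControlHeader immediately
 * followed by head.len bytes of payload, with the header byte-swapped in
 * place just before posting. A send roughly does:
 *
 *   RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                              .type = RDMA_CONTROL_REGISTER_REQUEST,
 *                              .repeat = 1 };
 *   memcpy(buf, &head, sizeof(head));              // header first
 *   control_to_network((RDMAControlHeader *)buf);  // convert to wire order
 *   memcpy(buf + sizeof(head), &reg, head.len);    // payload follows
 *
 * qemu_rdma_post_send_control() below is the real version of this, and the
 * receive path undoes it with network_to_control().
 */
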
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 */
typedef struct QEMU_PACKED {
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index;     /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;

static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
{
    RDMALocalBlock *local_block;
    local_block = &rdma->local_ram_blocks.block[reg->current_index];

    if (local_block->is_ram_block) {
        /*
         * current_addr as passed in is an address in the local ram_addr_t
         * space, we need to translate this for the destination
         */
        reg->key.current_addr -= local_block->offset;
        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
    }
    reg->key.current_addr = htonll(reg->key.current_addr);
    reg->current_index = htonl(reg->current_index);
    reg->chunks = htonll(reg->chunks);
}

static void network_to_register(RDMARegister *reg)
{
    reg->key.current_addr = ntohll(reg->key.current_addr);
    reg->current_index = ntohl(reg->current_index);
    reg->chunks = ntohll(reg->chunks);
}

typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;

static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
{
    comp->value = htonl(comp->value);
    /*
     * comp->offset as passed in is an address in the local ram_addr_t
     * space, we need to translate this for the destination
     */
    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
    comp->block_idx = htonl(comp->block_idx);
    comp->offset = htonll(comp->offset);
    comp->length = htonll(comp->length);
}

static void network_to_compress(RDMACompress *comp)
{
    comp->value = ntohl(comp->value);
    comp->block_idx = ntohl(comp->block_idx);
    comp->offset = ntohll(comp->offset);
    comp->length = ntohll(comp->length);
}

/*
 * The result of the dest's memory registration produces an "rkey"
 * which the source VM must reference in order to perform
 * the RDMA operation.
496 */ 497 typedef struct QEMU_PACKED { 498 uint32_t rkey; 499 uint32_t padding; 500 uint64_t host_addr; 501 } RDMARegisterResult; 502 503 static void result_to_network(RDMARegisterResult *result) 504 { 505 result->rkey = htonl(result->rkey); 506 result->host_addr = htonll(result->host_addr); 507 }; 508 509 static void network_to_result(RDMARegisterResult *result) 510 { 511 result->rkey = ntohl(result->rkey); 512 result->host_addr = ntohll(result->host_addr); 513 }; 514 515 const char *print_wrid(int wrid); 516 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 517 uint8_t *data, RDMAControlHeader *resp, 518 int *resp_idx, 519 int (*callback)(RDMAContext *rdma)); 520 521 static inline uint64_t ram_chunk_index(const uint8_t *start, 522 const uint8_t *host) 523 { 524 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 525 } 526 527 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 528 uint64_t i) 529 { 530 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 531 (i << RDMA_REG_CHUNK_SHIFT)); 532 } 533 534 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 535 uint64_t i) 536 { 537 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 538 (1UL << RDMA_REG_CHUNK_SHIFT); 539 540 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 541 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 542 } 543 544 return result; 545 } 546 547 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 548 void *host_addr, 549 ram_addr_t block_offset, uint64_t length) 550 { 551 RDMALocalBlocks *local = &rdma->local_ram_blocks; 552 RDMALocalBlock *block; 553 RDMALocalBlock *old = local->block; 554 555 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 556 557 if (local->nb_blocks) { 558 int x; 559 560 if (rdma->blockmap) { 561 for (x = 0; x < local->nb_blocks; x++) { 562 g_hash_table_remove(rdma->blockmap, 563 (void *)(uintptr_t)old[x].offset); 564 g_hash_table_insert(rdma->blockmap, 565 (void *)(uintptr_t)old[x].offset, 566 &local->block[x]); 567 } 568 } 569 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 570 g_free(old); 571 } 572 573 block = &local->block[local->nb_blocks]; 574 575 block->block_name = g_strdup(block_name); 576 block->local_host_addr = host_addr; 577 block->offset = block_offset; 578 block->length = length; 579 block->index = local->nb_blocks; 580 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 581 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 582 block->transit_bitmap = bitmap_new(block->nb_chunks); 583 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 584 block->unregister_bitmap = bitmap_new(block->nb_chunks); 585 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 586 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 587 588 block->is_ram_block = local->init ? 
false : true; 589 590 if (rdma->blockmap) { 591 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 592 } 593 594 trace_rdma_add_block(block_name, local->nb_blocks, 595 (uintptr_t) block->local_host_addr, 596 block->offset, block->length, 597 (uintptr_t) (block->local_host_addr + block->length), 598 BITS_TO_LONGS(block->nb_chunks) * 599 sizeof(unsigned long) * 8, 600 block->nb_chunks); 601 602 local->nb_blocks++; 603 604 return 0; 605 } 606 607 /* 608 * Memory regions need to be registered with the device and queue pairs setup 609 * in advanced before the migration starts. This tells us where the RAM blocks 610 * are so that we can register them individually. 611 */ 612 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr, 613 ram_addr_t block_offset, ram_addr_t length, void *opaque) 614 { 615 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 616 } 617 618 /* 619 * Identify the RAMBlocks and their quantity. They will be references to 620 * identify chunk boundaries inside each RAMBlock and also be referenced 621 * during dynamic page registration. 622 */ 623 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 624 { 625 RDMALocalBlocks *local = &rdma->local_ram_blocks; 626 627 assert(rdma->blockmap == NULL); 628 memset(local, 0, sizeof *local); 629 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma); 630 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 631 rdma->dest_blocks = g_new0(RDMADestBlock, 632 rdma->local_ram_blocks.nb_blocks); 633 local->init = true; 634 return 0; 635 } 636 637 /* 638 * Note: If used outside of cleanup, the caller must ensure that the destination 639 * block structures are also updated 640 */ 641 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 642 { 643 RDMALocalBlocks *local = &rdma->local_ram_blocks; 644 RDMALocalBlock *old = local->block; 645 int x; 646 647 if (rdma->blockmap) { 648 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 649 } 650 if (block->pmr) { 651 int j; 652 653 for (j = 0; j < block->nb_chunks; j++) { 654 if (!block->pmr[j]) { 655 continue; 656 } 657 ibv_dereg_mr(block->pmr[j]); 658 rdma->total_registrations--; 659 } 660 g_free(block->pmr); 661 block->pmr = NULL; 662 } 663 664 if (block->mr) { 665 ibv_dereg_mr(block->mr); 666 rdma->total_registrations--; 667 block->mr = NULL; 668 } 669 670 g_free(block->transit_bitmap); 671 block->transit_bitmap = NULL; 672 673 g_free(block->unregister_bitmap); 674 block->unregister_bitmap = NULL; 675 676 g_free(block->remote_keys); 677 block->remote_keys = NULL; 678 679 g_free(block->block_name); 680 block->block_name = NULL; 681 682 if (rdma->blockmap) { 683 for (x = 0; x < local->nb_blocks; x++) { 684 g_hash_table_remove(rdma->blockmap, 685 (void *)(uintptr_t)old[x].offset); 686 } 687 } 688 689 if (local->nb_blocks > 1) { 690 691 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 692 693 if (block->index) { 694 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 695 } 696 697 if (block->index < (local->nb_blocks - 1)) { 698 memcpy(local->block + block->index, old + (block->index + 1), 699 sizeof(RDMALocalBlock) * 700 (local->nb_blocks - (block->index + 1))); 701 } 702 } else { 703 assert(block == local->block); 704 local->block = NULL; 705 } 706 707 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 708 block->offset, block->length, 709 (uintptr_t)(block->local_host_addr + block->length), 710 BITS_TO_LONGS(block->nb_chunks) * 711 sizeof(unsigned 
long) * 8, block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    if (local->nb_blocks && rdma->blockmap) {
        for (x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }

    return 0;
}

/*
 * Put in the log file which RDMA device was opened and the details
 * associated with that device.
 */
static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
{
    struct ibv_port_attr port;

    if (ibv_query_port(verbs, 1, &port)) {
        error_report("Failed to query port information");
        return;
    }

    printf("%s RDMA Device opened: kernel name %s "
           "uverbs device name %s, "
           "infiniband_verbs class device path %s, "
           "infiniband class device path %s, "
           "transport: (%d) %s\n",
           who,
           verbs->device->name,
           verbs->device->dev_name,
           verbs->device->dev_path,
           verbs->device->ibdev_path,
           port.link_layer,
           (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband"
               : ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                  ? "Ethernet" : "Unknown"));
}

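/*
 * Added note: the sgid/dgid printed below are 128-bit InfiniBand GIDs; they
 * are formatted with inet_ntop(AF_INET6, ...) only because GIDs share the
 * 16-byte layout of IPv6 addresses, which makes them readable in the trace
 * output.
 */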
/*
 * Put in the log file the RDMA gid addressing information,
 * useful for folks who have trouble understanding the
 * RDMA device hierarchy in the kernel.
 */
static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
{
    char sgid[33];
    char dgid[33];
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
    trace_qemu_rdma_dump_gid(who, sgid, dgid);
}

/*
 * As of now, IPv6 over RoCE / iWARP is not supported by linux.
 * We will try the next addrinfo struct, and fail if there are
 * no other valid addresses to bind against.
 *
 * If the user is listening on '[::]', then we will not have opened a device
 * yet and have no way of verifying if the device is RoCE or not.
 *
 * In this case, the source VM will throw an error for ALL types of
 * connections (both IPv4 and IPv6) if the destination machine does not have
 * a regular infiniband network available for use.
 *
 * The only way to guarantee that an error is thrown for broken kernels is
 * for the management software to choose a *specific* interface at bind time
 * and validate what type of hardware it is.
 *
 * Unfortunately, this puts the user in a fix:
 *
 * If the source VM connects with an IPv4 address without knowing that the
 * destination has bound to '[::]' the migration will unconditionally fail
 * unless the management software is explicitly listening on the IPv4
 * address while using a RoCE-based device.
 *
 * If the source VM connects with an IPv6 address, then we're OK because we can
 * throw an error on the source (and similarly on the destination).
 *
 * But in mixed environments, this will be broken for a while until it is fixed
 * inside linux.
 *
 * We do provide a *tiny* bit of help in this function: We can list all of the
 * devices in the system and check to see if all the devices are RoCE or
 * Infiniband.
 *
 * If we detect that we have a *pure* RoCE environment, then we can safely
 * throw an error even if the management software has specified '[::]' as the
 * bind address.
 *
 * However, if there are multiple heterogeneous devices, then we cannot make
 * this assumption and the user just has to be sure they know what they are
 * doing.
 *
 * Patches are being reviewed on linux-rdma.
 */
static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
{
    struct ibv_port_attr port_attr;

    /* This bug only exists in linux, to our knowledge. */
#ifdef CONFIG_LINUX

    /*
     * Verbs are only NULL if management has bound to '[::]'.
     *
     * Let's iterate through all the devices and see if there are any pure IB
     * devices (non-ethernet).
     *
     * If not, then we can safely proceed with the migration.
     * Otherwise, there are no guarantees until the bug is fixed in linux.
     */
    if (!verbs) {
        int num_devices, x;
        struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
        bool roce_found = false;
        bool ib_found = false;

        for (x = 0; x < num_devices; x++) {
            verbs = ibv_open_device(dev_list[x]);
            if (!verbs) {
                if (errno == EPERM) {
                    continue;
                } else {
                    return -EINVAL;
                }
            }

            if (ibv_query_port(verbs, 1, &port_attr)) {
                ibv_close_device(verbs);
                ERROR(errp, "Could not query initial IB port");
                return -EINVAL;
            }

            if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
                ib_found = true;
            } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
                roce_found = true;
            }

            ibv_close_device(verbs);

        }

        if (roce_found) {
            if (ib_found) {
                fprintf(stderr, "WARN: migrations may fail:"
                                " IPv6 over RoCE / iWARP in linux"
                                " is broken. But since you appear to have a"
                                " mixed RoCE / IB environment, be sure to only"
                                " migrate over the IB fabric until the kernel "
                                " fixes the bug.\n");
            } else {
                ERROR(errp, "You only have RoCE / iWARP devices in your systems"
                            " and your management software has specified '[::]'"
                            ", but IPv6 over RoCE / iWARP is not supported in Linux.");
                return -ENONET;
            }
        }

        return 0;
    }

    /*
     * If we have a verbs context, that means that something other than '[::]'
     * was used by the management software for binding, in which case we can
     * actually warn the user about a potentially broken kernel.
     */

    /* IB ports start with 1, not 0 */
    if (ibv_query_port(verbs, 1, &port_attr)) {
        ERROR(errp, "Could not query initial IB port");
        return -EINVAL;
    }

    if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
        ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
                    "(but patches on linux-rdma in progress)");
        return -ENONET;
    }

#endif

    return 0;
}

/*
 * Figure out which RDMA device corresponds to the requested IP hostname
 * Also create the initial connection manager identifiers for opening
 * the connection.
908 */ 909 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 910 { 911 int ret; 912 struct rdma_addrinfo *res; 913 char port_str[16]; 914 struct rdma_cm_event *cm_event; 915 char ip[40] = "unknown"; 916 struct rdma_addrinfo *e; 917 918 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 919 ERROR(errp, "RDMA hostname has not been set"); 920 return -EINVAL; 921 } 922 923 /* create CM channel */ 924 rdma->channel = rdma_create_event_channel(); 925 if (!rdma->channel) { 926 ERROR(errp, "could not create CM channel"); 927 return -EINVAL; 928 } 929 930 /* create CM id */ 931 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 932 if (ret) { 933 ERROR(errp, "could not create channel id"); 934 goto err_resolve_create_id; 935 } 936 937 snprintf(port_str, 16, "%d", rdma->port); 938 port_str[15] = '\0'; 939 940 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 941 if (ret < 0) { 942 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 943 goto err_resolve_get_addr; 944 } 945 946 for (e = res; e != NULL; e = e->ai_next) { 947 inet_ntop(e->ai_family, 948 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 949 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 950 951 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 952 RDMA_RESOLVE_TIMEOUT_MS); 953 if (!ret) { 954 if (e->ai_family == AF_INET6) { 955 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 956 if (ret) { 957 continue; 958 } 959 } 960 goto route; 961 } 962 } 963 964 ERROR(errp, "could not resolve address %s", rdma->host); 965 goto err_resolve_get_addr; 966 967 route: 968 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 969 970 ret = rdma_get_cm_event(rdma->channel, &cm_event); 971 if (ret) { 972 ERROR(errp, "could not perform event_addr_resolved"); 973 goto err_resolve_get_addr; 974 } 975 976 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 977 ERROR(errp, "result not equal to event_addr_resolved %s", 978 rdma_event_str(cm_event->event)); 979 perror("rdma_resolve_addr"); 980 rdma_ack_cm_event(cm_event); 981 ret = -EINVAL; 982 goto err_resolve_get_addr; 983 } 984 rdma_ack_cm_event(cm_event); 985 986 /* resolve route */ 987 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 988 if (ret) { 989 ERROR(errp, "could not resolve rdma route"); 990 goto err_resolve_get_addr; 991 } 992 993 ret = rdma_get_cm_event(rdma->channel, &cm_event); 994 if (ret) { 995 ERROR(errp, "could not perform event_route_resolved"); 996 goto err_resolve_get_addr; 997 } 998 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 999 ERROR(errp, "result not equal to event_route_resolved: %s", 1000 rdma_event_str(cm_event->event)); 1001 rdma_ack_cm_event(cm_event); 1002 ret = -EINVAL; 1003 goto err_resolve_get_addr; 1004 } 1005 rdma_ack_cm_event(cm_event); 1006 rdma->verbs = rdma->cm_id->verbs; 1007 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1008 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1009 return 0; 1010 1011 err_resolve_get_addr: 1012 rdma_destroy_id(rdma->cm_id); 1013 rdma->cm_id = NULL; 1014 err_resolve_create_id: 1015 rdma_destroy_event_channel(rdma->channel); 1016 rdma->channel = NULL; 1017 return ret; 1018 } 1019 1020 /* 1021 * Create protection domain and completion queues 1022 */ 1023 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1024 { 1025 /* allocate pd */ 1026 rdma->pd = ibv_alloc_pd(rdma->verbs); 1027 if (!rdma->pd) { 1028 error_report("failed to allocate protection domain"); 1029 return -1; 1030 } 1031 
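    /*
     * Added note: the completion channel created below exposes a file
     * descriptor that becomes readable when the completion queue has new
     * events; qemu_rdma_block_for_wrid() later waits on that fd (or yields
     * on it from the destination's coroutine) instead of busy-polling
     * ibv_poll_cq() alone.
     */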
1032 /* create completion channel */ 1033 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 1034 if (!rdma->comp_channel) { 1035 error_report("failed to allocate completion channel"); 1036 goto err_alloc_pd_cq; 1037 } 1038 1039 /* 1040 * Completion queue can be filled by both read and write work requests, 1041 * so must reflect the sum of both possible queue sizes. 1042 */ 1043 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1044 NULL, rdma->comp_channel, 0); 1045 if (!rdma->cq) { 1046 error_report("failed to allocate completion queue"); 1047 goto err_alloc_pd_cq; 1048 } 1049 1050 return 0; 1051 1052 err_alloc_pd_cq: 1053 if (rdma->pd) { 1054 ibv_dealloc_pd(rdma->pd); 1055 } 1056 if (rdma->comp_channel) { 1057 ibv_destroy_comp_channel(rdma->comp_channel); 1058 } 1059 rdma->pd = NULL; 1060 rdma->comp_channel = NULL; 1061 return -1; 1062 1063 } 1064 1065 /* 1066 * Create queue pairs. 1067 */ 1068 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1069 { 1070 struct ibv_qp_init_attr attr = { 0 }; 1071 int ret; 1072 1073 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1074 attr.cap.max_recv_wr = 3; 1075 attr.cap.max_send_sge = 1; 1076 attr.cap.max_recv_sge = 1; 1077 attr.send_cq = rdma->cq; 1078 attr.recv_cq = rdma->cq; 1079 attr.qp_type = IBV_QPT_RC; 1080 1081 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1082 if (ret) { 1083 return -1; 1084 } 1085 1086 rdma->qp = rdma->cm_id->qp; 1087 return 0; 1088 } 1089 1090 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1091 { 1092 int i; 1093 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1094 1095 for (i = 0; i < local->nb_blocks; i++) { 1096 local->block[i].mr = 1097 ibv_reg_mr(rdma->pd, 1098 local->block[i].local_host_addr, 1099 local->block[i].length, 1100 IBV_ACCESS_LOCAL_WRITE | 1101 IBV_ACCESS_REMOTE_WRITE 1102 ); 1103 if (!local->block[i].mr) { 1104 perror("Failed to register local dest ram block!\n"); 1105 break; 1106 } 1107 rdma->total_registrations++; 1108 } 1109 1110 if (i >= local->nb_blocks) { 1111 return 0; 1112 } 1113 1114 for (i--; i >= 0; i--) { 1115 ibv_dereg_mr(local->block[i].mr); 1116 rdma->total_registrations--; 1117 } 1118 1119 return -1; 1120 1121 } 1122 1123 /* 1124 * Find the ram block that corresponds to the page requested to be 1125 * transmitted by QEMU. 1126 * 1127 * Once the block is found, also identify which 'chunk' within that 1128 * block that the page belongs to. 1129 * 1130 * This search cannot fail or the migration will fail. 1131 */ 1132 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1133 uintptr_t block_offset, 1134 uint64_t offset, 1135 uint64_t length, 1136 uint64_t *block_index, 1137 uint64_t *chunk_index) 1138 { 1139 uint64_t current_addr = block_offset + offset; 1140 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1141 (void *) block_offset); 1142 assert(block); 1143 assert(current_addr >= block->offset); 1144 assert((current_addr + length) <= (block->offset + block->length)); 1145 1146 *block_index = block->index; 1147 *chunk_index = ram_chunk_index(block->local_host_addr, 1148 block->local_host_addr + (current_addr - block->offset)); 1149 1150 return 0; 1151 } 1152 1153 /* 1154 * Register a chunk with IB. If the chunk was already registered 1155 * previously, then skip. 1156 * 1157 * Also return the keys associated with the registration needed 1158 * to perform the actual RDMA operation. 
1159 */ 1160 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1161 RDMALocalBlock *block, uintptr_t host_addr, 1162 uint32_t *lkey, uint32_t *rkey, int chunk, 1163 uint8_t *chunk_start, uint8_t *chunk_end) 1164 { 1165 if (block->mr) { 1166 if (lkey) { 1167 *lkey = block->mr->lkey; 1168 } 1169 if (rkey) { 1170 *rkey = block->mr->rkey; 1171 } 1172 return 0; 1173 } 1174 1175 /* allocate memory to store chunk MRs */ 1176 if (!block->pmr) { 1177 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1178 } 1179 1180 /* 1181 * If 'rkey', then we're the destination, so grant access to the source. 1182 * 1183 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1184 */ 1185 if (!block->pmr[chunk]) { 1186 uint64_t len = chunk_end - chunk_start; 1187 1188 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1189 1190 block->pmr[chunk] = ibv_reg_mr(rdma->pd, 1191 chunk_start, len, 1192 (rkey ? (IBV_ACCESS_LOCAL_WRITE | 1193 IBV_ACCESS_REMOTE_WRITE) : 0)); 1194 1195 if (!block->pmr[chunk]) { 1196 perror("Failed to register chunk!"); 1197 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1198 " start %" PRIuPTR " end %" PRIuPTR 1199 " host %" PRIuPTR 1200 " local %" PRIuPTR " registrations: %d\n", 1201 block->index, chunk, (uintptr_t)chunk_start, 1202 (uintptr_t)chunk_end, host_addr, 1203 (uintptr_t)block->local_host_addr, 1204 rdma->total_registrations); 1205 return -1; 1206 } 1207 rdma->total_registrations++; 1208 } 1209 1210 if (lkey) { 1211 *lkey = block->pmr[chunk]->lkey; 1212 } 1213 if (rkey) { 1214 *rkey = block->pmr[chunk]->rkey; 1215 } 1216 return 0; 1217 } 1218 1219 /* 1220 * Register (at connection time) the memory used for control 1221 * channel messages. 1222 */ 1223 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1224 { 1225 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1226 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1227 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1228 if (rdma->wr_data[idx].control_mr) { 1229 rdma->total_registrations++; 1230 return 0; 1231 } 1232 error_report("qemu_rdma_reg_control failed"); 1233 return -1; 1234 } 1235 1236 const char *print_wrid(int wrid) 1237 { 1238 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1239 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1240 } 1241 return wrid_desc[wrid]; 1242 } 1243 1244 /* 1245 * RDMA requires memory registration (mlock/pinning), but this is not good for 1246 * overcommitment. 1247 * 1248 * In preparation for the future where LRU information or workload-specific 1249 * writable writable working set memory access behavior is available to QEMU 1250 * it would be nice to have in place the ability to UN-register/UN-pin 1251 * particular memory regions from the RDMA hardware when it is determine that 1252 * those regions of memory will likely not be accessed again in the near future. 1253 * 1254 * While we do not yet have such information right now, the following 1255 * compile-time option allows us to perform a non-optimized version of this 1256 * behavior. 1257 * 1258 * By uncommenting this option, you will cause *all* RDMA transfers to be 1259 * unregistered immediately after the transfer completes on both sides of the 1260 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode. 1261 * 1262 * This will have a terrible impact on migration performance, so until future 1263 * workload information or LRU information is available, do not attempt to use 1264 * this feature except for basic testing. 
1265 */ 1266 //#define RDMA_UNREGISTRATION_EXAMPLE 1267 1268 /* 1269 * Perform a non-optimized memory unregistration after every transfer 1270 * for demonstration purposes, only if pin-all is not requested. 1271 * 1272 * Potential optimizations: 1273 * 1. Start a new thread to run this function continuously 1274 - for bit clearing 1275 - and for receipt of unregister messages 1276 * 2. Use an LRU. 1277 * 3. Use workload hints. 1278 */ 1279 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1280 { 1281 while (rdma->unregistrations[rdma->unregister_current]) { 1282 int ret; 1283 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1284 uint64_t chunk = 1285 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1286 uint64_t index = 1287 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1288 RDMALocalBlock *block = 1289 &(rdma->local_ram_blocks.block[index]); 1290 RDMARegister reg = { .current_index = index }; 1291 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1292 }; 1293 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1294 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1295 .repeat = 1, 1296 }; 1297 1298 trace_qemu_rdma_unregister_waiting_proc(chunk, 1299 rdma->unregister_current); 1300 1301 rdma->unregistrations[rdma->unregister_current] = 0; 1302 rdma->unregister_current++; 1303 1304 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1305 rdma->unregister_current = 0; 1306 } 1307 1308 1309 /* 1310 * Unregistration is speculative (because migration is single-threaded 1311 * and we cannot break the protocol's inifinband message ordering). 1312 * Thus, if the memory is currently being used for transmission, 1313 * then abort the attempt to unregister and try again 1314 * later the next time a completion is received for this memory. 1315 */ 1316 clear_bit(chunk, block->unregister_bitmap); 1317 1318 if (test_bit(chunk, block->transit_bitmap)) { 1319 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1320 continue; 1321 } 1322 1323 trace_qemu_rdma_unregister_waiting_send(chunk); 1324 1325 ret = ibv_dereg_mr(block->pmr[chunk]); 1326 block->pmr[chunk] = NULL; 1327 block->remote_keys[chunk] = 0; 1328 1329 if (ret != 0) { 1330 perror("unregistration chunk failed"); 1331 return -ret; 1332 } 1333 rdma->total_registrations--; 1334 1335 reg.key.chunk = chunk; 1336 register_to_network(rdma, ®); 1337 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1338 &resp, NULL, NULL); 1339 if (ret < 0) { 1340 return ret; 1341 } 1342 1343 trace_qemu_rdma_unregister_waiting_complete(chunk); 1344 } 1345 1346 return 0; 1347 } 1348 1349 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1350 uint64_t chunk) 1351 { 1352 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1353 1354 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1355 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1356 1357 return result; 1358 } 1359 1360 /* 1361 * Set bit for unregistration in the next iteration. 1362 * We cannot transmit right here, but will unpin later. 
1363 */ 1364 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1365 uint64_t chunk, uint64_t wr_id) 1366 { 1367 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1368 error_report("rdma migration: queue is full"); 1369 } else { 1370 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1371 1372 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1373 trace_qemu_rdma_signal_unregister_append(chunk, 1374 rdma->unregister_next); 1375 1376 rdma->unregistrations[rdma->unregister_next++] = 1377 qemu_rdma_make_wrid(wr_id, index, chunk); 1378 1379 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1380 rdma->unregister_next = 0; 1381 } 1382 } else { 1383 trace_qemu_rdma_signal_unregister_already(chunk); 1384 } 1385 } 1386 } 1387 1388 /* 1389 * Consult the connection manager to see a work request 1390 * (of any kind) has completed. 1391 * Return the work request ID that completed. 1392 */ 1393 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1394 uint32_t *byte_len) 1395 { 1396 int ret; 1397 struct ibv_wc wc; 1398 uint64_t wr_id; 1399 1400 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1401 1402 if (!ret) { 1403 *wr_id_out = RDMA_WRID_NONE; 1404 return 0; 1405 } 1406 1407 if (ret < 0) { 1408 error_report("ibv_poll_cq return %d", ret); 1409 return ret; 1410 } 1411 1412 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1413 1414 if (wc.status != IBV_WC_SUCCESS) { 1415 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1416 wc.status, ibv_wc_status_str(wc.status)); 1417 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1418 1419 return -1; 1420 } 1421 1422 if (rdma->control_ready_expected && 1423 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1424 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1425 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1426 rdma->control_ready_expected = 0; 1427 } 1428 1429 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1430 uint64_t chunk = 1431 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1432 uint64_t index = 1433 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1434 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1435 1436 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1437 index, chunk, block->local_host_addr, 1438 (void *)(uintptr_t)block->remote_host_addr); 1439 1440 clear_bit(chunk, block->transit_bitmap); 1441 1442 if (rdma->nb_sent > 0) { 1443 rdma->nb_sent--; 1444 } 1445 1446 if (!rdma->pin_all) { 1447 /* 1448 * FYI: If one wanted to signal a specific chunk to be unregistered 1449 * using LRU or workload-specific information, this is the function 1450 * you would call to do so. That chunk would then get asynchronously 1451 * unregistered later. 1452 */ 1453 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1454 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1455 #endif 1456 } 1457 } else { 1458 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1459 } 1460 1461 *wr_id_out = wc.wr_id; 1462 if (byte_len) { 1463 *byte_len = wc.byte_len; 1464 } 1465 1466 return 0; 1467 } 1468 1469 /* 1470 * Block until the next work request has completed. 1471 * 1472 * First poll to see if a work request has already completed, 1473 * otherwise block. 1474 * 1475 * If we encounter completed work requests for IDs other than 1476 * the one we're interested in, then that's generally an error. 1477 * 1478 * The only exception is actual RDMA Write completions. 
These 1479 * completions only need to be recorded, but do not actually 1480 * need further processing. 1481 */ 1482 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1483 uint32_t *byte_len) 1484 { 1485 int num_cq_events = 0, ret = 0; 1486 struct ibv_cq *cq; 1487 void *cq_ctx; 1488 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1489 1490 if (ibv_req_notify_cq(rdma->cq, 0)) { 1491 return -1; 1492 } 1493 /* poll cq first */ 1494 while (wr_id != wrid_requested) { 1495 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1496 if (ret < 0) { 1497 return ret; 1498 } 1499 1500 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1501 1502 if (wr_id == RDMA_WRID_NONE) { 1503 break; 1504 } 1505 if (wr_id != wrid_requested) { 1506 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1507 wrid_requested, print_wrid(wr_id), wr_id); 1508 } 1509 } 1510 1511 if (wr_id == wrid_requested) { 1512 return 0; 1513 } 1514 1515 while (1) { 1516 /* 1517 * Coroutine doesn't start until migration_fd_process_incoming() 1518 * so don't yield unless we know we're running inside of a coroutine. 1519 */ 1520 if (rdma->migration_started_on_destination) { 1521 yield_until_fd_readable(rdma->comp_channel->fd); 1522 } 1523 1524 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) { 1525 perror("ibv_get_cq_event"); 1526 goto err_block_for_wrid; 1527 } 1528 1529 num_cq_events++; 1530 1531 if (ibv_req_notify_cq(cq, 0)) { 1532 goto err_block_for_wrid; 1533 } 1534 1535 while (wr_id != wrid_requested) { 1536 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1537 if (ret < 0) { 1538 goto err_block_for_wrid; 1539 } 1540 1541 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1542 1543 if (wr_id == RDMA_WRID_NONE) { 1544 break; 1545 } 1546 if (wr_id != wrid_requested) { 1547 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1548 wrid_requested, print_wrid(wr_id), wr_id); 1549 } 1550 } 1551 1552 if (wr_id == wrid_requested) { 1553 goto success_block_for_wrid; 1554 } 1555 } 1556 1557 success_block_for_wrid: 1558 if (num_cq_events) { 1559 ibv_ack_cq_events(cq, num_cq_events); 1560 } 1561 return 0; 1562 1563 err_block_for_wrid: 1564 if (num_cq_events) { 1565 ibv_ack_cq_events(cq, num_cq_events); 1566 } 1567 return ret; 1568 } 1569 1570 /* 1571 * Post a SEND message work request for the control channel 1572 * containing some data and block until the post completes. 1573 */ 1574 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1575 RDMAControlHeader *head) 1576 { 1577 int ret = 0; 1578 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1579 struct ibv_send_wr *bad_wr; 1580 struct ibv_sge sge = { 1581 .addr = (uintptr_t)(wr->control), 1582 .length = head->len + sizeof(RDMAControlHeader), 1583 .lkey = wr->control_mr->lkey, 1584 }; 1585 struct ibv_send_wr send_wr = { 1586 .wr_id = RDMA_WRID_SEND_CONTROL, 1587 .opcode = IBV_WR_SEND, 1588 .send_flags = IBV_SEND_SIGNALED, 1589 .sg_list = &sge, 1590 .num_sge = 1, 1591 }; 1592 1593 trace_qemu_rdma_post_send_control(control_desc[head->type]); 1594 1595 /* 1596 * We don't actually need to do a memcpy() in here if we used 1597 * the "sge" properly, but since we're only sending control messages 1598 * (not RAM in a performance-critical path), then its OK for now. 1599 * 1600 * The copy makes the RDMAControlHeader simpler to manipulate 1601 * for the time being. 
1602 */ 1603 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1604 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1605 control_to_network((void *) wr->control); 1606 1607 if (buf) { 1608 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1609 } 1610 1611 1612 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1613 1614 if (ret > 0) { 1615 error_report("Failed to use post IB SEND for control"); 1616 return -ret; 1617 } 1618 1619 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1620 if (ret < 0) { 1621 error_report("rdma migration: send polling control error"); 1622 } 1623 1624 return ret; 1625 } 1626 1627 /* 1628 * Post a RECV work request in anticipation of some future receipt 1629 * of data on the control channel. 1630 */ 1631 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1632 { 1633 struct ibv_recv_wr *bad_wr; 1634 struct ibv_sge sge = { 1635 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1636 .length = RDMA_CONTROL_MAX_BUFFER, 1637 .lkey = rdma->wr_data[idx].control_mr->lkey, 1638 }; 1639 1640 struct ibv_recv_wr recv_wr = { 1641 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1642 .sg_list = &sge, 1643 .num_sge = 1, 1644 }; 1645 1646 1647 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1648 return -1; 1649 } 1650 1651 return 0; 1652 } 1653 1654 /* 1655 * Block and wait for a RECV control channel message to arrive. 1656 */ 1657 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1658 RDMAControlHeader *head, int expecting, int idx) 1659 { 1660 uint32_t byte_len; 1661 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1662 &byte_len); 1663 1664 if (ret < 0) { 1665 error_report("rdma migration: recv polling control error!"); 1666 return ret; 1667 } 1668 1669 network_to_control((void *) rdma->wr_data[idx].control); 1670 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1671 1672 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]); 1673 1674 if (expecting == RDMA_CONTROL_NONE) { 1675 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type], 1676 head->type); 1677 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1678 error_report("Was expecting a %s (%d) control message" 1679 ", but got: %s (%d), length: %d", 1680 control_desc[expecting], expecting, 1681 control_desc[head->type], head->type, head->len); 1682 if (head->type == RDMA_CONTROL_ERROR) { 1683 rdma->received_error = true; 1684 } 1685 return -EIO; 1686 } 1687 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1688 error_report("too long length: %d", head->len); 1689 return -EINVAL; 1690 } 1691 if (sizeof(*head) + head->len != byte_len) { 1692 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1693 return -EINVAL; 1694 } 1695 1696 return 0; 1697 } 1698 1699 /* 1700 * When a RECV work request has completed, the work request's 1701 * buffer is pointed at the header. 1702 * 1703 * This will advance the pointer to the data portion 1704 * of the control message of the work request's buffer that 1705 * was populated after the work request finished. 1706 */ 1707 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1708 RDMAControlHeader *head) 1709 { 1710 rdma->wr_data[idx].control_len = head->len; 1711 rdma->wr_data[idx].control_curr = 1712 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1713 } 1714 1715 /* 1716 * This is an 'atomic' high-level operation to deliver a single, unified 1717 * control-channel message. 
1718 * 1719 * Additionally, if the user is expecting some kind of reply to this message, 1720 * they can request a 'resp' response message be filled in by posting an 1721 * additional work request on behalf of the user and waiting for an additional 1722 * completion. 1723 * 1724 * The extra (optional) response is used during registration to us from having 1725 * to perform an *additional* exchange of message just to provide a response by 1726 * instead piggy-backing on the acknowledgement. 1727 */ 1728 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1729 uint8_t *data, RDMAControlHeader *resp, 1730 int *resp_idx, 1731 int (*callback)(RDMAContext *rdma)) 1732 { 1733 int ret = 0; 1734 1735 /* 1736 * Wait until the dest is ready before attempting to deliver the message 1737 * by waiting for a READY message. 1738 */ 1739 if (rdma->control_ready_expected) { 1740 RDMAControlHeader resp; 1741 ret = qemu_rdma_exchange_get_response(rdma, 1742 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1743 if (ret < 0) { 1744 return ret; 1745 } 1746 } 1747 1748 /* 1749 * If the user is expecting a response, post a WR in anticipation of it. 1750 */ 1751 if (resp) { 1752 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1753 if (ret) { 1754 error_report("rdma migration: error posting" 1755 " extra control recv for anticipated result!"); 1756 return ret; 1757 } 1758 } 1759 1760 /* 1761 * Post a WR to replace the one we just consumed for the READY message. 1762 */ 1763 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1764 if (ret) { 1765 error_report("rdma migration: error posting first control recv!"); 1766 return ret; 1767 } 1768 1769 /* 1770 * Deliver the control message that was requested. 1771 */ 1772 ret = qemu_rdma_post_send_control(rdma, data, head); 1773 1774 if (ret < 0) { 1775 error_report("Failed to send control buffer!"); 1776 return ret; 1777 } 1778 1779 /* 1780 * If we're expecting a response, block and wait for it. 1781 */ 1782 if (resp) { 1783 if (callback) { 1784 trace_qemu_rdma_exchange_send_issue_callback(); 1785 ret = callback(rdma); 1786 if (ret < 0) { 1787 return ret; 1788 } 1789 } 1790 1791 trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]); 1792 ret = qemu_rdma_exchange_get_response(rdma, resp, 1793 resp->type, RDMA_WRID_DATA); 1794 1795 if (ret < 0) { 1796 return ret; 1797 } 1798 1799 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1800 if (resp_idx) { 1801 *resp_idx = RDMA_WRID_DATA; 1802 } 1803 trace_qemu_rdma_exchange_send_received(control_desc[resp->type]); 1804 } 1805 1806 rdma->control_ready_expected = 1; 1807 1808 return 0; 1809 } 1810 1811 /* 1812 * This is an 'atomic' high-level operation to receive a single, unified 1813 * control-channel message. 1814 */ 1815 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1816 int expecting) 1817 { 1818 RDMAControlHeader ready = { 1819 .len = 0, 1820 .type = RDMA_CONTROL_READY, 1821 .repeat = 1, 1822 }; 1823 int ret; 1824 1825 /* 1826 * Inform the source that we're ready to receive a message. 1827 */ 1828 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1829 1830 if (ret < 0) { 1831 error_report("Failed to send control buffer!"); 1832 return ret; 1833 } 1834 1835 /* 1836 * Block and wait for the message. 
1837 */ 1838 ret = qemu_rdma_exchange_get_response(rdma, head, 1839 expecting, RDMA_WRID_READY); 1840 1841 if (ret < 0) { 1842 return ret; 1843 } 1844 1845 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1846 1847 /* 1848 * Post a new RECV work request to replace the one we just consumed. 1849 */ 1850 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1851 if (ret) { 1852 error_report("rdma migration: error posting second control recv!"); 1853 return ret; 1854 } 1855 1856 return 0; 1857 } 1858 1859 /* 1860 * Write an actual chunk of memory using RDMA. 1861 * 1862 * If we're using dynamic registration on the dest-side, we have to 1863 * send a registration command first. 1864 */ 1865 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 1866 int current_index, uint64_t current_addr, 1867 uint64_t length) 1868 { 1869 struct ibv_sge sge; 1870 struct ibv_send_wr send_wr = { 0 }; 1871 struct ibv_send_wr *bad_wr; 1872 int reg_result_idx, ret, count = 0; 1873 uint64_t chunk, chunks; 1874 uint8_t *chunk_start, *chunk_end; 1875 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1876 RDMARegister reg; 1877 RDMARegisterResult *reg_result; 1878 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1879 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1880 .type = RDMA_CONTROL_REGISTER_REQUEST, 1881 .repeat = 1, 1882 }; 1883 1884 retry: 1885 sge.addr = (uintptr_t)(block->local_host_addr + 1886 (current_addr - block->offset)); 1887 sge.length = length; 1888 1889 chunk = ram_chunk_index(block->local_host_addr, 1890 (uint8_t *)(uintptr_t)sge.addr); 1891 chunk_start = ram_chunk_start(block, chunk); 1892 1893 if (block->is_ram_block) { 1894 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 1895 1896 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1897 chunks--; 1898 } 1899 } else { 1900 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 1901 1902 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1903 chunks--; 1904 } 1905 } 1906 1907 trace_qemu_rdma_write_one_top(chunks + 1, 1908 (chunks + 1) * 1909 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 1910 1911 chunk_end = ram_chunk_end(block, chunk + chunks); 1912 1913 if (!rdma->pin_all) { 1914 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1915 qemu_rdma_unregister_waiting(rdma); 1916 #endif 1917 } 1918 1919 while (test_bit(chunk, block->transit_bitmap)) { 1920 (void)count; 1921 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 1922 sge.addr, length, rdma->nb_sent, block->nb_chunks); 1923 1924 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 1925 1926 if (ret < 0) { 1927 error_report("Failed to Wait for previous write to complete " 1928 "block %d chunk %" PRIu64 1929 " current %" PRIu64 " len %" PRIu64 " %d", 1930 current_index, chunk, sge.addr, length, rdma->nb_sent); 1931 return ret; 1932 } 1933 } 1934 1935 if (!rdma->pin_all || !block->is_ram_block) { 1936 if (!block->remote_keys[chunk]) { 1937 /* 1938 * This chunk has not yet been registered, so first check to see 1939 * if the entire chunk is zero. If so, tell the other size to 1940 * memset() + madvise() the entire chunk without RDMA. 
1941 */ 1942 1943 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 1944 RDMACompress comp = { 1945 .offset = current_addr, 1946 .value = 0, 1947 .block_idx = current_index, 1948 .length = length, 1949 }; 1950 1951 head.len = sizeof(comp); 1952 head.type = RDMA_CONTROL_COMPRESS; 1953 1954 trace_qemu_rdma_write_one_zero(chunk, sge.length, 1955 current_index, current_addr); 1956 1957 compress_to_network(rdma, &comp); 1958 ret = qemu_rdma_exchange_send(rdma, &head, 1959 (uint8_t *) &comp, NULL, NULL, NULL); 1960 1961 if (ret < 0) { 1962 return -EIO; 1963 } 1964 1965 acct_update_position(f, sge.length, true); 1966 1967 return 1; 1968 } 1969 1970 /* 1971 * Otherwise, tell other side to register. 1972 */ 1973 reg.current_index = current_index; 1974 if (block->is_ram_block) { 1975 reg.key.current_addr = current_addr; 1976 } else { 1977 reg.key.chunk = chunk; 1978 } 1979 reg.chunks = chunks; 1980 1981 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 1982 current_addr); 1983 1984 register_to_network(rdma, &reg); 1985 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg, 1986 &resp, &reg_result_idx, NULL); 1987 if (ret < 0) { 1988 return ret; 1989 } 1990 1991 /* try to overlap this single registration with the one we sent. */ 1992 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1993 &sge.lkey, NULL, chunk, 1994 chunk_start, chunk_end)) { 1995 error_report("cannot get lkey"); 1996 return -EINVAL; 1997 } 1998 1999 reg_result = (RDMARegisterResult *) 2000 rdma->wr_data[reg_result_idx].control_curr; 2001 2002 network_to_result(reg_result); 2003 2004 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2005 reg_result->rkey, chunk); 2006 2007 block->remote_keys[chunk] = reg_result->rkey; 2008 block->remote_host_addr = reg_result->host_addr; 2009 } else { 2010 /* already registered before */ 2011 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2012 &sge.lkey, NULL, chunk, 2013 chunk_start, chunk_end)) { 2014 error_report("cannot get lkey!"); 2015 return -EINVAL; 2016 } 2017 } 2018 2019 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2020 } else { 2021 send_wr.wr.rdma.rkey = block->remote_rkey; 2022 2023 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2024 &sge.lkey, NULL, chunk, 2025 chunk_start, chunk_end)) { 2026 error_report("cannot get lkey!"); 2027 return -EINVAL; 2028 } 2029 } 2030 2031 /* 2032 * Encode the ram block index and chunk within this wrid. 2033 * We will use this information at the time of completion 2034 * to figure out which bitmap to check against and then which 2035 * chunk in the bitmap to look for. 2036 */ 2037 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2038 current_index, chunk); 2039 2040 send_wr.opcode = IBV_WR_RDMA_WRITE; 2041 send_wr.send_flags = IBV_SEND_SIGNALED; 2042 send_wr.sg_list = &sge; 2043 send_wr.num_sge = 1; 2044 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2045 (current_addr - block->offset); 2046 2047 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2048 sge.length); 2049 2050 /* 2051 * ibv_post_send() does not return negative error numbers, 2052 * per the specification they are positive - no idea why. 2053 */ 2054 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2055 2056 if (ret == ENOMEM) { 2057 trace_qemu_rdma_write_one_queue_full(); 2058 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2059 if (ret < 0) { 2060 error_report("rdma migration: failed to make " 2061 "room in full send queue!
%d", ret); 2062 return ret; 2063 } 2064 2065 goto retry; 2066 2067 } else if (ret > 0) { 2068 perror("rdma migration: post rdma write failed"); 2069 return -ret; 2070 } 2071 2072 set_bit(chunk, block->transit_bitmap); 2073 acct_update_position(f, sge.length, false); 2074 rdma->total_writes++; 2075 2076 return 0; 2077 } 2078 2079 /* 2080 * Push out any unwritten RDMA operations. 2081 * 2082 * We support sending out multiple chunks at the same time. 2083 * Not all of them need to get signaled in the completion queue. 2084 */ 2085 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2086 { 2087 int ret; 2088 2089 if (!rdma->current_length) { 2090 return 0; 2091 } 2092 2093 ret = qemu_rdma_write_one(f, rdma, 2094 rdma->current_index, rdma->current_addr, rdma->current_length); 2095 2096 if (ret < 0) { 2097 return ret; 2098 } 2099 2100 if (ret == 0) { 2101 rdma->nb_sent++; 2102 trace_qemu_rdma_write_flush(rdma->nb_sent); 2103 } 2104 2105 rdma->current_length = 0; 2106 rdma->current_addr = 0; 2107 2108 return 0; 2109 } 2110 2111 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2112 uint64_t offset, uint64_t len) 2113 { 2114 RDMALocalBlock *block; 2115 uint8_t *host_addr; 2116 uint8_t *chunk_end; 2117 2118 if (rdma->current_index < 0) { 2119 return 0; 2120 } 2121 2122 if (rdma->current_chunk < 0) { 2123 return 0; 2124 } 2125 2126 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2127 host_addr = block->local_host_addr + (offset - block->offset); 2128 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2129 2130 if (rdma->current_length == 0) { 2131 return 0; 2132 } 2133 2134 /* 2135 * Only merge into chunk sequentially. 2136 */ 2137 if (offset != (rdma->current_addr + rdma->current_length)) { 2138 return 0; 2139 } 2140 2141 if (offset < block->offset) { 2142 return 0; 2143 } 2144 2145 if ((offset + len) > (block->offset + block->length)) { 2146 return 0; 2147 } 2148 2149 if ((host_addr + len) > chunk_end) { 2150 return 0; 2151 } 2152 2153 return 1; 2154 } 2155 2156 /* 2157 * We're not actually writing here, but doing three things: 2158 * 2159 * 1. Identify the chunk the buffer belongs to. 2160 * 2. If the chunk is full or the buffer doesn't belong to the current 2161 * chunk, then start a new chunk and flush() the old chunk. 2162 * 3. To keep the hardware busy, we also group chunks into batches 2163 * and only require that a batch gets acknowledged in the completion 2164 * qeueue instead of each individual chunk. 2165 */ 2166 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2167 uint64_t block_offset, uint64_t offset, 2168 uint64_t len) 2169 { 2170 uint64_t current_addr = block_offset + offset; 2171 uint64_t index = rdma->current_index; 2172 uint64_t chunk = rdma->current_chunk; 2173 int ret; 2174 2175 /* If we cannot merge it, we flush the current buffer first. 
*/ 2176 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2177 ret = qemu_rdma_write_flush(f, rdma); 2178 if (ret) { 2179 return ret; 2180 } 2181 rdma->current_length = 0; 2182 rdma->current_addr = current_addr; 2183 2184 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2185 offset, len, &index, &chunk); 2186 if (ret) { 2187 error_report("ram block search failed"); 2188 return ret; 2189 } 2190 rdma->current_index = index; 2191 rdma->current_chunk = chunk; 2192 } 2193 2194 /* merge it */ 2195 rdma->current_length += len; 2196 2197 /* flush it if buffer is too large */ 2198 if (rdma->current_length >= RDMA_MERGE_MAX) { 2199 return qemu_rdma_write_flush(f, rdma); 2200 } 2201 2202 return 0; 2203 } 2204 2205 static void qemu_rdma_cleanup(RDMAContext *rdma) 2206 { 2207 struct rdma_cm_event *cm_event; 2208 int ret, idx; 2209 2210 if (rdma->cm_id && rdma->connected) { 2211 if (rdma->error_state && !rdma->received_error) { 2212 RDMAControlHeader head = { .len = 0, 2213 .type = RDMA_CONTROL_ERROR, 2214 .repeat = 1, 2215 }; 2216 error_report("Early error. Sending error."); 2217 qemu_rdma_post_send_control(rdma, NULL, &head); 2218 } 2219 2220 ret = rdma_disconnect(rdma->cm_id); 2221 if (!ret) { 2222 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2223 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2224 if (!ret) { 2225 rdma_ack_cm_event(cm_event); 2226 } 2227 } 2228 trace_qemu_rdma_cleanup_disconnect(); 2229 rdma->connected = false; 2230 } 2231 2232 g_free(rdma->dest_blocks); 2233 rdma->dest_blocks = NULL; 2234 2235 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2236 if (rdma->wr_data[idx].control_mr) { 2237 rdma->total_registrations--; 2238 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2239 } 2240 rdma->wr_data[idx].control_mr = NULL; 2241 } 2242 2243 if (rdma->local_ram_blocks.block) { 2244 while (rdma->local_ram_blocks.nb_blocks) { 2245 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2246 } 2247 } 2248 2249 if (rdma->qp) { 2250 rdma_destroy_qp(rdma->cm_id); 2251 rdma->qp = NULL; 2252 } 2253 if (rdma->cq) { 2254 ibv_destroy_cq(rdma->cq); 2255 rdma->cq = NULL; 2256 } 2257 if (rdma->comp_channel) { 2258 ibv_destroy_comp_channel(rdma->comp_channel); 2259 rdma->comp_channel = NULL; 2260 } 2261 if (rdma->pd) { 2262 ibv_dealloc_pd(rdma->pd); 2263 rdma->pd = NULL; 2264 } 2265 if (rdma->cm_id) { 2266 rdma_destroy_id(rdma->cm_id); 2267 rdma->cm_id = NULL; 2268 } 2269 if (rdma->listen_id) { 2270 rdma_destroy_id(rdma->listen_id); 2271 rdma->listen_id = NULL; 2272 } 2273 if (rdma->channel) { 2274 rdma_destroy_event_channel(rdma->channel); 2275 rdma->channel = NULL; 2276 } 2277 g_free(rdma->host); 2278 rdma->host = NULL; 2279 } 2280 2281 2282 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2283 { 2284 int ret, idx; 2285 Error *local_err = NULL, **temp = &local_err; 2286 2287 /* 2288 * Will be validated against destination's actual capabilities 2289 * after the connect() completes. 2290 */ 2291 rdma->pin_all = pin_all; 2292 2293 ret = qemu_rdma_resolve_host(rdma, temp); 2294 if (ret) { 2295 goto err_rdma_source_init; 2296 } 2297 2298 ret = qemu_rdma_alloc_pd_cq(rdma); 2299 if (ret) { 2300 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2301 " limits may be too low. 
Please check $ ulimit -a # and " 2302 "search for 'ulimit -l' in the output"); 2303 goto err_rdma_source_init; 2304 } 2305 2306 ret = qemu_rdma_alloc_qp(rdma); 2307 if (ret) { 2308 ERROR(temp, "rdma migration: error allocating qp!"); 2309 goto err_rdma_source_init; 2310 } 2311 2312 ret = qemu_rdma_init_ram_blocks(rdma); 2313 if (ret) { 2314 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2315 goto err_rdma_source_init; 2316 } 2317 2318 /* Build the hash that maps from offset to RAMBlock */ 2319 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2320 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2321 g_hash_table_insert(rdma->blockmap, 2322 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2323 &rdma->local_ram_blocks.block[idx]); 2324 } 2325 2326 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2327 ret = qemu_rdma_reg_control(rdma, idx); 2328 if (ret) { 2329 ERROR(temp, "rdma migration: error registering %d control!", 2330 idx); 2331 goto err_rdma_source_init; 2332 } 2333 } 2334 2335 return 0; 2336 2337 err_rdma_source_init: 2338 error_propagate(errp, local_err); 2339 qemu_rdma_cleanup(rdma); 2340 return -1; 2341 } 2342 2343 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2344 { 2345 RDMACapabilities cap = { 2346 .version = RDMA_CONTROL_VERSION_CURRENT, 2347 .flags = 0, 2348 }; 2349 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2350 .retry_count = 5, 2351 .private_data = &cap, 2352 .private_data_len = sizeof(cap), 2353 }; 2354 struct rdma_cm_event *cm_event; 2355 int ret; 2356 2357 /* 2358 * Only negotiate the capability with destination if the user 2359 * on the source first requested the capability. 2360 */ 2361 if (rdma->pin_all) { 2362 trace_qemu_rdma_connect_pin_all_requested(); 2363 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2364 } 2365 2366 caps_to_network(&cap); 2367 2368 ret = rdma_connect(rdma->cm_id, &conn_param); 2369 if (ret) { 2370 perror("rdma_connect"); 2371 ERROR(errp, "connecting to destination!"); 2372 goto err_rdma_source_connect; 2373 } 2374 2375 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2376 if (ret) { 2377 perror("rdma_get_cm_event after rdma_connect"); 2378 ERROR(errp, "connecting to destination!"); 2379 rdma_ack_cm_event(cm_event); 2380 goto err_rdma_source_connect; 2381 } 2382 2383 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2384 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2385 ERROR(errp, "connecting to destination!"); 2386 rdma_ack_cm_event(cm_event); 2387 goto err_rdma_source_connect; 2388 } 2389 rdma->connected = true; 2390 2391 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2392 network_to_caps(&cap); 2393 2394 /* 2395 * Verify that the *requested* capabilities are supported by the destination 2396 * and disable them otherwise. 2397 */ 2398 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2399 ERROR(errp, "Server cannot support pinning all memory. 
" 2400 "Will register memory dynamically."); 2401 rdma->pin_all = false; 2402 } 2403 2404 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2405 2406 rdma_ack_cm_event(cm_event); 2407 2408 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2409 if (ret) { 2410 ERROR(errp, "posting second control recv!"); 2411 goto err_rdma_source_connect; 2412 } 2413 2414 rdma->control_ready_expected = 1; 2415 rdma->nb_sent = 0; 2416 return 0; 2417 2418 err_rdma_source_connect: 2419 qemu_rdma_cleanup(rdma); 2420 return -1; 2421 } 2422 2423 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2424 { 2425 int ret, idx; 2426 struct rdma_cm_id *listen_id; 2427 char ip[40] = "unknown"; 2428 struct rdma_addrinfo *res, *e; 2429 char port_str[16]; 2430 2431 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2432 rdma->wr_data[idx].control_len = 0; 2433 rdma->wr_data[idx].control_curr = NULL; 2434 } 2435 2436 if (!rdma->host || !rdma->host[0]) { 2437 ERROR(errp, "RDMA host is not set!"); 2438 rdma->error_state = -EINVAL; 2439 return -1; 2440 } 2441 /* create CM channel */ 2442 rdma->channel = rdma_create_event_channel(); 2443 if (!rdma->channel) { 2444 ERROR(errp, "could not create rdma event channel"); 2445 rdma->error_state = -EINVAL; 2446 return -1; 2447 } 2448 2449 /* create CM id */ 2450 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2451 if (ret) { 2452 ERROR(errp, "could not create cm_id!"); 2453 goto err_dest_init_create_listen_id; 2454 } 2455 2456 snprintf(port_str, 16, "%d", rdma->port); 2457 port_str[15] = '\0'; 2458 2459 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2460 if (ret < 0) { 2461 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2462 goto err_dest_init_bind_addr; 2463 } 2464 2465 for (e = res; e != NULL; e = e->ai_next) { 2466 inet_ntop(e->ai_family, 2467 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2468 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2469 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2470 if (ret) { 2471 continue; 2472 } 2473 if (e->ai_family == AF_INET6) { 2474 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2475 if (ret) { 2476 continue; 2477 } 2478 } 2479 break; 2480 } 2481 2482 if (!e) { 2483 ERROR(errp, "Error: could not rdma_bind_addr!"); 2484 goto err_dest_init_bind_addr; 2485 } 2486 2487 rdma->listen_id = listen_id; 2488 qemu_rdma_dump_gid("dest_init", listen_id); 2489 return 0; 2490 2491 err_dest_init_bind_addr: 2492 rdma_destroy_id(listen_id); 2493 err_dest_init_create_listen_id: 2494 rdma_destroy_event_channel(rdma->channel); 2495 rdma->channel = NULL; 2496 rdma->error_state = ret; 2497 return ret; 2498 2499 } 2500 2501 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2502 { 2503 RDMAContext *rdma = NULL; 2504 InetSocketAddress *addr; 2505 2506 if (host_port) { 2507 rdma = g_new0(RDMAContext, 1); 2508 rdma->current_index = -1; 2509 rdma->current_chunk = -1; 2510 2511 addr = g_new(InetSocketAddress, 1); 2512 if (!inet_parse(addr, host_port, NULL)) { 2513 rdma->port = atoi(addr->port); 2514 rdma->host = g_strdup(addr->host); 2515 } else { 2516 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2517 g_free(rdma); 2518 rdma = NULL; 2519 } 2520 2521 qapi_free_InetSocketAddress(addr); 2522 } 2523 2524 return rdma; 2525 } 2526 2527 /* 2528 * QEMUFile interface to the control channel. 2529 * SEND messages for control only. 2530 * VM's ram is handled with regular RDMA messages. 
2531 */ 2532 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2533 const struct iovec *iov, 2534 size_t niov, 2535 int *fds, 2536 size_t nfds, 2537 Error **errp) 2538 { 2539 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2540 QEMUFile *f = rioc->file; 2541 RDMAContext *rdma = rioc->rdma; 2542 int ret; 2543 ssize_t done = 0; 2544 size_t i; 2545 2546 CHECK_ERROR_STATE(); 2547 2548 /* 2549 * Push out any writes that 2550 * we're queued up for VM's ram. 2551 */ 2552 ret = qemu_rdma_write_flush(f, rdma); 2553 if (ret < 0) { 2554 rdma->error_state = ret; 2555 return ret; 2556 } 2557 2558 for (i = 0; i < niov; i++) { 2559 size_t remaining = iov[i].iov_len; 2560 uint8_t * data = (void *)iov[i].iov_base; 2561 while (remaining) { 2562 RDMAControlHeader head; 2563 2564 rioc->len = MIN(remaining, RDMA_SEND_INCREMENT); 2565 remaining -= rioc->len; 2566 2567 head.len = rioc->len; 2568 head.type = RDMA_CONTROL_QEMU_FILE; 2569 2570 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2571 2572 if (ret < 0) { 2573 rdma->error_state = ret; 2574 return ret; 2575 } 2576 2577 data += rioc->len; 2578 done += rioc->len; 2579 } 2580 } 2581 2582 return done; 2583 } 2584 2585 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2586 size_t size, int idx) 2587 { 2588 size_t len = 0; 2589 2590 if (rdma->wr_data[idx].control_len) { 2591 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2592 2593 len = MIN(size, rdma->wr_data[idx].control_len); 2594 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2595 rdma->wr_data[idx].control_curr += len; 2596 rdma->wr_data[idx].control_len -= len; 2597 } 2598 2599 return len; 2600 } 2601 2602 /* 2603 * QEMUFile interface to the control channel. 2604 * RDMA links don't use bytestreams, so we have to 2605 * return bytes to QEMUFile opportunistically. 2606 */ 2607 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2608 const struct iovec *iov, 2609 size_t niov, 2610 int **fds, 2611 size_t *nfds, 2612 Error **errp) 2613 { 2614 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2615 RDMAContext *rdma = rioc->rdma; 2616 RDMAControlHeader head; 2617 int ret = 0; 2618 ssize_t i; 2619 size_t done = 0; 2620 2621 CHECK_ERROR_STATE(); 2622 2623 for (i = 0; i < niov; i++) { 2624 size_t want = iov[i].iov_len; 2625 uint8_t *data = (void *)iov[i].iov_base; 2626 2627 /* 2628 * First, we hold on to the last SEND message we 2629 * were given and dish out the bytes until we run 2630 * out of bytes. 2631 */ 2632 ret = qemu_rdma_fill(rioc->rdma, data, want, 0); 2633 done += ret; 2634 want -= ret; 2635 /* Got what we needed, so go to next iovec */ 2636 if (want == 0) { 2637 continue; 2638 } 2639 2640 /* If we got any data so far, then don't wait 2641 * for more, just return what we have */ 2642 if (done > 0) { 2643 break; 2644 } 2645 2646 2647 /* We've got nothing at all, so lets wait for 2648 * more to arrive 2649 */ 2650 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2651 2652 if (ret < 0) { 2653 rdma->error_state = ret; 2654 return ret; 2655 } 2656 2657 /* 2658 * SEND was received with new bytes, now try again. 2659 */ 2660 ret = qemu_rdma_fill(rioc->rdma, data, want, 0); 2661 done += ret; 2662 want -= ret; 2663 2664 /* Still didn't get enough, so lets just return */ 2665 if (want) { 2666 if (done == 0) { 2667 return QIO_CHANNEL_ERR_BLOCK; 2668 } else { 2669 break; 2670 } 2671 } 2672 } 2673 rioc->len = done; 2674 return rioc->len; 2675 } 2676 2677 /* 2678 * Block until all the outstanding chunks have been delivered by the hardware. 
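*
* This flushes the current merge buffer, then waits on RDMA write
* completions until rdma->nb_sent drops to zero, and finally services
* any unregistration requests that were queued in the meantime.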
2679 */ 2680 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2681 { 2682 int ret; 2683 2684 if (qemu_rdma_write_flush(f, rdma) < 0) { 2685 return -EIO; 2686 } 2687 2688 while (rdma->nb_sent) { 2689 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2690 if (ret < 0) { 2691 error_report("rdma migration: complete polling error!"); 2692 return -EIO; 2693 } 2694 } 2695 2696 qemu_rdma_unregister_waiting(rdma); 2697 2698 return 0; 2699 } 2700 2701 2702 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2703 bool blocking, 2704 Error **errp) 2705 { 2706 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2707 /* XXX we should make readv/writev actually honour this :-) */ 2708 rioc->blocking = blocking; 2709 return 0; 2710 } 2711 2712 2713 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2714 struct QIOChannelRDMASource { 2715 GSource parent; 2716 QIOChannelRDMA *rioc; 2717 GIOCondition condition; 2718 }; 2719 2720 static gboolean 2721 qio_channel_rdma_source_prepare(GSource *source, 2722 gint *timeout) 2723 { 2724 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2725 RDMAContext *rdma = rsource->rioc->rdma; 2726 GIOCondition cond = 0; 2727 *timeout = -1; 2728 2729 if (rdma->wr_data[0].control_len) { 2730 cond |= G_IO_IN; 2731 } 2732 cond |= G_IO_OUT; 2733 2734 return cond & rsource->condition; 2735 } 2736 2737 static gboolean 2738 qio_channel_rdma_source_check(GSource *source) 2739 { 2740 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2741 RDMAContext *rdma = rsource->rioc->rdma; 2742 GIOCondition cond = 0; 2743 2744 if (rdma->wr_data[0].control_len) { 2745 cond |= G_IO_IN; 2746 } 2747 cond |= G_IO_OUT; 2748 2749 return cond & rsource->condition; 2750 } 2751 2752 static gboolean 2753 qio_channel_rdma_source_dispatch(GSource *source, 2754 GSourceFunc callback, 2755 gpointer user_data) 2756 { 2757 QIOChannelFunc func = (QIOChannelFunc)callback; 2758 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2759 RDMAContext *rdma = rsource->rioc->rdma; 2760 GIOCondition cond = 0; 2761 2762 if (rdma->wr_data[0].control_len) { 2763 cond |= G_IO_IN; 2764 } 2765 cond |= G_IO_OUT; 2766 2767 return (*func)(QIO_CHANNEL(rsource->rioc), 2768 (cond & rsource->condition), 2769 user_data); 2770 } 2771 2772 static void 2773 qio_channel_rdma_source_finalize(GSource *source) 2774 { 2775 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 2776 2777 object_unref(OBJECT(ssource->rioc)); 2778 } 2779 2780 GSourceFuncs qio_channel_rdma_source_funcs = { 2781 qio_channel_rdma_source_prepare, 2782 qio_channel_rdma_source_check, 2783 qio_channel_rdma_source_dispatch, 2784 qio_channel_rdma_source_finalize 2785 }; 2786 2787 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 2788 GIOCondition condition) 2789 { 2790 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2791 QIOChannelRDMASource *ssource; 2792 GSource *source; 2793 2794 source = g_source_new(&qio_channel_rdma_source_funcs, 2795 sizeof(QIOChannelRDMASource)); 2796 ssource = (QIOChannelRDMASource *)source; 2797 2798 ssource->rioc = rioc; 2799 object_ref(OBJECT(rioc)); 2800 2801 ssource->condition = condition; 2802 2803 return source; 2804 } 2805 2806 2807 static int qio_channel_rdma_close(QIOChannel *ioc, 2808 Error **errp) 2809 { 2810 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2811 trace_qemu_rdma_close(); 2812 if (rioc->rdma) { 2813 if (!rioc->rdma->error_state) { 2814 rioc->rdma->error_state = qemu_file_get_error(rioc->file); 2815 } 2816 
qemu_rdma_cleanup(rioc->rdma); 2817 g_free(rioc->rdma); 2818 rioc->rdma = NULL; 2819 } 2820 return 0; 2821 } 2822 2823 /* 2824 * Parameters: 2825 * @offset == 0 : 2826 * This means that 'block_offset' is a full virtual address that does not 2827 * belong to a RAMBlock of the virtual machine and instead 2828 * represents a private malloc'd memory area that the caller wishes to 2829 * transfer. 2830 * 2831 * @offset != 0 : 2832 * Offset is an offset to be added to block_offset and used 2833 * to also lookup the corresponding RAMBlock. 2834 * 2835 * @size > 0 : 2836 * Initiate a transfer of this size. 2837 * 2838 * @size == 0 : 2839 * A 'hint' or 'advice' that means that we wish to speculatively 2840 * and asynchronously unregister this memory. In this case, there is no 2841 * guarantee that the unregister will actually happen, for example, 2842 * if the memory is being actively transmitted. Additionally, the memory 2843 * may be re-registered at any future time if a write within the same 2844 * chunk was requested again, even if you attempted to unregister it 2845 * here. 2846 * 2847 * @size < 0 : TODO, not yet supported 2848 * Unregister the memory NOW. This means that the caller does not 2849 * expect there to be any future RDMA transfers and we just want to clean 2850 * things up. This is used in case the upper layer owns the memory and 2851 * cannot wait for qemu_fclose() to occur. 2852 * 2853 * @bytes_sent : User-specified pointer to indicate how many bytes were 2854 * sent. Usually, this will not be more than a few bytes of 2855 * the protocol because most transfers are sent asynchronously. 2856 */ 2857 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 2858 ram_addr_t block_offset, ram_addr_t offset, 2859 size_t size, uint64_t *bytes_sent) 2860 { 2861 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 2862 RDMAContext *rdma = rioc->rdma; 2863 int ret; 2864 2865 CHECK_ERROR_STATE(); 2866 2867 qemu_fflush(f); 2868 2869 if (size > 0) { 2870 /* 2871 * Add this page to the current 'chunk'. If the chunk 2872 * is full, or the page doesn't belong to the current chunk, 2873 * an actual RDMA write will occur and a new chunk will be formed. 2874 */ 2875 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 2876 if (ret < 0) { 2877 error_report("rdma migration: write error! %d", ret); 2878 goto err; 2879 } 2880 2881 /* 2882 * We always return 1 byte because the RDMA 2883 * protocol is completely asynchronous. We do not yet know 2884 * whether an identified chunk is zero or not because we're 2885 * waiting for other pages to potentially be merged with 2886 * the current chunk. So, we have to call qemu_update_position() 2887 * later on when the actual write occurs. 2888 */ 2889 if (bytes_sent) { 2890 *bytes_sent = 1; 2891 } 2892 } else { 2893 uint64_t index, chunk; 2894 2895 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2896 if (size < 0) { 2897 ret = qemu_rdma_drain_cq(f, rdma); 2898 if (ret < 0) { 2899 fprintf(stderr, "rdma: failed to synchronously drain" 2900 " completion queue before unregistration.\n"); 2901 goto err; 2902 } 2903 } 2904 */ 2905 2906 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2907 offset, size, &index, &chunk); 2908 2909 if (ret) { 2910 error_report("ram block search failed"); 2911 goto err; 2912 } 2913 2914 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2915 2916 /* 2917 * TODO: Synchronous, guaranteed unregistration (should not occur during 2918 * fast-path).
Otherwise, unregisters will process on the next call to 2919 * qemu_rdma_drain_cq() 2920 if (size < 0) { 2921 qemu_rdma_unregister_waiting(rdma); 2922 } 2923 */ 2924 } 2925 2926 /* 2927 * Drain the Completion Queue if possible, but do not block, 2928 * just poll. 2929 * 2930 * If nothing to poll, the end of the iteration will do this 2931 * again to make sure we don't overflow the request queue. 2932 */ 2933 while (1) { 2934 uint64_t wr_id, wr_id_in; 2935 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2936 if (ret < 0) { 2937 error_report("rdma migration: polling error! %d", ret); 2938 goto err; 2939 } 2940 2941 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2942 2943 if (wr_id == RDMA_WRID_NONE) { 2944 break; 2945 } 2946 } 2947 2948 return RAM_SAVE_CONTROL_DELAYED; 2949 err: 2950 rdma->error_state = ret; 2951 return ret; 2952 } 2953 2954 static int qemu_rdma_accept(RDMAContext *rdma) 2955 { 2956 RDMACapabilities cap; 2957 struct rdma_conn_param conn_param = { 2958 .responder_resources = 2, 2959 .private_data = &cap, 2960 .private_data_len = sizeof(cap), 2961 }; 2962 struct rdma_cm_event *cm_event; 2963 struct ibv_context *verbs; 2964 int ret = -EINVAL; 2965 int idx; 2966 2967 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2968 if (ret) { 2969 goto err_rdma_dest_wait; 2970 } 2971 2972 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2973 rdma_ack_cm_event(cm_event); 2974 goto err_rdma_dest_wait; 2975 } 2976 2977 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2978 2979 network_to_caps(&cap); 2980 2981 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2982 error_report("Unknown source RDMA version: %d, bailing...", 2983 cap.version); 2984 rdma_ack_cm_event(cm_event); 2985 goto err_rdma_dest_wait; 2986 } 2987 2988 /* 2989 * Respond with only the capabilities this version of QEMU knows about. 2990 */ 2991 cap.flags &= known_capabilities; 2992 2993 /* 2994 * Enable the ones that we do know about. 2995 * Add other checks here as new ones are introduced. 
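*
* The masked flags are echoed back to the source in the private_data
* attached to rdma_accept() below, so the source can see which of its
* requested capabilities actually took effect.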
2996 */ 2997 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2998 rdma->pin_all = true; 2999 } 3000 3001 rdma->cm_id = cm_event->id; 3002 verbs = cm_event->id->verbs; 3003 3004 rdma_ack_cm_event(cm_event); 3005 3006 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3007 3008 caps_to_network(&cap); 3009 3010 trace_qemu_rdma_accept_pin_verbsc(verbs); 3011 3012 if (!rdma->verbs) { 3013 rdma->verbs = verbs; 3014 } else if (rdma->verbs != verbs) { 3015 error_report("ibv context not matching %p, %p!", rdma->verbs, 3016 verbs); 3017 goto err_rdma_dest_wait; 3018 } 3019 3020 qemu_rdma_dump_id("dest_init", verbs); 3021 3022 ret = qemu_rdma_alloc_pd_cq(rdma); 3023 if (ret) { 3024 error_report("rdma migration: error allocating pd and cq!"); 3025 goto err_rdma_dest_wait; 3026 } 3027 3028 ret = qemu_rdma_alloc_qp(rdma); 3029 if (ret) { 3030 error_report("rdma migration: error allocating qp!"); 3031 goto err_rdma_dest_wait; 3032 } 3033 3034 ret = qemu_rdma_init_ram_blocks(rdma); 3035 if (ret) { 3036 error_report("rdma migration: error initializing ram blocks!"); 3037 goto err_rdma_dest_wait; 3038 } 3039 3040 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3041 ret = qemu_rdma_reg_control(rdma, idx); 3042 if (ret) { 3043 error_report("rdma: error registering %d control", idx); 3044 goto err_rdma_dest_wait; 3045 } 3046 } 3047 3048 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 3049 3050 ret = rdma_accept(rdma->cm_id, &conn_param); 3051 if (ret) { 3052 error_report("rdma_accept returns %d", ret); 3053 goto err_rdma_dest_wait; 3054 } 3055 3056 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3057 if (ret) { 3058 error_report("rdma_accept get_cm_event failed %d", ret); 3059 goto err_rdma_dest_wait; 3060 } 3061 3062 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3063 error_report("rdma_accept not event established"); 3064 rdma_ack_cm_event(cm_event); 3065 goto err_rdma_dest_wait; 3066 } 3067 3068 rdma_ack_cm_event(cm_event); 3069 rdma->connected = true; 3070 3071 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3072 if (ret) { 3073 error_report("rdma migration: error posting second control recv"); 3074 goto err_rdma_dest_wait; 3075 } 3076 3077 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3078 3079 return 0; 3080 3081 err_rdma_dest_wait: 3082 rdma->error_state = ret; 3083 qemu_rdma_cleanup(rdma); 3084 return ret; 3085 } 3086 3087 static int dest_ram_sort_func(const void *a, const void *b) 3088 { 3089 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3090 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3091 3092 return (a_index < b_index) ? -1 : (a_index != b_index); 3093 } 3094 3095 /* 3096 * During each iteration of the migration, we listen for instructions 3097 * by the source VM to perform dynamic page registrations before they 3098 * can perform RDMA operations. 3099 * 3100 * We respond with the 'rkey'. 3101 * 3102 * Keep doing this until the source tells us to stop. 
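*
* Concretely, the loop below services RDMA_CONTROL_COMPRESS,
* RAM_BLOCKS_REQUEST, REGISTER_REQUEST and UNREGISTER_REQUEST messages
* and returns once RDMA_CONTROL_REGISTER_FINISHED is received (or an
* error occurs).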
3103 */ 3104 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3105 { 3106 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3107 .type = RDMA_CONTROL_REGISTER_RESULT, 3108 .repeat = 0, 3109 }; 3110 RDMAControlHeader unreg_resp = { .len = 0, 3111 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3112 .repeat = 0, 3113 }; 3114 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3115 .repeat = 1 }; 3116 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3117 RDMAContext *rdma = rioc->rdma; 3118 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3119 RDMAControlHeader head; 3120 RDMARegister *reg, *registers; 3121 RDMACompress *comp; 3122 RDMARegisterResult *reg_result; 3123 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3124 RDMALocalBlock *block; 3125 void *host_addr; 3126 int ret = 0; 3127 int idx = 0; 3128 int count = 0; 3129 int i = 0; 3130 3131 CHECK_ERROR_STATE(); 3132 3133 do { 3134 trace_qemu_rdma_registration_handle_wait(); 3135 3136 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3137 3138 if (ret < 0) { 3139 break; 3140 } 3141 3142 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3143 error_report("rdma: Too many requests in this message (%d)." 3144 "Bailing.", head.repeat); 3145 ret = -EIO; 3146 break; 3147 } 3148 3149 switch (head.type) { 3150 case RDMA_CONTROL_COMPRESS: 3151 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3152 network_to_compress(comp); 3153 3154 trace_qemu_rdma_registration_handle_compress(comp->length, 3155 comp->block_idx, 3156 comp->offset); 3157 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3158 error_report("rdma: 'compress' bad block index %u (vs %d)", 3159 (unsigned int)comp->block_idx, 3160 rdma->local_ram_blocks.nb_blocks); 3161 ret = -EIO; 3162 goto out; 3163 } 3164 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3165 3166 host_addr = block->local_host_addr + 3167 (comp->offset - block->offset); 3168 3169 ram_handle_compressed(host_addr, comp->value, comp->length); 3170 break; 3171 3172 case RDMA_CONTROL_REGISTER_FINISHED: 3173 trace_qemu_rdma_registration_handle_finished(); 3174 goto out; 3175 3176 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3177 trace_qemu_rdma_registration_handle_ram_blocks(); 3178 3179 /* Sort our local RAM Block list so it's the same as the source, 3180 * we can do this since we've filled in a src_index in the list 3181 * as we received the RAMBlock list earlier. 3182 */ 3183 qsort(rdma->local_ram_blocks.block, 3184 rdma->local_ram_blocks.nb_blocks, 3185 sizeof(RDMALocalBlock), dest_ram_sort_func); 3186 if (rdma->pin_all) { 3187 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3188 if (ret) { 3189 error_report("rdma migration: error dest " 3190 "registering ram blocks"); 3191 goto out; 3192 } 3193 } 3194 3195 /* 3196 * Dest uses this to prepare to transmit the RAMBlock descriptions 3197 * to the source VM after connection setup. 3198 * Both sides use the "remote" structure to communicate and update 3199 * their "local" descriptions with what was sent. 
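*
* Each entry carries the destination host address, offset and length of
* the block and, when pin_all is active, the block-wide rkey; all entries
* are converted to network byte order and sent back in a single
* RAM_BLOCKS_RESULT message.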
3200 */ 3201 for (i = 0; i < local->nb_blocks; i++) { 3202 rdma->dest_blocks[i].remote_host_addr = 3203 (uintptr_t)(local->block[i].local_host_addr); 3204 3205 if (rdma->pin_all) { 3206 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3207 } 3208 3209 rdma->dest_blocks[i].offset = local->block[i].offset; 3210 rdma->dest_blocks[i].length = local->block[i].length; 3211 3212 dest_block_to_network(&rdma->dest_blocks[i]); 3213 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3214 local->block[i].block_name, 3215 local->block[i].offset, 3216 local->block[i].length, 3217 local->block[i].local_host_addr, 3218 local->block[i].src_index); 3219 } 3220 3221 blocks.len = rdma->local_ram_blocks.nb_blocks 3222 * sizeof(RDMADestBlock); 3223 3224 3225 ret = qemu_rdma_post_send_control(rdma, 3226 (uint8_t *) rdma->dest_blocks, &blocks); 3227 3228 if (ret < 0) { 3229 error_report("rdma migration: error sending remote info"); 3230 goto out; 3231 } 3232 3233 break; 3234 case RDMA_CONTROL_REGISTER_REQUEST: 3235 trace_qemu_rdma_registration_handle_register(head.repeat); 3236 3237 reg_resp.repeat = head.repeat; 3238 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3239 3240 for (count = 0; count < head.repeat; count++) { 3241 uint64_t chunk; 3242 uint8_t *chunk_start, *chunk_end; 3243 3244 reg = &registers[count]; 3245 network_to_register(reg); 3246 3247 reg_result = &results[count]; 3248 3249 trace_qemu_rdma_registration_handle_register_loop(count, 3250 reg->current_index, reg->key.current_addr, reg->chunks); 3251 3252 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3253 error_report("rdma: 'register' bad block index %u (vs %d)", 3254 (unsigned int)reg->current_index, 3255 rdma->local_ram_blocks.nb_blocks); 3256 ret = -ENOENT; 3257 goto out; 3258 } 3259 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3260 if (block->is_ram_block) { 3261 if (block->offset > reg->key.current_addr) { 3262 error_report("rdma: bad register address for block %s" 3263 " offset: %" PRIx64 " current_addr: %" PRIx64, 3264 block->block_name, block->offset, 3265 reg->key.current_addr); 3266 ret = -ERANGE; 3267 goto out; 3268 } 3269 host_addr = (block->local_host_addr + 3270 (reg->key.current_addr - block->offset)); 3271 chunk = ram_chunk_index(block->local_host_addr, 3272 (uint8_t *) host_addr); 3273 } else { 3274 chunk = reg->key.chunk; 3275 host_addr = block->local_host_addr + 3276 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3277 /* Check for particularly bad chunk value */ 3278 if (host_addr < (void *)block->local_host_addr) { 3279 error_report("rdma: bad chunk for block %s" 3280 " chunk: %" PRIx64, 3281 block->block_name, reg->key.chunk); 3282 ret = -ERANGE; 3283 goto out; 3284 } 3285 } 3286 chunk_start = ram_chunk_start(block, chunk); 3287 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3288 if (qemu_rdma_register_and_get_keys(rdma, block, 3289 (uintptr_t)host_addr, NULL, &reg_result->rkey, 3290 chunk, chunk_start, chunk_end)) { 3291 error_report("cannot get rkey"); 3292 ret = -EINVAL; 3293 goto out; 3294 } 3295 3296 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3297 3298 trace_qemu_rdma_registration_handle_register_rkey( 3299 reg_result->rkey); 3300 3301 result_to_network(reg_result); 3302 } 3303 3304 ret = qemu_rdma_post_send_control(rdma, 3305 (uint8_t *) results, &reg_resp); 3306 3307 if (ret < 0) { 3308 error_report("Failed to send control buffer"); 3309 goto out; 3310 } 3311 break; 3312 case RDMA_CONTROL_UNREGISTER_REQUEST: 3313
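/*
 * For every entry in the request, drop the chunk's memory registration
 * and acknowledge the whole batch with one UNREGISTER_FINISHED reply.
 */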
trace_qemu_rdma_registration_handle_unregister(head.repeat); 3314 unreg_resp.repeat = head.repeat; 3315 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3316 3317 for (count = 0; count < head.repeat; count++) { 3318 reg = &registers[count]; 3319 network_to_register(reg); 3320 3321 trace_qemu_rdma_registration_handle_unregister_loop(count, 3322 reg->current_index, reg->key.chunk); 3323 3324 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3325 3326 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3327 block->pmr[reg->key.chunk] = NULL; 3328 3329 if (ret != 0) { 3330 perror("rdma unregistration chunk failed"); 3331 ret = -ret; 3332 goto out; 3333 } 3334 3335 rdma->total_registrations--; 3336 3337 trace_qemu_rdma_registration_handle_unregister_success( 3338 reg->key.chunk); 3339 } 3340 3341 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3342 3343 if (ret < 0) { 3344 error_report("Failed to send control buffer"); 3345 goto out; 3346 } 3347 break; 3348 case RDMA_CONTROL_REGISTER_RESULT: 3349 error_report("Invalid RESULT message at dest."); 3350 ret = -EIO; 3351 goto out; 3352 default: 3353 error_report("Unknown control message %s", control_desc[head.type]); 3354 ret = -EIO; 3355 goto out; 3356 } 3357 } while (1); 3358 out: 3359 if (ret < 0) { 3360 rdma->error_state = ret; 3361 } 3362 return ret; 3363 } 3364 3365 /* Destination: 3366 * Called via a ram_control_load_hook during the initial RAM load section which 3367 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3368 * on the source. 3369 * We've already built our local RAMBlock list, but not yet sent the list to 3370 * the source. 3371 */ 3372 static int 3373 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) 3374 { 3375 RDMAContext *rdma = rioc->rdma; 3376 int curr; 3377 int found = -1; 3378 3379 /* Find the matching RAMBlock in our local list */ 3380 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3381 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3382 found = curr; 3383 break; 3384 } 3385 } 3386 3387 if (found == -1) { 3388 error_report("RAMBlock '%s' not found on destination", name); 3389 return -ENOENT; 3390 } 3391 3392 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3393 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3394 rdma->next_src_index++; 3395 3396 return 0; 3397 } 3398 3399 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) 3400 { 3401 switch (flags) { 3402 case RAM_CONTROL_BLOCK_REG: 3403 return rdma_block_notification_handle(opaque, data); 3404 3405 case RAM_CONTROL_HOOK: 3406 return qemu_rdma_registration_handle(f, opaque); 3407 3408 default: 3409 /* Shouldn't be called with any other values */ 3410 abort(); 3411 } 3412 } 3413 3414 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3415 uint64_t flags, void *data) 3416 { 3417 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3418 RDMAContext *rdma = rioc->rdma; 3419 3420 CHECK_ERROR_STATE(); 3421 3422 trace_qemu_rdma_registration_start(flags); 3423 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3424 qemu_fflush(f); 3425 3426 return 0; 3427 } 3428 3429 /* 3430 * Inform dest that dynamic registrations are done for now. 3431 * First, flush writes, if any.
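*
* On the RAM_CONTROL_SETUP pass this also performs the RAM block exchange
* with the destination; every pass ends by sending an
* RDMA_CONTROL_REGISTER_FINISHED message.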
3432 */ 3433 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3434 uint64_t flags, void *data) 3435 { 3436 Error *local_err = NULL, **errp = &local_err; 3437 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3438 RDMAContext *rdma = rioc->rdma; 3439 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3440 int ret = 0; 3441 3442 CHECK_ERROR_STATE(); 3443 3444 qemu_fflush(f); 3445 ret = qemu_rdma_drain_cq(f, rdma); 3446 3447 if (ret < 0) { 3448 goto err; 3449 } 3450 3451 if (flags == RAM_CONTROL_SETUP) { 3452 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3453 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3454 int reg_result_idx, i, nb_dest_blocks; 3455 3456 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3457 trace_qemu_rdma_registration_stop_ram(); 3458 3459 /* 3460 * Make sure that we parallelize the pinning on both sides. 3461 * For very large guests, doing this serially takes a really 3462 * long time, so we have to 'interleave' the pinning locally 3463 * with the control messages by performing the pinning on this 3464 * side before we receive the control response from the other 3465 * side that the pinning has completed. 3466 */ 3467 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3468 &reg_result_idx, rdma->pin_all ? 3469 qemu_rdma_reg_whole_ram_blocks : NULL); 3470 if (ret < 0) { 3471 ERROR(errp, "receiving remote info!"); 3472 return ret; 3473 } 3474 3475 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3476 3477 /* 3478 * The protocol uses two different sets of rkeys (mutually exclusive): 3479 * 1. One key to represent the virtual address of the entire ram block. 3480 * (dynamic chunk registration disabled - pin everything with one rkey.) 3481 * 2. One to represent individual chunks within a ram block. 3482 * (dynamic chunk registration enabled - pin individual chunks.) 3483 * 3484 * Once the capability is successfully negotiated, the destination transmits 3485 * the keys to use (or sends them later) including the virtual addresses 3486 * and then propagates the remote ram block descriptions to its local copy.
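*
* Which of the two schemes applies follows the pin_all capability that was
* negotiated at connect time: with pin_all the per-block remote_rkey
* received here is used directly, otherwise chunks are registered on
* demand via RDMA_CONTROL_REGISTER_REQUEST messages.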
3487 */ 3488 3489 if (local->nb_blocks != nb_dest_blocks) { 3490 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) " 3491 "Your QEMU command line parameters are probably " 3492 "not identical on both the source and destination.", 3493 local->nb_blocks, nb_dest_blocks); 3494 rdma->error_state = -EINVAL; 3495 return -EINVAL; 3496 } 3497 3498 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3499 memcpy(rdma->dest_blocks, 3500 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3501 for (i = 0; i < nb_dest_blocks; i++) { 3502 network_to_dest_block(&rdma->dest_blocks[i]); 3503 3504 /* We require that the blocks are in the same order */ 3505 if (rdma->dest_blocks[i].length != local->block[i].length) { 3506 ERROR(errp, "Block %s/%d has a different length %" PRIu64 3507 "vs %" PRIu64, local->block[i].block_name, i, 3508 local->block[i].length, 3509 rdma->dest_blocks[i].length); 3510 rdma->error_state = -EINVAL; 3511 return -EINVAL; 3512 } 3513 local->block[i].remote_host_addr = 3514 rdma->dest_blocks[i].remote_host_addr; 3515 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3516 } 3517 } 3518 3519 trace_qemu_rdma_registration_stop(flags); 3520 3521 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3522 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3523 3524 if (ret < 0) { 3525 goto err; 3526 } 3527 3528 return 0; 3529 err: 3530 rdma->error_state = ret; 3531 return ret; 3532 } 3533 3534 static const QEMUFileHooks rdma_read_hooks = { 3535 .hook_ram_load = rdma_load_hook, 3536 }; 3537 3538 static const QEMUFileHooks rdma_write_hooks = { 3539 .before_ram_iterate = qemu_rdma_registration_start, 3540 .after_ram_iterate = qemu_rdma_registration_stop, 3541 .save_page = qemu_rdma_save_page, 3542 }; 3543 3544 3545 static void qio_channel_rdma_finalize(Object *obj) 3546 { 3547 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3548 if (rioc->rdma) { 3549 qemu_rdma_cleanup(rioc->rdma); 3550 g_free(rioc->rdma); 3551 rioc->rdma = NULL; 3552 } 3553 } 3554 3555 static void qio_channel_rdma_class_init(ObjectClass *klass, 3556 void *class_data G_GNUC_UNUSED) 3557 { 3558 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3559 3560 ioc_klass->io_writev = qio_channel_rdma_writev; 3561 ioc_klass->io_readv = qio_channel_rdma_readv; 3562 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3563 ioc_klass->io_close = qio_channel_rdma_close; 3564 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3565 } 3566 3567 static const TypeInfo qio_channel_rdma_info = { 3568 .parent = TYPE_QIO_CHANNEL, 3569 .name = TYPE_QIO_CHANNEL_RDMA, 3570 .instance_size = sizeof(QIOChannelRDMA), 3571 .instance_finalize = qio_channel_rdma_finalize, 3572 .class_init = qio_channel_rdma_class_init, 3573 }; 3574 3575 static void qio_channel_rdma_register_types(void) 3576 { 3577 type_register_static(&qio_channel_rdma_info); 3578 } 3579 3580 type_init(qio_channel_rdma_register_types); 3581 3582 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3583 { 3584 QIOChannelRDMA *rioc; 3585 3586 if (qemu_file_mode_is_not_valid(mode)) { 3587 return NULL; 3588 } 3589 3590 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 3591 rioc->rdma = rdma; 3592 3593 if (mode[0] == 'w') { 3594 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); 3595 qemu_file_set_hooks(rioc->file, &rdma_write_hooks); 3596 } else { 3597 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); 3598 qemu_file_set_hooks(rioc->file, &rdma_read_hooks); 3599 } 3600 3601 return rioc->file; 3602 } 3603 
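/*
 * Invoked by the fd handler installed in rdma_start_incoming_migration()
 * once a connection request arrives on the listening CM event channel;
 * accepts the connection and hands the resulting QEMUFile to the generic
 * incoming migration code.
 */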
3604 static void rdma_accept_incoming_migration(void *opaque) 3605 { 3606 RDMAContext *rdma = opaque; 3607 int ret; 3608 QEMUFile *f; 3609 Error *local_err = NULL, **errp = &local_err; 3610 3611 trace_qemu_rdma_accept_incoming_migration(); 3612 ret = qemu_rdma_accept(rdma); 3613 3614 if (ret) { 3615 ERROR(errp, "RDMA Migration initialization failed!"); 3616 return; 3617 } 3618 3619 trace_qemu_rdma_accept_incoming_migration_accepted(); 3620 3621 f = qemu_fopen_rdma(rdma, "rb"); 3622 if (f == NULL) { 3623 ERROR(errp, "could not qemu_fopen_rdma!"); 3624 qemu_rdma_cleanup(rdma); 3625 return; 3626 } 3627 3628 rdma->migration_started_on_destination = 1; 3629 migration_fd_process_incoming(f); 3630 } 3631 3632 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3633 { 3634 int ret; 3635 RDMAContext *rdma; 3636 Error *local_err = NULL; 3637 3638 trace_rdma_start_incoming_migration(); 3639 rdma = qemu_rdma_data_init(host_port, &local_err); 3640 3641 if (rdma == NULL) { 3642 goto err; 3643 } 3644 3645 ret = qemu_rdma_dest_init(rdma, &local_err); 3646 3647 if (ret) { 3648 goto err; 3649 } 3650 3651 trace_rdma_start_incoming_migration_after_dest_init(); 3652 3653 ret = rdma_listen(rdma->listen_id, 5); 3654 3655 if (ret) { 3656 ERROR(errp, "listening on socket!"); 3657 goto err; 3658 } 3659 3660 trace_rdma_start_incoming_migration_after_rdma_listen(); 3661 3662 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3663 NULL, (void *)(intptr_t)rdma); 3664 return; 3665 err: 3666 error_propagate(errp, local_err); 3667 g_free(rdma); 3668 } 3669 3670 void rdma_start_outgoing_migration(void *opaque, 3671 const char *host_port, Error **errp) 3672 { 3673 MigrationState *s = opaque; 3674 RDMAContext *rdma = qemu_rdma_data_init(host_port, errp); 3675 int ret = 0; 3676 3677 if (rdma == NULL) { 3678 goto err; 3679 } 3680 3681 ret = qemu_rdma_source_init(rdma, 3682 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); 3683 3684 if (ret) { 3685 goto err; 3686 } 3687 3688 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3689 ret = qemu_rdma_connect(rdma, errp); 3690 3691 if (ret) { 3692 goto err; 3693 } 3694 3695 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3696 3697 s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); 3698 migrate_fd_connect(s); 3699 return; 3700 err: 3701 g_free(rdma); 3702 } 3703