1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 #include "qemu/osdep.h" 17 #include "qapi/error.h" 18 #include "qemu-common.h" 19 #include "qemu/cutils.h" 20 #include "migration/migration.h" 21 #include "migration/qemu-file.h" 22 #include "exec/cpu-common.h" 23 #include "qemu/error-report.h" 24 #include "qemu/main-loop.h" 25 #include "qemu/sockets.h" 26 #include "qemu/bitmap.h" 27 #include "qemu/coroutine.h" 28 #include <sys/socket.h> 29 #include <netdb.h> 30 #include <arpa/inet.h> 31 #include <rdma/rdma_cma.h> 32 #include "trace.h" 33 34 /* 35 * Print and error on both the Monitor and the Log file. 36 */ 37 #define ERROR(errp, fmt, ...) \ 38 do { \ 39 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 40 if (errp && (*(errp) == NULL)) { \ 41 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 42 } \ 43 } while (0) 44 45 #define RDMA_RESOLVE_TIMEOUT_MS 10000 46 47 /* Do not merge data if larger than this. */ 48 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 49 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 50 51 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 52 53 /* 54 * This is only for non-live state being migrated. 55 * Instead of RDMA_WRITE messages, we use RDMA_SEND 56 * messages for that state, which requires a different 57 * delivery design than main memory. 58 */ 59 #define RDMA_SEND_INCREMENT 32768 60 61 /* 62 * Maximum size infiniband SEND message 63 */ 64 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 65 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 66 67 #define RDMA_CONTROL_VERSION_CURRENT 1 68 /* 69 * Capabilities for negotiation. 
70 */ 71 #define RDMA_CAPABILITY_PIN_ALL 0x01 72 73 /* 74 * Add the other flags above to this list of known capabilities 75 * as they are introduced. 76 */ 77 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 78 79 #define CHECK_ERROR_STATE() \ 80 do { \ 81 if (rdma->error_state) { \ 82 if (!rdma->error_reported) { \ 83 error_report("RDMA is in an error state waiting migration" \ 84 " to abort!"); \ 85 rdma->error_reported = 1; \ 86 } \ 87 return rdma->error_state; \ 88 } \ 89 } while (0); 90 91 /* 92 * A work request ID is 64-bits and we split up these bits 93 * into 3 parts: 94 * 95 * bits 0-15 : type of control message, 2^16 96 * bits 16-29: ram block index, 2^14 97 * bits 30-63: ram block chunk number, 2^34 98 * 99 * The last two bit ranges are only used for RDMA writes, 100 * in order to track their completion and potentially 101 * also track unregistration status of the message. 102 */ 103 #define RDMA_WRID_TYPE_SHIFT 0UL 104 #define RDMA_WRID_BLOCK_SHIFT 16UL 105 #define RDMA_WRID_CHUNK_SHIFT 30UL 106 107 #define RDMA_WRID_TYPE_MASK \ 108 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 109 110 #define RDMA_WRID_BLOCK_MASK \ 111 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 112 113 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 114 115 /* 116 * RDMA migration protocol: 117 * 1. RDMA Writes (data messages, i.e. RAM) 118 * 2. IB Send/Recv (control channel messages) 119 */ 120 enum { 121 RDMA_WRID_NONE = 0, 122 RDMA_WRID_RDMA_WRITE = 1, 123 RDMA_WRID_SEND_CONTROL = 2000, 124 RDMA_WRID_RECV_CONTROL = 4000, 125 }; 126 127 static const char *wrid_desc[] = { 128 [RDMA_WRID_NONE] = "NONE", 129 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 130 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 131 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 132 }; 133 134 /* 135 * Work request IDs for IB SEND messages only (not RDMA writes). 
136 * This is used by the migration protocol to transmit 137 * control messages (such as device state and registration commands) 138 * 139 * We could use more WRs, but we have enough for now. 140 */ 141 enum { 142 RDMA_WRID_READY = 0, 143 RDMA_WRID_DATA, 144 RDMA_WRID_CONTROL, 145 RDMA_WRID_MAX, 146 }; 147 148 /* 149 * SEND/RECV IB Control Messages. 150 */ 151 enum { 152 RDMA_CONTROL_NONE = 0, 153 RDMA_CONTROL_ERROR, 154 RDMA_CONTROL_READY, /* ready to receive */ 155 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 156 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 157 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 158 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 159 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 160 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 161 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 162 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 163 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 164 }; 165 166 static const char *control_desc[] = { 167 [RDMA_CONTROL_NONE] = "NONE", 168 [RDMA_CONTROL_ERROR] = "ERROR", 169 [RDMA_CONTROL_READY] = "READY", 170 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 171 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 172 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 173 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 174 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 175 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 176 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 177 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 178 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 179 }; 180 181 /* 182 * Memory and MR structures used to represent an IB Send/Recv work request. 183 * This is *not* used for RDMA writes, only IB Send/Recv. 
184 */ 185 typedef struct { 186 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 187 struct ibv_mr *control_mr; /* registration metadata */ 188 size_t control_len; /* length of the message */ 189 uint8_t *control_curr; /* start of unconsumed bytes */ 190 } RDMAWorkRequestData; 191 192 /* 193 * Negotiate RDMA capabilities during connection-setup time. 194 */ 195 typedef struct { 196 uint32_t version; 197 uint32_t flags; 198 } RDMACapabilities; 199 200 static void caps_to_network(RDMACapabilities *cap) 201 { 202 cap->version = htonl(cap->version); 203 cap->flags = htonl(cap->flags); 204 } 205 206 static void network_to_caps(RDMACapabilities *cap) 207 { 208 cap->version = ntohl(cap->version); 209 cap->flags = ntohl(cap->flags); 210 } 211 212 /* 213 * Representation of a RAMBlock from an RDMA perspective. 214 * This is not transmitted, only local. 215 * This and subsequent structures cannot be linked lists 216 * because we're using a single IB message to transmit 217 * the information. It's small anyway, so a list is overkill. 218 */ 219 typedef struct RDMALocalBlock { 220 char *block_name; 221 uint8_t *local_host_addr; /* local virtual address */ 222 uint64_t remote_host_addr; /* remote virtual address */ 223 uint64_t offset; 224 uint64_t length; 225 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 226 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 227 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 228 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 229 int index; /* which block are we */ 230 unsigned int src_index; /* (Only used on dest) */ 231 bool is_ram_block; 232 int nb_chunks; 233 unsigned long *transit_bitmap; 234 unsigned long *unregister_bitmap; 235 } RDMALocalBlock; 236 237 /* 238 * Also represents a RAMblock, but only on the dest. 
239 * This gets transmitted by the dest during connection-time 240 * to the source VM and then is used to populate the 241 * corresponding RDMALocalBlock with 242 * the information needed to perform the actual RDMA. 243 */ 244 typedef struct QEMU_PACKED RDMADestBlock { 245 uint64_t remote_host_addr; 246 uint64_t offset; 247 uint64_t length; 248 uint32_t remote_rkey; 249 uint32_t padding; 250 } RDMADestBlock; 251 252 static uint64_t htonll(uint64_t v) 253 { 254 union { uint32_t lv[2]; uint64_t llv; } u; 255 u.lv[0] = htonl(v >> 32); 256 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 257 return u.llv; 258 } 259 260 static uint64_t ntohll(uint64_t v) { 261 union { uint32_t lv[2]; uint64_t llv; } u; 262 u.llv = v; 263 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 264 } 265 266 static void dest_block_to_network(RDMADestBlock *db) 267 { 268 db->remote_host_addr = htonll(db->remote_host_addr); 269 db->offset = htonll(db->offset); 270 db->length = htonll(db->length); 271 db->remote_rkey = htonl(db->remote_rkey); 272 } 273 274 static void network_to_dest_block(RDMADestBlock *db) 275 { 276 db->remote_host_addr = ntohll(db->remote_host_addr); 277 db->offset = ntohll(db->offset); 278 db->length = ntohll(db->length); 279 db->remote_rkey = ntohl(db->remote_rkey); 280 } 281 282 /* 283 * Virtual address of the above structures used for transmitting 284 * the RAMBlock descriptions at connection-time. 285 * This structure is *not* transmitted. 286 */ 287 typedef struct RDMALocalBlocks { 288 int nb_blocks; 289 bool init; /* main memory init complete */ 290 RDMALocalBlock *block; 291 } RDMALocalBlocks; 292 293 /* 294 * Main data structure for RDMA state. 295 * While there is only one copy of this structure being allocated right now, 296 * this is the place where one would start if you wanted to consider 297 * having more than one RDMA connection open at the same time. 
298 */ 299 typedef struct RDMAContext { 300 char *host; 301 int port; 302 303 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 304 305 /* 306 * This is used by *_exchange_send() to figure out whether or not 307 * the initial "READY" message has already been received or not. 308 * This is because other functions may potentially poll() and detect 309 * the READY message before send() does, in which case we need to 310 * know if it completed. 311 */ 312 int control_ready_expected; 313 314 /* number of outstanding writes */ 315 int nb_sent; 316 317 /* store info about current buffer so that we can 318 merge it with future sends */ 319 uint64_t current_addr; 320 uint64_t current_length; 321 /* index of ram block the current buffer belongs to */ 322 int current_index; 323 /* index of the chunk in the current ram block */ 324 int current_chunk; 325 326 bool pin_all; 327 328 /* 329 * infiniband-specific variables for opening the device 330 * and maintaining connection state and so forth. 331 * 332 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 333 * cm_id->verbs, cm_id->channel, and cm_id->qp. 334 */ 335 struct rdma_cm_id *cm_id; /* connection manager ID */ 336 struct rdma_cm_id *listen_id; 337 bool connected; 338 339 struct ibv_context *verbs; 340 struct rdma_event_channel *channel; 341 struct ibv_qp *qp; /* queue pair */ 342 struct ibv_comp_channel *comp_channel; /* completion channel */ 343 struct ibv_pd *pd; /* protection domain */ 344 struct ibv_cq *cq; /* completion queue */ 345 346 /* 347 * If a previous write failed (perhaps because of a failed 348 * memory registration, then do not attempt any future work 349 * and remember the error state. 350 */ 351 int error_state; 352 int error_reported; 353 354 /* 355 * Description of ram blocks used throughout the code. 
356 */ 357 RDMALocalBlocks local_ram_blocks; 358 RDMADestBlock *dest_blocks; 359 360 /* Index of the next RAMBlock received during block registration */ 361 unsigned int next_src_index; 362 363 /* 364 * Migration on *destination* started. 365 * Then use coroutine yield function. 366 * Source runs in a thread, so we don't care. 367 */ 368 int migration_started_on_destination; 369 370 int total_registrations; 371 int total_writes; 372 373 int unregister_current, unregister_next; 374 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 375 376 GHashTable *blockmap; 377 } RDMAContext; 378 379 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 380 #define QIO_CHANNEL_RDMA(obj) \ 381 OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA) 382 383 typedef struct QIOChannelRDMA QIOChannelRDMA; 384 385 386 struct QIOChannelRDMA { 387 QIOChannel parent; 388 RDMAContext *rdma; 389 QEMUFile *file; 390 size_t len; 391 bool blocking; /* XXX we don't actually honour this yet */ 392 }; 393 394 /* 395 * Main structure for IB Send/Recv control messages. 396 * This gets prepended at the beginning of every Send/Recv. 397 */ 398 typedef struct QEMU_PACKED { 399 uint32_t len; /* Total length of data portion */ 400 uint32_t type; /* which control command to perform */ 401 uint32_t repeat; /* number of commands in data portion of same type */ 402 uint32_t padding; 403 } RDMAControlHeader; 404 405 static void control_to_network(RDMAControlHeader *control) 406 { 407 control->type = htonl(control->type); 408 control->len = htonl(control->len); 409 control->repeat = htonl(control->repeat); 410 } 411 412 static void network_to_control(RDMAControlHeader *control) 413 { 414 control->type = ntohl(control->type); 415 control->len = ntohl(control->len); 416 control->repeat = ntohl(control->repeat); 417 } 418 419 /* 420 * Register a single Chunk. 421 * Information sent by the source VM to inform the dest 422 * to register an single chunk of memory before we can perform 423 * the actual RDMA operation. 
424 */ 425 typedef struct QEMU_PACKED { 426 union QEMU_PACKED { 427 uint64_t current_addr; /* offset into the ram_addr_t space */ 428 uint64_t chunk; /* chunk to lookup if unregistering */ 429 } key; 430 uint32_t current_index; /* which ramblock the chunk belongs to */ 431 uint32_t padding; 432 uint64_t chunks; /* how many sequential chunks to register */ 433 } RDMARegister; 434 435 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 436 { 437 RDMALocalBlock *local_block; 438 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 439 440 if (local_block->is_ram_block) { 441 /* 442 * current_addr as passed in is an address in the local ram_addr_t 443 * space, we need to translate this for the destination 444 */ 445 reg->key.current_addr -= local_block->offset; 446 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 447 } 448 reg->key.current_addr = htonll(reg->key.current_addr); 449 reg->current_index = htonl(reg->current_index); 450 reg->chunks = htonll(reg->chunks); 451 } 452 453 static void network_to_register(RDMARegister *reg) 454 { 455 reg->key.current_addr = ntohll(reg->key.current_addr); 456 reg->current_index = ntohl(reg->current_index); 457 reg->chunks = ntohll(reg->chunks); 458 } 459 460 typedef struct QEMU_PACKED { 461 uint32_t value; /* if zero, we will madvise() */ 462 uint32_t block_idx; /* which ram block index */ 463 uint64_t offset; /* Address in remote ram_addr_t space */ 464 uint64_t length; /* length of the chunk */ 465 } RDMACompress; 466 467 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 468 { 469 comp->value = htonl(comp->value); 470 /* 471 * comp->offset as passed in is an address in the local ram_addr_t 472 * space, we need to translate this for the destination 473 */ 474 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 475 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 476 comp->block_idx = htonl(comp->block_idx); 477 comp->offset = 
htonll(comp->offset); 478 comp->length = htonll(comp->length); 479 } 480 481 static void network_to_compress(RDMACompress *comp) 482 { 483 comp->value = ntohl(comp->value); 484 comp->block_idx = ntohl(comp->block_idx); 485 comp->offset = ntohll(comp->offset); 486 comp->length = ntohll(comp->length); 487 } 488 489 /* 490 * The result of the dest's memory registration produces an "rkey" 491 * which the source VM must reference in order to perform 492 * the RDMA operation. 493 */ 494 typedef struct QEMU_PACKED { 495 uint32_t rkey; 496 uint32_t padding; 497 uint64_t host_addr; 498 } RDMARegisterResult; 499 500 static void result_to_network(RDMARegisterResult *result) 501 { 502 result->rkey = htonl(result->rkey); 503 result->host_addr = htonll(result->host_addr); 504 }; 505 506 static void network_to_result(RDMARegisterResult *result) 507 { 508 result->rkey = ntohl(result->rkey); 509 result->host_addr = ntohll(result->host_addr); 510 }; 511 512 const char *print_wrid(int wrid); 513 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 514 uint8_t *data, RDMAControlHeader *resp, 515 int *resp_idx, 516 int (*callback)(RDMAContext *rdma)); 517 518 static inline uint64_t ram_chunk_index(const uint8_t *start, 519 const uint8_t *host) 520 { 521 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 522 } 523 524 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 525 uint64_t i) 526 { 527 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 528 (i << RDMA_REG_CHUNK_SHIFT)); 529 } 530 531 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 532 uint64_t i) 533 { 534 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 535 (1UL << RDMA_REG_CHUNK_SHIFT); 536 537 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 538 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 539 } 540 541 return result; 542 } 543 544 static int 
rdma_add_block(RDMAContext *rdma, const char *block_name, 545 void *host_addr, 546 ram_addr_t block_offset, uint64_t length) 547 { 548 RDMALocalBlocks *local = &rdma->local_ram_blocks; 549 RDMALocalBlock *block; 550 RDMALocalBlock *old = local->block; 551 552 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 553 554 if (local->nb_blocks) { 555 int x; 556 557 if (rdma->blockmap) { 558 for (x = 0; x < local->nb_blocks; x++) { 559 g_hash_table_remove(rdma->blockmap, 560 (void *)(uintptr_t)old[x].offset); 561 g_hash_table_insert(rdma->blockmap, 562 (void *)(uintptr_t)old[x].offset, 563 &local->block[x]); 564 } 565 } 566 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 567 g_free(old); 568 } 569 570 block = &local->block[local->nb_blocks]; 571 572 block->block_name = g_strdup(block_name); 573 block->local_host_addr = host_addr; 574 block->offset = block_offset; 575 block->length = length; 576 block->index = local->nb_blocks; 577 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 578 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 579 block->transit_bitmap = bitmap_new(block->nb_chunks); 580 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 581 block->unregister_bitmap = bitmap_new(block->nb_chunks); 582 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 583 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 584 585 block->is_ram_block = local->init ? 
false : true; 586 587 if (rdma->blockmap) { 588 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 589 } 590 591 trace_rdma_add_block(block_name, local->nb_blocks, 592 (uintptr_t) block->local_host_addr, 593 block->offset, block->length, 594 (uintptr_t) (block->local_host_addr + block->length), 595 BITS_TO_LONGS(block->nb_chunks) * 596 sizeof(unsigned long) * 8, 597 block->nb_chunks); 598 599 local->nb_blocks++; 600 601 return 0; 602 } 603 604 /* 605 * Memory regions need to be registered with the device and queue pairs setup 606 * in advanced before the migration starts. This tells us where the RAM blocks 607 * are so that we can register them individually. 608 */ 609 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr, 610 ram_addr_t block_offset, ram_addr_t length, void *opaque) 611 { 612 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 613 } 614 615 /* 616 * Identify the RAMBlocks and their quantity. They will be references to 617 * identify chunk boundaries inside each RAMBlock and also be referenced 618 * during dynamic page registration. 
619 */ 620 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 621 { 622 RDMALocalBlocks *local = &rdma->local_ram_blocks; 623 624 assert(rdma->blockmap == NULL); 625 memset(local, 0, sizeof *local); 626 qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma); 627 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 628 rdma->dest_blocks = g_new0(RDMADestBlock, 629 rdma->local_ram_blocks.nb_blocks); 630 local->init = true; 631 return 0; 632 } 633 634 /* 635 * Note: If used outside of cleanup, the caller must ensure that the destination 636 * block structures are also updated 637 */ 638 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 639 { 640 RDMALocalBlocks *local = &rdma->local_ram_blocks; 641 RDMALocalBlock *old = local->block; 642 int x; 643 644 if (rdma->blockmap) { 645 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 646 } 647 if (block->pmr) { 648 int j; 649 650 for (j = 0; j < block->nb_chunks; j++) { 651 if (!block->pmr[j]) { 652 continue; 653 } 654 ibv_dereg_mr(block->pmr[j]); 655 rdma->total_registrations--; 656 } 657 g_free(block->pmr); 658 block->pmr = NULL; 659 } 660 661 if (block->mr) { 662 ibv_dereg_mr(block->mr); 663 rdma->total_registrations--; 664 block->mr = NULL; 665 } 666 667 g_free(block->transit_bitmap); 668 block->transit_bitmap = NULL; 669 670 g_free(block->unregister_bitmap); 671 block->unregister_bitmap = NULL; 672 673 g_free(block->remote_keys); 674 block->remote_keys = NULL; 675 676 g_free(block->block_name); 677 block->block_name = NULL; 678 679 if (rdma->blockmap) { 680 for (x = 0; x < local->nb_blocks; x++) { 681 g_hash_table_remove(rdma->blockmap, 682 (void *)(uintptr_t)old[x].offset); 683 } 684 } 685 686 if (local->nb_blocks > 1) { 687 688 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 689 690 if (block->index) { 691 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 692 } 693 694 if (block->index < (local->nb_blocks - 1)) { 695 memcpy(local->block + 
block->index, old + (block->index + 1), 696 sizeof(RDMALocalBlock) * 697 (local->nb_blocks - (block->index + 1))); 698 } 699 } else { 700 assert(block == local->block); 701 local->block = NULL; 702 } 703 704 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 705 block->offset, block->length, 706 (uintptr_t)(block->local_host_addr + block->length), 707 BITS_TO_LONGS(block->nb_chunks) * 708 sizeof(unsigned long) * 8, block->nb_chunks); 709 710 g_free(old); 711 712 local->nb_blocks--; 713 714 if (local->nb_blocks && rdma->blockmap) { 715 for (x = 0; x < local->nb_blocks; x++) { 716 g_hash_table_insert(rdma->blockmap, 717 (void *)(uintptr_t)local->block[x].offset, 718 &local->block[x]); 719 } 720 } 721 722 return 0; 723 } 724 725 /* 726 * Put in the log file which RDMA device was opened and the details 727 * associated with that device. 728 */ 729 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 730 { 731 struct ibv_port_attr port; 732 733 if (ibv_query_port(verbs, 1, &port)) { 734 error_report("Failed to query port information"); 735 return; 736 } 737 738 printf("%s RDMA Device opened: kernel name %s " 739 "uverbs device name %s, " 740 "infiniband_verbs class device path %s, " 741 "infiniband class device path %s, " 742 "transport: (%d) %s\n", 743 who, 744 verbs->device->name, 745 verbs->device->dev_name, 746 verbs->device->dev_path, 747 verbs->device->ibdev_path, 748 port.link_layer, 749 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 750 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 751 ? "Ethernet" : "Unknown")); 752 } 753 754 /* 755 * Put in the log file the RDMA gid addressing information, 756 * useful for folks who have trouble understanding the 757 * RDMA device hierarchy in the kernel. 
758 */ 759 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 760 { 761 char sgid[33]; 762 char dgid[33]; 763 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 764 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 765 trace_qemu_rdma_dump_gid(who, sgid, dgid); 766 } 767 768 /* 769 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 770 * We will try the next addrinfo struct, and fail if there are 771 * no other valid addresses to bind against. 772 * 773 * If user is listening on '[::]', then we will not have a opened a device 774 * yet and have no way of verifying if the device is RoCE or not. 775 * 776 * In this case, the source VM will throw an error for ALL types of 777 * connections (both IPv4 and IPv6) if the destination machine does not have 778 * a regular infiniband network available for use. 779 * 780 * The only way to guarantee that an error is thrown for broken kernels is 781 * for the management software to choose a *specific* interface at bind time 782 * and validate what time of hardware it is. 783 * 784 * Unfortunately, this puts the user in a fix: 785 * 786 * If the source VM connects with an IPv4 address without knowing that the 787 * destination has bound to '[::]' the migration will unconditionally fail 788 * unless the management software is explicitly listening on the IPv4 789 * address while using a RoCE-based device. 790 * 791 * If the source VM connects with an IPv6 address, then we're OK because we can 792 * throw an error on the source (and similarly on the destination). 793 * 794 * But in mixed environments, this will be broken for a while until it is fixed 795 * inside linux. 796 * 797 * We do provide a *tiny* bit of help in this function: We can list all of the 798 * devices in the system and check to see if all the devices are RoCE or 799 * Infiniband. 
800 * 801 * If we detect that we have a *pure* RoCE environment, then we can safely 802 * thrown an error even if the management software has specified '[::]' as the 803 * bind address. 804 * 805 * However, if there is are multiple hetergeneous devices, then we cannot make 806 * this assumption and the user just has to be sure they know what they are 807 * doing. 808 * 809 * Patches are being reviewed on linux-rdma. 810 */ 811 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs) 812 { 813 struct ibv_port_attr port_attr; 814 815 /* This bug only exists in linux, to our knowledge. */ 816 #ifdef CONFIG_LINUX 817 818 /* 819 * Verbs are only NULL if management has bound to '[::]'. 820 * 821 * Let's iterate through all the devices and see if there any pure IB 822 * devices (non-ethernet). 823 * 824 * If not, then we can safely proceed with the migration. 825 * Otherwise, there are no guarantees until the bug is fixed in linux. 826 */ 827 if (!verbs) { 828 int num_devices, x; 829 struct ibv_device ** dev_list = ibv_get_device_list(&num_devices); 830 bool roce_found = false; 831 bool ib_found = false; 832 833 for (x = 0; x < num_devices; x++) { 834 verbs = ibv_open_device(dev_list[x]); 835 if (!verbs) { 836 if (errno == EPERM) { 837 continue; 838 } else { 839 return -EINVAL; 840 } 841 } 842 843 if (ibv_query_port(verbs, 1, &port_attr)) { 844 ibv_close_device(verbs); 845 ERROR(errp, "Could not query initial IB port"); 846 return -EINVAL; 847 } 848 849 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 850 ib_found = true; 851 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 852 roce_found = true; 853 } 854 855 ibv_close_device(verbs); 856 857 } 858 859 if (roce_found) { 860 if (ib_found) { 861 fprintf(stderr, "WARN: migrations may fail:" 862 " IPv6 over RoCE / iWARP in linux" 863 " is broken. 
But since you appear to have a" 864 " mixed RoCE / IB environment, be sure to only" 865 " migrate over the IB fabric until the kernel " 866 " fixes the bug.\n"); 867 } else { 868 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 869 " and your management software has specified '[::]'" 870 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 871 return -ENONET; 872 } 873 } 874 875 return 0; 876 } 877 878 /* 879 * If we have a verbs context, that means that some other than '[::]' was 880 * used by the management software for binding. In which case we can 881 * actually warn the user about a potentially broken kernel. 882 */ 883 884 /* IB ports start with 1, not 0 */ 885 if (ibv_query_port(verbs, 1, &port_attr)) { 886 ERROR(errp, "Could not query initial IB port"); 887 return -EINVAL; 888 } 889 890 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 891 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 892 "(but patches on linux-rdma in progress)"); 893 return -ENONET; 894 } 895 896 #endif 897 898 return 0; 899 } 900 901 /* 902 * Figure out which RDMA device corresponds to the requested IP hostname 903 * Also create the initial connection manager identifiers for opening 904 * the connection. 
905 */ 906 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 907 { 908 int ret; 909 struct rdma_addrinfo *res; 910 char port_str[16]; 911 struct rdma_cm_event *cm_event; 912 char ip[40] = "unknown"; 913 struct rdma_addrinfo *e; 914 915 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 916 ERROR(errp, "RDMA hostname has not been set"); 917 return -EINVAL; 918 } 919 920 /* create CM channel */ 921 rdma->channel = rdma_create_event_channel(); 922 if (!rdma->channel) { 923 ERROR(errp, "could not create CM channel"); 924 return -EINVAL; 925 } 926 927 /* create CM id */ 928 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 929 if (ret) { 930 ERROR(errp, "could not create channel id"); 931 goto err_resolve_create_id; 932 } 933 934 snprintf(port_str, 16, "%d", rdma->port); 935 port_str[15] = '\0'; 936 937 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 938 if (ret < 0) { 939 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 940 goto err_resolve_get_addr; 941 } 942 943 for (e = res; e != NULL; e = e->ai_next) { 944 inet_ntop(e->ai_family, 945 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 946 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 947 948 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 949 RDMA_RESOLVE_TIMEOUT_MS); 950 if (!ret) { 951 if (e->ai_family == AF_INET6) { 952 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs); 953 if (ret) { 954 continue; 955 } 956 } 957 goto route; 958 } 959 } 960 961 ERROR(errp, "could not resolve address %s", rdma->host); 962 goto err_resolve_get_addr; 963 964 route: 965 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 966 967 ret = rdma_get_cm_event(rdma->channel, &cm_event); 968 if (ret) { 969 ERROR(errp, "could not perform event_addr_resolved"); 970 goto err_resolve_get_addr; 971 } 972 973 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 974 ERROR(errp, "result not equal to event_addr_resolved %s", 975 
rdma_event_str(cm_event->event)); 976 perror("rdma_resolve_addr"); 977 rdma_ack_cm_event(cm_event); 978 ret = -EINVAL; 979 goto err_resolve_get_addr; 980 } 981 rdma_ack_cm_event(cm_event); 982 983 /* resolve route */ 984 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 985 if (ret) { 986 ERROR(errp, "could not resolve rdma route"); 987 goto err_resolve_get_addr; 988 } 989 990 ret = rdma_get_cm_event(rdma->channel, &cm_event); 991 if (ret) { 992 ERROR(errp, "could not perform event_route_resolved"); 993 goto err_resolve_get_addr; 994 } 995 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 996 ERROR(errp, "result not equal to event_route_resolved: %s", 997 rdma_event_str(cm_event->event)); 998 rdma_ack_cm_event(cm_event); 999 ret = -EINVAL; 1000 goto err_resolve_get_addr; 1001 } 1002 rdma_ack_cm_event(cm_event); 1003 rdma->verbs = rdma->cm_id->verbs; 1004 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1005 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1006 return 0; 1007 1008 err_resolve_get_addr: 1009 rdma_destroy_id(rdma->cm_id); 1010 rdma->cm_id = NULL; 1011 err_resolve_create_id: 1012 rdma_destroy_event_channel(rdma->channel); 1013 rdma->channel = NULL; 1014 return ret; 1015 } 1016 1017 /* 1018 * Create protection domain and completion queues 1019 */ 1020 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1021 { 1022 /* allocate pd */ 1023 rdma->pd = ibv_alloc_pd(rdma->verbs); 1024 if (!rdma->pd) { 1025 error_report("failed to allocate protection domain"); 1026 return -1; 1027 } 1028 1029 /* create completion channel */ 1030 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 1031 if (!rdma->comp_channel) { 1032 error_report("failed to allocate completion channel"); 1033 goto err_alloc_pd_cq; 1034 } 1035 1036 /* 1037 * Completion queue can be filled by both read and write work requests, 1038 * so must reflect the sum of both possible queue sizes. 
1039 */ 1040 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1041 NULL, rdma->comp_channel, 0); 1042 if (!rdma->cq) { 1043 error_report("failed to allocate completion queue"); 1044 goto err_alloc_pd_cq; 1045 } 1046 1047 return 0; 1048 1049 err_alloc_pd_cq: 1050 if (rdma->pd) { 1051 ibv_dealloc_pd(rdma->pd); 1052 } 1053 if (rdma->comp_channel) { 1054 ibv_destroy_comp_channel(rdma->comp_channel); 1055 } 1056 rdma->pd = NULL; 1057 rdma->comp_channel = NULL; 1058 return -1; 1059 1060 } 1061 1062 /* 1063 * Create queue pairs. 1064 */ 1065 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1066 { 1067 struct ibv_qp_init_attr attr = { 0 }; 1068 int ret; 1069 1070 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1071 attr.cap.max_recv_wr = 3; 1072 attr.cap.max_send_sge = 1; 1073 attr.cap.max_recv_sge = 1; 1074 attr.send_cq = rdma->cq; 1075 attr.recv_cq = rdma->cq; 1076 attr.qp_type = IBV_QPT_RC; 1077 1078 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1079 if (ret) { 1080 return -1; 1081 } 1082 1083 rdma->qp = rdma->cm_id->qp; 1084 return 0; 1085 } 1086 1087 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1088 { 1089 int i; 1090 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1091 1092 for (i = 0; i < local->nb_blocks; i++) { 1093 local->block[i].mr = 1094 ibv_reg_mr(rdma->pd, 1095 local->block[i].local_host_addr, 1096 local->block[i].length, 1097 IBV_ACCESS_LOCAL_WRITE | 1098 IBV_ACCESS_REMOTE_WRITE 1099 ); 1100 if (!local->block[i].mr) { 1101 perror("Failed to register local dest ram block!\n"); 1102 break; 1103 } 1104 rdma->total_registrations++; 1105 } 1106 1107 if (i >= local->nb_blocks) { 1108 return 0; 1109 } 1110 1111 for (i--; i >= 0; i--) { 1112 ibv_dereg_mr(local->block[i].mr); 1113 rdma->total_registrations--; 1114 } 1115 1116 return -1; 1117 1118 } 1119 1120 /* 1121 * Find the ram block that corresponds to the page requested to be 1122 * transmitted by QEMU. 
1123 * 1124 * Once the block is found, also identify which 'chunk' within that 1125 * block that the page belongs to. 1126 * 1127 * This search cannot fail or the migration will fail. 1128 */ 1129 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1130 uintptr_t block_offset, 1131 uint64_t offset, 1132 uint64_t length, 1133 uint64_t *block_index, 1134 uint64_t *chunk_index) 1135 { 1136 uint64_t current_addr = block_offset + offset; 1137 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1138 (void *) block_offset); 1139 assert(block); 1140 assert(current_addr >= block->offset); 1141 assert((current_addr + length) <= (block->offset + block->length)); 1142 1143 *block_index = block->index; 1144 *chunk_index = ram_chunk_index(block->local_host_addr, 1145 block->local_host_addr + (current_addr - block->offset)); 1146 1147 return 0; 1148 } 1149 1150 /* 1151 * Register a chunk with IB. If the chunk was already registered 1152 * previously, then skip. 1153 * 1154 * Also return the keys associated with the registration needed 1155 * to perform the actual RDMA operation. 1156 */ 1157 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1158 RDMALocalBlock *block, uintptr_t host_addr, 1159 uint32_t *lkey, uint32_t *rkey, int chunk, 1160 uint8_t *chunk_start, uint8_t *chunk_end) 1161 { 1162 if (block->mr) { 1163 if (lkey) { 1164 *lkey = block->mr->lkey; 1165 } 1166 if (rkey) { 1167 *rkey = block->mr->rkey; 1168 } 1169 return 0; 1170 } 1171 1172 /* allocate memory to store chunk MRs */ 1173 if (!block->pmr) { 1174 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1175 } 1176 1177 /* 1178 * If 'rkey', then we're the destination, so grant access to the source. 1179 * 1180 * If 'lkey', then we're the source VM, so grant access only to ourselves. 
1181 */ 1182 if (!block->pmr[chunk]) { 1183 uint64_t len = chunk_end - chunk_start; 1184 1185 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1186 1187 block->pmr[chunk] = ibv_reg_mr(rdma->pd, 1188 chunk_start, len, 1189 (rkey ? (IBV_ACCESS_LOCAL_WRITE | 1190 IBV_ACCESS_REMOTE_WRITE) : 0)); 1191 1192 if (!block->pmr[chunk]) { 1193 perror("Failed to register chunk!"); 1194 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1195 " start %" PRIuPTR " end %" PRIuPTR 1196 " host %" PRIuPTR 1197 " local %" PRIuPTR " registrations: %d\n", 1198 block->index, chunk, (uintptr_t)chunk_start, 1199 (uintptr_t)chunk_end, host_addr, 1200 (uintptr_t)block->local_host_addr, 1201 rdma->total_registrations); 1202 return -1; 1203 } 1204 rdma->total_registrations++; 1205 } 1206 1207 if (lkey) { 1208 *lkey = block->pmr[chunk]->lkey; 1209 } 1210 if (rkey) { 1211 *rkey = block->pmr[chunk]->rkey; 1212 } 1213 return 0; 1214 } 1215 1216 /* 1217 * Register (at connection time) the memory used for control 1218 * channel messages. 1219 */ 1220 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1221 { 1222 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1223 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1224 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1225 if (rdma->wr_data[idx].control_mr) { 1226 rdma->total_registrations++; 1227 return 0; 1228 } 1229 error_report("qemu_rdma_reg_control failed"); 1230 return -1; 1231 } 1232 1233 const char *print_wrid(int wrid) 1234 { 1235 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1236 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1237 } 1238 return wrid_desc[wrid]; 1239 } 1240 1241 /* 1242 * RDMA requires memory registration (mlock/pinning), but this is not good for 1243 * overcommitment. 
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
 */
//#define RDMA_UNREGISTRATION_EXAMPLE

/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
 *    - for bit clearing
 *    - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
1275 */ 1276 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1277 { 1278 while (rdma->unregistrations[rdma->unregister_current]) { 1279 int ret; 1280 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1281 uint64_t chunk = 1282 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1283 uint64_t index = 1284 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1285 RDMALocalBlock *block = 1286 &(rdma->local_ram_blocks.block[index]); 1287 RDMARegister reg = { .current_index = index }; 1288 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1289 }; 1290 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1291 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1292 .repeat = 1, 1293 }; 1294 1295 trace_qemu_rdma_unregister_waiting_proc(chunk, 1296 rdma->unregister_current); 1297 1298 rdma->unregistrations[rdma->unregister_current] = 0; 1299 rdma->unregister_current++; 1300 1301 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1302 rdma->unregister_current = 0; 1303 } 1304 1305 1306 /* 1307 * Unregistration is speculative (because migration is single-threaded 1308 * and we cannot break the protocol's inifinband message ordering). 1309 * Thus, if the memory is currently being used for transmission, 1310 * then abort the attempt to unregister and try again 1311 * later the next time a completion is received for this memory. 
1312 */ 1313 clear_bit(chunk, block->unregister_bitmap); 1314 1315 if (test_bit(chunk, block->transit_bitmap)) { 1316 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1317 continue; 1318 } 1319 1320 trace_qemu_rdma_unregister_waiting_send(chunk); 1321 1322 ret = ibv_dereg_mr(block->pmr[chunk]); 1323 block->pmr[chunk] = NULL; 1324 block->remote_keys[chunk] = 0; 1325 1326 if (ret != 0) { 1327 perror("unregistration chunk failed"); 1328 return -ret; 1329 } 1330 rdma->total_registrations--; 1331 1332 reg.key.chunk = chunk; 1333 register_to_network(rdma, ®); 1334 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1335 &resp, NULL, NULL); 1336 if (ret < 0) { 1337 return ret; 1338 } 1339 1340 trace_qemu_rdma_unregister_waiting_complete(chunk); 1341 } 1342 1343 return 0; 1344 } 1345 1346 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1347 uint64_t chunk) 1348 { 1349 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1350 1351 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1352 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1353 1354 return result; 1355 } 1356 1357 /* 1358 * Set bit for unregistration in the next iteration. 1359 * We cannot transmit right here, but will unpin later. 
1360 */ 1361 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1362 uint64_t chunk, uint64_t wr_id) 1363 { 1364 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1365 error_report("rdma migration: queue is full"); 1366 } else { 1367 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1368 1369 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1370 trace_qemu_rdma_signal_unregister_append(chunk, 1371 rdma->unregister_next); 1372 1373 rdma->unregistrations[rdma->unregister_next++] = 1374 qemu_rdma_make_wrid(wr_id, index, chunk); 1375 1376 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1377 rdma->unregister_next = 0; 1378 } 1379 } else { 1380 trace_qemu_rdma_signal_unregister_already(chunk); 1381 } 1382 } 1383 } 1384 1385 /* 1386 * Consult the connection manager to see a work request 1387 * (of any kind) has completed. 1388 * Return the work request ID that completed. 1389 */ 1390 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1391 uint32_t *byte_len) 1392 { 1393 int ret; 1394 struct ibv_wc wc; 1395 uint64_t wr_id; 1396 1397 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1398 1399 if (!ret) { 1400 *wr_id_out = RDMA_WRID_NONE; 1401 return 0; 1402 } 1403 1404 if (ret < 0) { 1405 error_report("ibv_poll_cq return %d", ret); 1406 return ret; 1407 } 1408 1409 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1410 1411 if (wc.status != IBV_WC_SUCCESS) { 1412 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1413 wc.status, ibv_wc_status_str(wc.status)); 1414 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1415 1416 return -1; 1417 } 1418 1419 if (rdma->control_ready_expected && 1420 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1421 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1422 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1423 rdma->control_ready_expected = 0; 1424 } 1425 1426 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1427 uint64_t chunk = 1428 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> 
RDMA_WRID_CHUNK_SHIFT; 1429 uint64_t index = 1430 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1431 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1432 1433 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1434 index, chunk, block->local_host_addr, 1435 (void *)(uintptr_t)block->remote_host_addr); 1436 1437 clear_bit(chunk, block->transit_bitmap); 1438 1439 if (rdma->nb_sent > 0) { 1440 rdma->nb_sent--; 1441 } 1442 1443 if (!rdma->pin_all) { 1444 /* 1445 * FYI: If one wanted to signal a specific chunk to be unregistered 1446 * using LRU or workload-specific information, this is the function 1447 * you would call to do so. That chunk would then get asynchronously 1448 * unregistered later. 1449 */ 1450 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1451 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1452 #endif 1453 } 1454 } else { 1455 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1456 } 1457 1458 *wr_id_out = wc.wr_id; 1459 if (byte_len) { 1460 *byte_len = wc.byte_len; 1461 } 1462 1463 return 0; 1464 } 1465 1466 /* 1467 * Block until the next work request has completed. 1468 * 1469 * First poll to see if a work request has already completed, 1470 * otherwise block. 1471 * 1472 * If we encounter completed work requests for IDs other than 1473 * the one we're interested in, then that's generally an error. 1474 * 1475 * The only exception is actual RDMA Write completions. These 1476 * completions only need to be recorded, but do not actually 1477 * need further processing. 
1478 */ 1479 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1480 uint32_t *byte_len) 1481 { 1482 int num_cq_events = 0, ret = 0; 1483 struct ibv_cq *cq; 1484 void *cq_ctx; 1485 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1486 1487 if (ibv_req_notify_cq(rdma->cq, 0)) { 1488 return -1; 1489 } 1490 /* poll cq first */ 1491 while (wr_id != wrid_requested) { 1492 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1493 if (ret < 0) { 1494 return ret; 1495 } 1496 1497 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1498 1499 if (wr_id == RDMA_WRID_NONE) { 1500 break; 1501 } 1502 if (wr_id != wrid_requested) { 1503 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1504 wrid_requested, print_wrid(wr_id), wr_id); 1505 } 1506 } 1507 1508 if (wr_id == wrid_requested) { 1509 return 0; 1510 } 1511 1512 while (1) { 1513 /* 1514 * Coroutine doesn't start until migration_fd_process_incoming() 1515 * so don't yield unless we know we're running inside of a coroutine. 1516 */ 1517 if (rdma->migration_started_on_destination) { 1518 yield_until_fd_readable(rdma->comp_channel->fd); 1519 } 1520 1521 if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) { 1522 perror("ibv_get_cq_event"); 1523 goto err_block_for_wrid; 1524 } 1525 1526 num_cq_events++; 1527 1528 if (ibv_req_notify_cq(cq, 0)) { 1529 goto err_block_for_wrid; 1530 } 1531 1532 while (wr_id != wrid_requested) { 1533 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1534 if (ret < 0) { 1535 goto err_block_for_wrid; 1536 } 1537 1538 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1539 1540 if (wr_id == RDMA_WRID_NONE) { 1541 break; 1542 } 1543 if (wr_id != wrid_requested) { 1544 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1545 wrid_requested, print_wrid(wr_id), wr_id); 1546 } 1547 } 1548 1549 if (wr_id == wrid_requested) { 1550 goto success_block_for_wrid; 1551 } 1552 } 1553 1554 success_block_for_wrid: 1555 if (num_cq_events) { 1556 ibv_ack_cq_events(cq, num_cq_events); 1557 } 1558 
return 0; 1559 1560 err_block_for_wrid: 1561 if (num_cq_events) { 1562 ibv_ack_cq_events(cq, num_cq_events); 1563 } 1564 return ret; 1565 } 1566 1567 /* 1568 * Post a SEND message work request for the control channel 1569 * containing some data and block until the post completes. 1570 */ 1571 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1572 RDMAControlHeader *head) 1573 { 1574 int ret = 0; 1575 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1576 struct ibv_send_wr *bad_wr; 1577 struct ibv_sge sge = { 1578 .addr = (uintptr_t)(wr->control), 1579 .length = head->len + sizeof(RDMAControlHeader), 1580 .lkey = wr->control_mr->lkey, 1581 }; 1582 struct ibv_send_wr send_wr = { 1583 .wr_id = RDMA_WRID_SEND_CONTROL, 1584 .opcode = IBV_WR_SEND, 1585 .send_flags = IBV_SEND_SIGNALED, 1586 .sg_list = &sge, 1587 .num_sge = 1, 1588 }; 1589 1590 trace_qemu_rdma_post_send_control(control_desc[head->type]); 1591 1592 /* 1593 * We don't actually need to do a memcpy() in here if we used 1594 * the "sge" properly, but since we're only sending control messages 1595 * (not RAM in a performance-critical path), then its OK for now. 1596 * 1597 * The copy makes the RDMAControlHeader simpler to manipulate 1598 * for the time being. 
1599 */ 1600 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1601 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1602 control_to_network((void *) wr->control); 1603 1604 if (buf) { 1605 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1606 } 1607 1608 1609 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1610 1611 if (ret > 0) { 1612 error_report("Failed to use post IB SEND for control"); 1613 return -ret; 1614 } 1615 1616 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1617 if (ret < 0) { 1618 error_report("rdma migration: send polling control error"); 1619 } 1620 1621 return ret; 1622 } 1623 1624 /* 1625 * Post a RECV work request in anticipation of some future receipt 1626 * of data on the control channel. 1627 */ 1628 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1629 { 1630 struct ibv_recv_wr *bad_wr; 1631 struct ibv_sge sge = { 1632 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1633 .length = RDMA_CONTROL_MAX_BUFFER, 1634 .lkey = rdma->wr_data[idx].control_mr->lkey, 1635 }; 1636 1637 struct ibv_recv_wr recv_wr = { 1638 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1639 .sg_list = &sge, 1640 .num_sge = 1, 1641 }; 1642 1643 1644 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1645 return -1; 1646 } 1647 1648 return 0; 1649 } 1650 1651 /* 1652 * Block and wait for a RECV control channel message to arrive. 
1653 */ 1654 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1655 RDMAControlHeader *head, int expecting, int idx) 1656 { 1657 uint32_t byte_len; 1658 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1659 &byte_len); 1660 1661 if (ret < 0) { 1662 error_report("rdma migration: recv polling control error!"); 1663 return ret; 1664 } 1665 1666 network_to_control((void *) rdma->wr_data[idx].control); 1667 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1668 1669 trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]); 1670 1671 if (expecting == RDMA_CONTROL_NONE) { 1672 trace_qemu_rdma_exchange_get_response_none(control_desc[head->type], 1673 head->type); 1674 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1675 error_report("Was expecting a %s (%d) control message" 1676 ", but got: %s (%d), length: %d", 1677 control_desc[expecting], expecting, 1678 control_desc[head->type], head->type, head->len); 1679 return -EIO; 1680 } 1681 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1682 error_report("too long length: %d", head->len); 1683 return -EINVAL; 1684 } 1685 if (sizeof(*head) + head->len != byte_len) { 1686 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1687 return -EINVAL; 1688 } 1689 1690 return 0; 1691 } 1692 1693 /* 1694 * When a RECV work request has completed, the work request's 1695 * buffer is pointed at the header. 1696 * 1697 * This will advance the pointer to the data portion 1698 * of the control message of the work request's buffer that 1699 * was populated after the work request finished. 
1700 */ 1701 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1702 RDMAControlHeader *head) 1703 { 1704 rdma->wr_data[idx].control_len = head->len; 1705 rdma->wr_data[idx].control_curr = 1706 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1707 } 1708 1709 /* 1710 * This is an 'atomic' high-level operation to deliver a single, unified 1711 * control-channel message. 1712 * 1713 * Additionally, if the user is expecting some kind of reply to this message, 1714 * they can request a 'resp' response message be filled in by posting an 1715 * additional work request on behalf of the user and waiting for an additional 1716 * completion. 1717 * 1718 * The extra (optional) response is used during registration to us from having 1719 * to perform an *additional* exchange of message just to provide a response by 1720 * instead piggy-backing on the acknowledgement. 1721 */ 1722 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1723 uint8_t *data, RDMAControlHeader *resp, 1724 int *resp_idx, 1725 int (*callback)(RDMAContext *rdma)) 1726 { 1727 int ret = 0; 1728 1729 /* 1730 * Wait until the dest is ready before attempting to deliver the message 1731 * by waiting for a READY message. 1732 */ 1733 if (rdma->control_ready_expected) { 1734 RDMAControlHeader resp; 1735 ret = qemu_rdma_exchange_get_response(rdma, 1736 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1737 if (ret < 0) { 1738 return ret; 1739 } 1740 } 1741 1742 /* 1743 * If the user is expecting a response, post a WR in anticipation of it. 1744 */ 1745 if (resp) { 1746 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1747 if (ret) { 1748 error_report("rdma migration: error posting" 1749 " extra control recv for anticipated result!"); 1750 return ret; 1751 } 1752 } 1753 1754 /* 1755 * Post a WR to replace the one we just consumed for the READY message. 
1756 */ 1757 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1758 if (ret) { 1759 error_report("rdma migration: error posting first control recv!"); 1760 return ret; 1761 } 1762 1763 /* 1764 * Deliver the control message that was requested. 1765 */ 1766 ret = qemu_rdma_post_send_control(rdma, data, head); 1767 1768 if (ret < 0) { 1769 error_report("Failed to send control buffer!"); 1770 return ret; 1771 } 1772 1773 /* 1774 * If we're expecting a response, block and wait for it. 1775 */ 1776 if (resp) { 1777 if (callback) { 1778 trace_qemu_rdma_exchange_send_issue_callback(); 1779 ret = callback(rdma); 1780 if (ret < 0) { 1781 return ret; 1782 } 1783 } 1784 1785 trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]); 1786 ret = qemu_rdma_exchange_get_response(rdma, resp, 1787 resp->type, RDMA_WRID_DATA); 1788 1789 if (ret < 0) { 1790 return ret; 1791 } 1792 1793 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1794 if (resp_idx) { 1795 *resp_idx = RDMA_WRID_DATA; 1796 } 1797 trace_qemu_rdma_exchange_send_received(control_desc[resp->type]); 1798 } 1799 1800 rdma->control_ready_expected = 1; 1801 1802 return 0; 1803 } 1804 1805 /* 1806 * This is an 'atomic' high-level operation to receive a single, unified 1807 * control-channel message. 1808 */ 1809 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1810 int expecting) 1811 { 1812 RDMAControlHeader ready = { 1813 .len = 0, 1814 .type = RDMA_CONTROL_READY, 1815 .repeat = 1, 1816 }; 1817 int ret; 1818 1819 /* 1820 * Inform the source that we're ready to receive a message. 1821 */ 1822 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1823 1824 if (ret < 0) { 1825 error_report("Failed to send control buffer!"); 1826 return ret; 1827 } 1828 1829 /* 1830 * Block and wait for the message. 
1831 */ 1832 ret = qemu_rdma_exchange_get_response(rdma, head, 1833 expecting, RDMA_WRID_READY); 1834 1835 if (ret < 0) { 1836 return ret; 1837 } 1838 1839 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1840 1841 /* 1842 * Post a new RECV work request to replace the one we just consumed. 1843 */ 1844 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1845 if (ret) { 1846 error_report("rdma migration: error posting second control recv!"); 1847 return ret; 1848 } 1849 1850 return 0; 1851 } 1852 1853 /* 1854 * Write an actual chunk of memory using RDMA. 1855 * 1856 * If we're using dynamic registration on the dest-side, we have to 1857 * send a registration command first. 1858 */ 1859 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 1860 int current_index, uint64_t current_addr, 1861 uint64_t length) 1862 { 1863 struct ibv_sge sge; 1864 struct ibv_send_wr send_wr = { 0 }; 1865 struct ibv_send_wr *bad_wr; 1866 int reg_result_idx, ret, count = 0; 1867 uint64_t chunk, chunks; 1868 uint8_t *chunk_start, *chunk_end; 1869 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1870 RDMARegister reg; 1871 RDMARegisterResult *reg_result; 1872 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1873 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1874 .type = RDMA_CONTROL_REGISTER_REQUEST, 1875 .repeat = 1, 1876 }; 1877 1878 retry: 1879 sge.addr = (uintptr_t)(block->local_host_addr + 1880 (current_addr - block->offset)); 1881 sge.length = length; 1882 1883 chunk = ram_chunk_index(block->local_host_addr, 1884 (uint8_t *)(uintptr_t)sge.addr); 1885 chunk_start = ram_chunk_start(block, chunk); 1886 1887 if (block->is_ram_block) { 1888 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 1889 1890 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1891 chunks--; 1892 } 1893 } else { 1894 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 1895 1896 if (chunks && ((block->length % (1UL << 
RDMA_REG_CHUNK_SHIFT)) == 0)) { 1897 chunks--; 1898 } 1899 } 1900 1901 trace_qemu_rdma_write_one_top(chunks + 1, 1902 (chunks + 1) * 1903 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 1904 1905 chunk_end = ram_chunk_end(block, chunk + chunks); 1906 1907 if (!rdma->pin_all) { 1908 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1909 qemu_rdma_unregister_waiting(rdma); 1910 #endif 1911 } 1912 1913 while (test_bit(chunk, block->transit_bitmap)) { 1914 (void)count; 1915 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 1916 sge.addr, length, rdma->nb_sent, block->nb_chunks); 1917 1918 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 1919 1920 if (ret < 0) { 1921 error_report("Failed to Wait for previous write to complete " 1922 "block %d chunk %" PRIu64 1923 " current %" PRIu64 " len %" PRIu64 " %d", 1924 current_index, chunk, sge.addr, length, rdma->nb_sent); 1925 return ret; 1926 } 1927 } 1928 1929 if (!rdma->pin_all || !block->is_ram_block) { 1930 if (!block->remote_keys[chunk]) { 1931 /* 1932 * This chunk has not yet been registered, so first check to see 1933 * if the entire chunk is zero. If so, tell the other size to 1934 * memset() + madvise() the entire chunk without RDMA. 
1935 */ 1936 1937 if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1938 length) 1939 && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr, 1940 length) == length) { 1941 RDMACompress comp = { 1942 .offset = current_addr, 1943 .value = 0, 1944 .block_idx = current_index, 1945 .length = length, 1946 }; 1947 1948 head.len = sizeof(comp); 1949 head.type = RDMA_CONTROL_COMPRESS; 1950 1951 trace_qemu_rdma_write_one_zero(chunk, sge.length, 1952 current_index, current_addr); 1953 1954 compress_to_network(rdma, &comp); 1955 ret = qemu_rdma_exchange_send(rdma, &head, 1956 (uint8_t *) &comp, NULL, NULL, NULL); 1957 1958 if (ret < 0) { 1959 return -EIO; 1960 } 1961 1962 acct_update_position(f, sge.length, true); 1963 1964 return 1; 1965 } 1966 1967 /* 1968 * Otherwise, tell other side to register. 1969 */ 1970 reg.current_index = current_index; 1971 if (block->is_ram_block) { 1972 reg.key.current_addr = current_addr; 1973 } else { 1974 reg.key.chunk = chunk; 1975 } 1976 reg.chunks = chunks; 1977 1978 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 1979 current_addr); 1980 1981 register_to_network(rdma, ®); 1982 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1983 &resp, ®_result_idx, NULL); 1984 if (ret < 0) { 1985 return ret; 1986 } 1987 1988 /* try to overlap this single registration with the one we sent. 
*/ 1989 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1990 &sge.lkey, NULL, chunk, 1991 chunk_start, chunk_end)) { 1992 error_report("cannot get lkey"); 1993 return -EINVAL; 1994 } 1995 1996 reg_result = (RDMARegisterResult *) 1997 rdma->wr_data[reg_result_idx].control_curr; 1998 1999 network_to_result(reg_result); 2000 2001 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2002 reg_result->rkey, chunk); 2003 2004 block->remote_keys[chunk] = reg_result->rkey; 2005 block->remote_host_addr = reg_result->host_addr; 2006 } else { 2007 /* already registered before */ 2008 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2009 &sge.lkey, NULL, chunk, 2010 chunk_start, chunk_end)) { 2011 error_report("cannot get lkey!"); 2012 return -EINVAL; 2013 } 2014 } 2015 2016 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2017 } else { 2018 send_wr.wr.rdma.rkey = block->remote_rkey; 2019 2020 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2021 &sge.lkey, NULL, chunk, 2022 chunk_start, chunk_end)) { 2023 error_report("cannot get lkey!"); 2024 return -EINVAL; 2025 } 2026 } 2027 2028 /* 2029 * Encode the ram block index and chunk within this wrid. 2030 * We will use this information at the time of completion 2031 * to figure out which bitmap to check against and then which 2032 * chunk in the bitmap to look for. 2033 */ 2034 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2035 current_index, chunk); 2036 2037 send_wr.opcode = IBV_WR_RDMA_WRITE; 2038 send_wr.send_flags = IBV_SEND_SIGNALED; 2039 send_wr.sg_list = &sge; 2040 send_wr.num_sge = 1; 2041 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2042 (current_addr - block->offset); 2043 2044 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2045 sge.length); 2046 2047 /* 2048 * ibv_post_send() does not return negative error numbers, 2049 * per the specification they are positive - no idea why. 
2050 */ 2051 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2052 2053 if (ret == ENOMEM) { 2054 trace_qemu_rdma_write_one_queue_full(); 2055 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2056 if (ret < 0) { 2057 error_report("rdma migration: failed to make " 2058 "room in full send queue! %d", ret); 2059 return ret; 2060 } 2061 2062 goto retry; 2063 2064 } else if (ret > 0) { 2065 perror("rdma migration: post rdma write failed"); 2066 return -ret; 2067 } 2068 2069 set_bit(chunk, block->transit_bitmap); 2070 acct_update_position(f, sge.length, false); 2071 rdma->total_writes++; 2072 2073 return 0; 2074 } 2075 2076 /* 2077 * Push out any unwritten RDMA operations. 2078 * 2079 * We support sending out multiple chunks at the same time. 2080 * Not all of them need to get signaled in the completion queue. 2081 */ 2082 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2083 { 2084 int ret; 2085 2086 if (!rdma->current_length) { 2087 return 0; 2088 } 2089 2090 ret = qemu_rdma_write_one(f, rdma, 2091 rdma->current_index, rdma->current_addr, rdma->current_length); 2092 2093 if (ret < 0) { 2094 return ret; 2095 } 2096 2097 if (ret == 0) { 2098 rdma->nb_sent++; 2099 trace_qemu_rdma_write_flush(rdma->nb_sent); 2100 } 2101 2102 rdma->current_length = 0; 2103 rdma->current_addr = 0; 2104 2105 return 0; 2106 } 2107 2108 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2109 uint64_t offset, uint64_t len) 2110 { 2111 RDMALocalBlock *block; 2112 uint8_t *host_addr; 2113 uint8_t *chunk_end; 2114 2115 if (rdma->current_index < 0) { 2116 return 0; 2117 } 2118 2119 if (rdma->current_chunk < 0) { 2120 return 0; 2121 } 2122 2123 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2124 host_addr = block->local_host_addr + (offset - block->offset); 2125 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2126 2127 if (rdma->current_length == 0) { 2128 return 0; 2129 } 2130 2131 /* 2132 * Only merge into chunk sequentially. 
2133 */ 2134 if (offset != (rdma->current_addr + rdma->current_length)) { 2135 return 0; 2136 } 2137 2138 if (offset < block->offset) { 2139 return 0; 2140 } 2141 2142 if ((offset + len) > (block->offset + block->length)) { 2143 return 0; 2144 } 2145 2146 if ((host_addr + len) > chunk_end) { 2147 return 0; 2148 } 2149 2150 return 1; 2151 } 2152 2153 /* 2154 * We're not actually writing here, but doing three things: 2155 * 2156 * 1. Identify the chunk the buffer belongs to. 2157 * 2. If the chunk is full or the buffer doesn't belong to the current 2158 * chunk, then start a new chunk and flush() the old chunk. 2159 * 3. To keep the hardware busy, we also group chunks into batches 2160 * and only require that a batch gets acknowledged in the completion 2161 * qeueue instead of each individual chunk. 2162 */ 2163 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2164 uint64_t block_offset, uint64_t offset, 2165 uint64_t len) 2166 { 2167 uint64_t current_addr = block_offset + offset; 2168 uint64_t index = rdma->current_index; 2169 uint64_t chunk = rdma->current_chunk; 2170 int ret; 2171 2172 /* If we cannot merge it, we flush the current buffer first. 
*/ 2173 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2174 ret = qemu_rdma_write_flush(f, rdma); 2175 if (ret) { 2176 return ret; 2177 } 2178 rdma->current_length = 0; 2179 rdma->current_addr = current_addr; 2180 2181 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2182 offset, len, &index, &chunk); 2183 if (ret) { 2184 error_report("ram block search failed"); 2185 return ret; 2186 } 2187 rdma->current_index = index; 2188 rdma->current_chunk = chunk; 2189 } 2190 2191 /* merge it */ 2192 rdma->current_length += len; 2193 2194 /* flush it if buffer is too large */ 2195 if (rdma->current_length >= RDMA_MERGE_MAX) { 2196 return qemu_rdma_write_flush(f, rdma); 2197 } 2198 2199 return 0; 2200 } 2201 2202 static void qemu_rdma_cleanup(RDMAContext *rdma) 2203 { 2204 struct rdma_cm_event *cm_event; 2205 int ret, idx; 2206 2207 if (rdma->cm_id && rdma->connected) { 2208 if (rdma->error_state) { 2209 RDMAControlHeader head = { .len = 0, 2210 .type = RDMA_CONTROL_ERROR, 2211 .repeat = 1, 2212 }; 2213 error_report("Early error. 
Sending error."); 2214 qemu_rdma_post_send_control(rdma, NULL, &head); 2215 } 2216 2217 ret = rdma_disconnect(rdma->cm_id); 2218 if (!ret) { 2219 trace_qemu_rdma_cleanup_waiting_for_disconnect(); 2220 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2221 if (!ret) { 2222 rdma_ack_cm_event(cm_event); 2223 } 2224 } 2225 trace_qemu_rdma_cleanup_disconnect(); 2226 rdma->connected = false; 2227 } 2228 2229 g_free(rdma->dest_blocks); 2230 rdma->dest_blocks = NULL; 2231 2232 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2233 if (rdma->wr_data[idx].control_mr) { 2234 rdma->total_registrations--; 2235 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2236 } 2237 rdma->wr_data[idx].control_mr = NULL; 2238 } 2239 2240 if (rdma->local_ram_blocks.block) { 2241 while (rdma->local_ram_blocks.nb_blocks) { 2242 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2243 } 2244 } 2245 2246 if (rdma->qp) { 2247 rdma_destroy_qp(rdma->cm_id); 2248 rdma->qp = NULL; 2249 } 2250 if (rdma->cq) { 2251 ibv_destroy_cq(rdma->cq); 2252 rdma->cq = NULL; 2253 } 2254 if (rdma->comp_channel) { 2255 ibv_destroy_comp_channel(rdma->comp_channel); 2256 rdma->comp_channel = NULL; 2257 } 2258 if (rdma->pd) { 2259 ibv_dealloc_pd(rdma->pd); 2260 rdma->pd = NULL; 2261 } 2262 if (rdma->cm_id) { 2263 rdma_destroy_id(rdma->cm_id); 2264 rdma->cm_id = NULL; 2265 } 2266 if (rdma->listen_id) { 2267 rdma_destroy_id(rdma->listen_id); 2268 rdma->listen_id = NULL; 2269 } 2270 if (rdma->channel) { 2271 rdma_destroy_event_channel(rdma->channel); 2272 rdma->channel = NULL; 2273 } 2274 g_free(rdma->host); 2275 rdma->host = NULL; 2276 } 2277 2278 2279 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all) 2280 { 2281 int ret, idx; 2282 Error *local_err = NULL, **temp = &local_err; 2283 2284 /* 2285 * Will be validated against destination's actual capabilities 2286 * after the connect() completes. 
2287 */ 2288 rdma->pin_all = pin_all; 2289 2290 ret = qemu_rdma_resolve_host(rdma, temp); 2291 if (ret) { 2292 goto err_rdma_source_init; 2293 } 2294 2295 ret = qemu_rdma_alloc_pd_cq(rdma); 2296 if (ret) { 2297 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2298 " limits may be too low. Please check $ ulimit -a # and " 2299 "search for 'ulimit -l' in the output"); 2300 goto err_rdma_source_init; 2301 } 2302 2303 ret = qemu_rdma_alloc_qp(rdma); 2304 if (ret) { 2305 ERROR(temp, "rdma migration: error allocating qp!"); 2306 goto err_rdma_source_init; 2307 } 2308 2309 ret = qemu_rdma_init_ram_blocks(rdma); 2310 if (ret) { 2311 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2312 goto err_rdma_source_init; 2313 } 2314 2315 /* Build the hash that maps from offset to RAMBlock */ 2316 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2317 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2318 g_hash_table_insert(rdma->blockmap, 2319 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2320 &rdma->local_ram_blocks.block[idx]); 2321 } 2322 2323 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2324 ret = qemu_rdma_reg_control(rdma, idx); 2325 if (ret) { 2326 ERROR(temp, "rdma migration: error registering %d control!", 2327 idx); 2328 goto err_rdma_source_init; 2329 } 2330 } 2331 2332 return 0; 2333 2334 err_rdma_source_init: 2335 error_propagate(errp, local_err); 2336 qemu_rdma_cleanup(rdma); 2337 return -1; 2338 } 2339 2340 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp) 2341 { 2342 RDMACapabilities cap = { 2343 .version = RDMA_CONTROL_VERSION_CURRENT, 2344 .flags = 0, 2345 }; 2346 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2347 .retry_count = 5, 2348 .private_data = &cap, 2349 .private_data_len = sizeof(cap), 2350 }; 2351 struct rdma_cm_event *cm_event; 2352 int ret; 2353 2354 /* 2355 * Only negotiate the capability with destination if the user 2356 * on the source first 
requested the capability. 2357 */ 2358 if (rdma->pin_all) { 2359 trace_qemu_rdma_connect_pin_all_requested(); 2360 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2361 } 2362 2363 caps_to_network(&cap); 2364 2365 ret = rdma_connect(rdma->cm_id, &conn_param); 2366 if (ret) { 2367 perror("rdma_connect"); 2368 ERROR(errp, "connecting to destination!"); 2369 goto err_rdma_source_connect; 2370 } 2371 2372 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2373 if (ret) { 2374 perror("rdma_get_cm_event after rdma_connect"); 2375 ERROR(errp, "connecting to destination!"); 2376 rdma_ack_cm_event(cm_event); 2377 goto err_rdma_source_connect; 2378 } 2379 2380 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2381 perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2382 ERROR(errp, "connecting to destination!"); 2383 rdma_ack_cm_event(cm_event); 2384 goto err_rdma_source_connect; 2385 } 2386 rdma->connected = true; 2387 2388 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2389 network_to_caps(&cap); 2390 2391 /* 2392 * Verify that the *requested* capabilities are supported by the destination 2393 * and disable them otherwise. 2394 */ 2395 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2396 ERROR(errp, "Server cannot support pinning all memory. 
" 2397 "Will register memory dynamically."); 2398 rdma->pin_all = false; 2399 } 2400 2401 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2402 2403 rdma_ack_cm_event(cm_event); 2404 2405 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2406 if (ret) { 2407 ERROR(errp, "posting second control recv!"); 2408 goto err_rdma_source_connect; 2409 } 2410 2411 rdma->control_ready_expected = 1; 2412 rdma->nb_sent = 0; 2413 return 0; 2414 2415 err_rdma_source_connect: 2416 qemu_rdma_cleanup(rdma); 2417 return -1; 2418 } 2419 2420 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2421 { 2422 int ret, idx; 2423 struct rdma_cm_id *listen_id; 2424 char ip[40] = "unknown"; 2425 struct rdma_addrinfo *res, *e; 2426 char port_str[16]; 2427 2428 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2429 rdma->wr_data[idx].control_len = 0; 2430 rdma->wr_data[idx].control_curr = NULL; 2431 } 2432 2433 if (!rdma->host || !rdma->host[0]) { 2434 ERROR(errp, "RDMA host is not set!"); 2435 rdma->error_state = -EINVAL; 2436 return -1; 2437 } 2438 /* create CM channel */ 2439 rdma->channel = rdma_create_event_channel(); 2440 if (!rdma->channel) { 2441 ERROR(errp, "could not create rdma event channel"); 2442 rdma->error_state = -EINVAL; 2443 return -1; 2444 } 2445 2446 /* create CM id */ 2447 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2448 if (ret) { 2449 ERROR(errp, "could not create cm_id!"); 2450 goto err_dest_init_create_listen_id; 2451 } 2452 2453 snprintf(port_str, 16, "%d", rdma->port); 2454 port_str[15] = '\0'; 2455 2456 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2457 if (ret < 0) { 2458 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2459 goto err_dest_init_bind_addr; 2460 } 2461 2462 for (e = res; e != NULL; e = e->ai_next) { 2463 inet_ntop(e->ai_family, 2464 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2465 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2466 ret = 
rdma_bind_addr(listen_id, e->ai_dst_addr); 2467 if (ret) { 2468 continue; 2469 } 2470 if (e->ai_family == AF_INET6) { 2471 ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs); 2472 if (ret) { 2473 continue; 2474 } 2475 } 2476 break; 2477 } 2478 2479 if (!e) { 2480 ERROR(errp, "Error: could not rdma_bind_addr!"); 2481 goto err_dest_init_bind_addr; 2482 } 2483 2484 rdma->listen_id = listen_id; 2485 qemu_rdma_dump_gid("dest_init", listen_id); 2486 return 0; 2487 2488 err_dest_init_bind_addr: 2489 rdma_destroy_id(listen_id); 2490 err_dest_init_create_listen_id: 2491 rdma_destroy_event_channel(rdma->channel); 2492 rdma->channel = NULL; 2493 rdma->error_state = ret; 2494 return ret; 2495 2496 } 2497 2498 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2499 { 2500 RDMAContext *rdma = NULL; 2501 InetSocketAddress *addr; 2502 2503 if (host_port) { 2504 rdma = g_new0(RDMAContext, 1); 2505 rdma->current_index = -1; 2506 rdma->current_chunk = -1; 2507 2508 addr = inet_parse(host_port, NULL); 2509 if (addr != NULL) { 2510 rdma->port = atoi(addr->port); 2511 rdma->host = g_strdup(addr->host); 2512 } else { 2513 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2514 g_free(rdma); 2515 rdma = NULL; 2516 } 2517 2518 qapi_free_InetSocketAddress(addr); 2519 } 2520 2521 return rdma; 2522 } 2523 2524 /* 2525 * QEMUFile interface to the control channel. 2526 * SEND messages for control only. 2527 * VM's ram is handled with regular RDMA messages. 2528 */ 2529 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2530 const struct iovec *iov, 2531 size_t niov, 2532 int *fds, 2533 size_t nfds, 2534 Error **errp) 2535 { 2536 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2537 QEMUFile *f = rioc->file; 2538 RDMAContext *rdma = rioc->rdma; 2539 int ret; 2540 ssize_t done = 0; 2541 size_t i; 2542 2543 CHECK_ERROR_STATE(); 2544 2545 /* 2546 * Push out any writes that 2547 * we're queued up for VM's ram. 
2548 */ 2549 ret = qemu_rdma_write_flush(f, rdma); 2550 if (ret < 0) { 2551 rdma->error_state = ret; 2552 return ret; 2553 } 2554 2555 for (i = 0; i < niov; i++) { 2556 size_t remaining = iov[i].iov_len; 2557 uint8_t * data = (void *)iov[i].iov_base; 2558 while (remaining) { 2559 RDMAControlHeader head; 2560 2561 rioc->len = MIN(remaining, RDMA_SEND_INCREMENT); 2562 remaining -= rioc->len; 2563 2564 head.len = rioc->len; 2565 head.type = RDMA_CONTROL_QEMU_FILE; 2566 2567 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2568 2569 if (ret < 0) { 2570 rdma->error_state = ret; 2571 return ret; 2572 } 2573 2574 data += rioc->len; 2575 done += rioc->len; 2576 } 2577 } 2578 2579 return done; 2580 } 2581 2582 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2583 size_t size, int idx) 2584 { 2585 size_t len = 0; 2586 2587 if (rdma->wr_data[idx].control_len) { 2588 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2589 2590 len = MIN(size, rdma->wr_data[idx].control_len); 2591 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2592 rdma->wr_data[idx].control_curr += len; 2593 rdma->wr_data[idx].control_len -= len; 2594 } 2595 2596 return len; 2597 } 2598 2599 /* 2600 * QEMUFile interface to the control channel. 2601 * RDMA links don't use bytestreams, so we have to 2602 * return bytes to QEMUFile opportunistically. 
2603 */ 2604 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2605 const struct iovec *iov, 2606 size_t niov, 2607 int **fds, 2608 size_t *nfds, 2609 Error **errp) 2610 { 2611 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2612 RDMAContext *rdma = rioc->rdma; 2613 RDMAControlHeader head; 2614 int ret = 0; 2615 ssize_t i; 2616 size_t done = 0; 2617 2618 CHECK_ERROR_STATE(); 2619 2620 for (i = 0; i < niov; i++) { 2621 size_t want = iov[i].iov_len; 2622 uint8_t *data = (void *)iov[i].iov_base; 2623 2624 /* 2625 * First, we hold on to the last SEND message we 2626 * were given and dish out the bytes until we run 2627 * out of bytes. 2628 */ 2629 ret = qemu_rdma_fill(rioc->rdma, data, want, 0); 2630 done += ret; 2631 want -= ret; 2632 /* Got what we needed, so go to next iovec */ 2633 if (want == 0) { 2634 continue; 2635 } 2636 2637 /* If we got any data so far, then don't wait 2638 * for more, just return what we have */ 2639 if (done > 0) { 2640 break; 2641 } 2642 2643 2644 /* We've got nothing at all, so lets wait for 2645 * more to arrive 2646 */ 2647 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2648 2649 if (ret < 0) { 2650 rdma->error_state = ret; 2651 return ret; 2652 } 2653 2654 /* 2655 * SEND was received with new bytes, now try again. 2656 */ 2657 ret = qemu_rdma_fill(rioc->rdma, data, want, 0); 2658 done += ret; 2659 want -= ret; 2660 2661 /* Still didn't get enough, so lets just return */ 2662 if (want) { 2663 if (done == 0) { 2664 return QIO_CHANNEL_ERR_BLOCK; 2665 } else { 2666 break; 2667 } 2668 } 2669 } 2670 rioc->len = done; 2671 return rioc->len; 2672 } 2673 2674 /* 2675 * Block until all the outstanding chunks have been delivered by the hardware. 
2676 */ 2677 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2678 { 2679 int ret; 2680 2681 if (qemu_rdma_write_flush(f, rdma) < 0) { 2682 return -EIO; 2683 } 2684 2685 while (rdma->nb_sent) { 2686 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2687 if (ret < 0) { 2688 error_report("rdma migration: complete polling error!"); 2689 return -EIO; 2690 } 2691 } 2692 2693 qemu_rdma_unregister_waiting(rdma); 2694 2695 return 0; 2696 } 2697 2698 2699 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2700 bool blocking, 2701 Error **errp) 2702 { 2703 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2704 /* XXX we should make readv/writev actually honour this :-) */ 2705 rioc->blocking = blocking; 2706 return 0; 2707 } 2708 2709 2710 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2711 struct QIOChannelRDMASource { 2712 GSource parent; 2713 QIOChannelRDMA *rioc; 2714 GIOCondition condition; 2715 }; 2716 2717 static gboolean 2718 qio_channel_rdma_source_prepare(GSource *source, 2719 gint *timeout) 2720 { 2721 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2722 RDMAContext *rdma = rsource->rioc->rdma; 2723 GIOCondition cond = 0; 2724 *timeout = -1; 2725 2726 if (rdma->wr_data[0].control_len) { 2727 cond |= G_IO_IN; 2728 } 2729 cond |= G_IO_OUT; 2730 2731 return cond & rsource->condition; 2732 } 2733 2734 static gboolean 2735 qio_channel_rdma_source_check(GSource *source) 2736 { 2737 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2738 RDMAContext *rdma = rsource->rioc->rdma; 2739 GIOCondition cond = 0; 2740 2741 if (rdma->wr_data[0].control_len) { 2742 cond |= G_IO_IN; 2743 } 2744 cond |= G_IO_OUT; 2745 2746 return cond & rsource->condition; 2747 } 2748 2749 static gboolean 2750 qio_channel_rdma_source_dispatch(GSource *source, 2751 GSourceFunc callback, 2752 gpointer user_data) 2753 { 2754 QIOChannelFunc func = (QIOChannelFunc)callback; 2755 QIOChannelRDMASource *rsource = (QIOChannelRDMASource 
*)source; 2756 RDMAContext *rdma = rsource->rioc->rdma; 2757 GIOCondition cond = 0; 2758 2759 if (rdma->wr_data[0].control_len) { 2760 cond |= G_IO_IN; 2761 } 2762 cond |= G_IO_OUT; 2763 2764 return (*func)(QIO_CHANNEL(rsource->rioc), 2765 (cond & rsource->condition), 2766 user_data); 2767 } 2768 2769 static void 2770 qio_channel_rdma_source_finalize(GSource *source) 2771 { 2772 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 2773 2774 object_unref(OBJECT(ssource->rioc)); 2775 } 2776 2777 GSourceFuncs qio_channel_rdma_source_funcs = { 2778 qio_channel_rdma_source_prepare, 2779 qio_channel_rdma_source_check, 2780 qio_channel_rdma_source_dispatch, 2781 qio_channel_rdma_source_finalize 2782 }; 2783 2784 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 2785 GIOCondition condition) 2786 { 2787 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2788 QIOChannelRDMASource *ssource; 2789 GSource *source; 2790 2791 source = g_source_new(&qio_channel_rdma_source_funcs, 2792 sizeof(QIOChannelRDMASource)); 2793 ssource = (QIOChannelRDMASource *)source; 2794 2795 ssource->rioc = rioc; 2796 object_ref(OBJECT(rioc)); 2797 2798 ssource->condition = condition; 2799 2800 return source; 2801 } 2802 2803 2804 static int qio_channel_rdma_close(QIOChannel *ioc, 2805 Error **errp) 2806 { 2807 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2808 trace_qemu_rdma_close(); 2809 if (rioc->rdma) { 2810 qemu_rdma_cleanup(rioc->rdma); 2811 g_free(rioc->rdma); 2812 rioc->rdma = NULL; 2813 } 2814 return 0; 2815 } 2816 2817 /* 2818 * Parameters: 2819 * @offset == 0 : 2820 * This means that 'block_offset' is a full virtual address that does not 2821 * belong to a RAMBlock of the virtual machine and instead 2822 * represents a private malloc'd memory area that the caller wishes to 2823 * transfer. 2824 * 2825 * @offset != 0 : 2826 * Offset is an offset to be added to block_offset and used 2827 * to also lookup the corresponding RAMBlock. 
2828 * 2829 * @size > 0 : 2830 * Initiate an transfer this size. 2831 * 2832 * @size == 0 : 2833 * A 'hint' or 'advice' that means that we wish to speculatively 2834 * and asynchronously unregister this memory. In this case, there is no 2835 * guarantee that the unregister will actually happen, for example, 2836 * if the memory is being actively transmitted. Additionally, the memory 2837 * may be re-registered at any future time if a write within the same 2838 * chunk was requested again, even if you attempted to unregister it 2839 * here. 2840 * 2841 * @size < 0 : TODO, not yet supported 2842 * Unregister the memory NOW. This means that the caller does not 2843 * expect there to be any future RDMA transfers and we just want to clean 2844 * things up. This is used in case the upper layer owns the memory and 2845 * cannot wait for qemu_fclose() to occur. 2846 * 2847 * @bytes_sent : User-specificed pointer to indicate how many bytes were 2848 * sent. Usually, this will not be more than a few bytes of 2849 * the protocol because most transfers are sent asynchronously. 2850 */ 2851 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 2852 ram_addr_t block_offset, ram_addr_t offset, 2853 size_t size, uint64_t *bytes_sent) 2854 { 2855 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 2856 RDMAContext *rdma = rioc->rdma; 2857 int ret; 2858 2859 CHECK_ERROR_STATE(); 2860 2861 qemu_fflush(f); 2862 2863 if (size > 0) { 2864 /* 2865 * Add this page to the current 'chunk'. If the chunk 2866 * is full, or the page doen't belong to the current chunk, 2867 * an actual RDMA write will occur and a new chunk will be formed. 2868 */ 2869 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 2870 if (ret < 0) { 2871 error_report("rdma migration: write error! %d", ret); 2872 goto err; 2873 } 2874 2875 /* 2876 * We always return 1 bytes because the RDMA 2877 * protocol is completely asynchronous. 
We do not yet know 2878 * whether an identified chunk is zero or not because we're 2879 * waiting for other pages to potentially be merged with 2880 * the current chunk. So, we have to call qemu_update_position() 2881 * later on when the actual write occurs. 2882 */ 2883 if (bytes_sent) { 2884 *bytes_sent = 1; 2885 } 2886 } else { 2887 uint64_t index, chunk; 2888 2889 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 2890 if (size < 0) { 2891 ret = qemu_rdma_drain_cq(f, rdma); 2892 if (ret < 0) { 2893 fprintf(stderr, "rdma: failed to synchronously drain" 2894 " completion queue before unregistration.\n"); 2895 goto err; 2896 } 2897 } 2898 */ 2899 2900 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2901 offset, size, &index, &chunk); 2902 2903 if (ret) { 2904 error_report("ram block search failed"); 2905 goto err; 2906 } 2907 2908 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 2909 2910 /* 2911 * TODO: Synchronous, guaranteed unregistration (should not occur during 2912 * fast-path). Otherwise, unregisters will process on the next call to 2913 * qemu_rdma_drain_cq() 2914 if (size < 0) { 2915 qemu_rdma_unregister_waiting(rdma); 2916 } 2917 */ 2918 } 2919 2920 /* 2921 * Drain the Completion Queue if possible, but do not block, 2922 * just poll. 2923 * 2924 * If nothing to poll, the end of the iteration will do this 2925 * again to make sure we don't overflow the request queue. 2926 */ 2927 while (1) { 2928 uint64_t wr_id, wr_id_in; 2929 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 2930 if (ret < 0) { 2931 error_report("rdma migration: polling error! 
%d", ret); 2932 goto err; 2933 } 2934 2935 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 2936 2937 if (wr_id == RDMA_WRID_NONE) { 2938 break; 2939 } 2940 } 2941 2942 return RAM_SAVE_CONTROL_DELAYED; 2943 err: 2944 rdma->error_state = ret; 2945 return ret; 2946 } 2947 2948 static int qemu_rdma_accept(RDMAContext *rdma) 2949 { 2950 RDMACapabilities cap; 2951 struct rdma_conn_param conn_param = { 2952 .responder_resources = 2, 2953 .private_data = &cap, 2954 .private_data_len = sizeof(cap), 2955 }; 2956 struct rdma_cm_event *cm_event; 2957 struct ibv_context *verbs; 2958 int ret = -EINVAL; 2959 int idx; 2960 2961 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2962 if (ret) { 2963 goto err_rdma_dest_wait; 2964 } 2965 2966 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 2967 rdma_ack_cm_event(cm_event); 2968 goto err_rdma_dest_wait; 2969 } 2970 2971 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2972 2973 network_to_caps(&cap); 2974 2975 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 2976 error_report("Unknown source RDMA version: %d, bailing...", 2977 cap.version); 2978 rdma_ack_cm_event(cm_event); 2979 goto err_rdma_dest_wait; 2980 } 2981 2982 /* 2983 * Respond with only the capabilities this version of QEMU knows about. 2984 */ 2985 cap.flags &= known_capabilities; 2986 2987 /* 2988 * Enable the ones that we do know about. 2989 * Add other checks here as new ones are introduced. 
2990 */ 2991 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 2992 rdma->pin_all = true; 2993 } 2994 2995 rdma->cm_id = cm_event->id; 2996 verbs = cm_event->id->verbs; 2997 2998 rdma_ack_cm_event(cm_event); 2999 3000 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3001 3002 caps_to_network(&cap); 3003 3004 trace_qemu_rdma_accept_pin_verbsc(verbs); 3005 3006 if (!rdma->verbs) { 3007 rdma->verbs = verbs; 3008 } else if (rdma->verbs != verbs) { 3009 error_report("ibv context not matching %p, %p!", rdma->verbs, 3010 verbs); 3011 goto err_rdma_dest_wait; 3012 } 3013 3014 qemu_rdma_dump_id("dest_init", verbs); 3015 3016 ret = qemu_rdma_alloc_pd_cq(rdma); 3017 if (ret) { 3018 error_report("rdma migration: error allocating pd and cq!"); 3019 goto err_rdma_dest_wait; 3020 } 3021 3022 ret = qemu_rdma_alloc_qp(rdma); 3023 if (ret) { 3024 error_report("rdma migration: error allocating qp!"); 3025 goto err_rdma_dest_wait; 3026 } 3027 3028 ret = qemu_rdma_init_ram_blocks(rdma); 3029 if (ret) { 3030 error_report("rdma migration: error initializing ram blocks!"); 3031 goto err_rdma_dest_wait; 3032 } 3033 3034 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3035 ret = qemu_rdma_reg_control(rdma, idx); 3036 if (ret) { 3037 error_report("rdma: error registering %d control", idx); 3038 goto err_rdma_dest_wait; 3039 } 3040 } 3041 3042 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 3043 3044 ret = rdma_accept(rdma->cm_id, &conn_param); 3045 if (ret) { 3046 error_report("rdma_accept returns %d", ret); 3047 goto err_rdma_dest_wait; 3048 } 3049 3050 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3051 if (ret) { 3052 error_report("rdma_accept get_cm_event failed %d", ret); 3053 goto err_rdma_dest_wait; 3054 } 3055 3056 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3057 error_report("rdma_accept not event established"); 3058 rdma_ack_cm_event(cm_event); 3059 goto err_rdma_dest_wait; 3060 } 3061 3062 rdma_ack_cm_event(cm_event); 3063 rdma->connected = true; 3064 3065 ret = 
qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3066 if (ret) { 3067 error_report("rdma migration: error posting second control recv"); 3068 goto err_rdma_dest_wait; 3069 } 3070 3071 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3072 3073 return 0; 3074 3075 err_rdma_dest_wait: 3076 rdma->error_state = ret; 3077 qemu_rdma_cleanup(rdma); 3078 return ret; 3079 } 3080 3081 static int dest_ram_sort_func(const void *a, const void *b) 3082 { 3083 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3084 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3085 3086 return (a_index < b_index) ? -1 : (a_index != b_index); 3087 } 3088 3089 /* 3090 * During each iteration of the migration, we listen for instructions 3091 * by the source VM to perform dynamic page registrations before they 3092 * can perform RDMA operations. 3093 * 3094 * We respond with the 'rkey'. 3095 * 3096 * Keep doing this until the source tells us to stop. 3097 */ 3098 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3099 { 3100 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3101 .type = RDMA_CONTROL_REGISTER_RESULT, 3102 .repeat = 0, 3103 }; 3104 RDMAControlHeader unreg_resp = { .len = 0, 3105 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3106 .repeat = 0, 3107 }; 3108 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3109 .repeat = 1 }; 3110 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3111 RDMAContext *rdma = rioc->rdma; 3112 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3113 RDMAControlHeader head; 3114 RDMARegister *reg, *registers; 3115 RDMACompress *comp; 3116 RDMARegisterResult *reg_result; 3117 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3118 RDMALocalBlock *block; 3119 void *host_addr; 3120 int ret = 0; 3121 int idx = 0; 3122 int count = 0; 3123 int i = 0; 3124 3125 CHECK_ERROR_STATE(); 3126 3127 do { 3128 trace_qemu_rdma_registration_handle_wait(); 3129 3130 ret = 
qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3131 3132 if (ret < 0) { 3133 break; 3134 } 3135 3136 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3137 error_report("rdma: Too many requests in this message (%d)." 3138 "Bailing.", head.repeat); 3139 ret = -EIO; 3140 break; 3141 } 3142 3143 switch (head.type) { 3144 case RDMA_CONTROL_COMPRESS: 3145 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3146 network_to_compress(comp); 3147 3148 trace_qemu_rdma_registration_handle_compress(comp->length, 3149 comp->block_idx, 3150 comp->offset); 3151 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3152 error_report("rdma: 'compress' bad block index %u (vs %d)", 3153 (unsigned int)comp->block_idx, 3154 rdma->local_ram_blocks.nb_blocks); 3155 ret = -EIO; 3156 goto out; 3157 } 3158 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3159 3160 host_addr = block->local_host_addr + 3161 (comp->offset - block->offset); 3162 3163 ram_handle_compressed(host_addr, comp->value, comp->length); 3164 break; 3165 3166 case RDMA_CONTROL_REGISTER_FINISHED: 3167 trace_qemu_rdma_registration_handle_finished(); 3168 goto out; 3169 3170 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3171 trace_qemu_rdma_registration_handle_ram_blocks(); 3172 3173 /* Sort our local RAM Block list so it's the same as the source, 3174 * we can do this since we've filled in a src_index in the list 3175 * as we received the RAMBlock list earlier. 3176 */ 3177 qsort(rdma->local_ram_blocks.block, 3178 rdma->local_ram_blocks.nb_blocks, 3179 sizeof(RDMALocalBlock), dest_ram_sort_func); 3180 if (rdma->pin_all) { 3181 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3182 if (ret) { 3183 error_report("rdma migration: error dest " 3184 "registering ram blocks"); 3185 goto out; 3186 } 3187 } 3188 3189 /* 3190 * Dest uses this to prepare to transmit the RAMBlock descriptions 3191 * to the source VM after connection setup. 
3192 * Both sides use the "remote" structure to communicate and update 3193 * their "local" descriptions with what was sent. 3194 */ 3195 for (i = 0; i < local->nb_blocks; i++) { 3196 rdma->dest_blocks[i].remote_host_addr = 3197 (uintptr_t)(local->block[i].local_host_addr); 3198 3199 if (rdma->pin_all) { 3200 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3201 } 3202 3203 rdma->dest_blocks[i].offset = local->block[i].offset; 3204 rdma->dest_blocks[i].length = local->block[i].length; 3205 3206 dest_block_to_network(&rdma->dest_blocks[i]); 3207 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3208 local->block[i].block_name, 3209 local->block[i].offset, 3210 local->block[i].length, 3211 local->block[i].local_host_addr, 3212 local->block[i].src_index); 3213 } 3214 3215 blocks.len = rdma->local_ram_blocks.nb_blocks 3216 * sizeof(RDMADestBlock); 3217 3218 3219 ret = qemu_rdma_post_send_control(rdma, 3220 (uint8_t *) rdma->dest_blocks, &blocks); 3221 3222 if (ret < 0) { 3223 error_report("rdma migration: error sending remote info"); 3224 goto out; 3225 } 3226 3227 break; 3228 case RDMA_CONTROL_REGISTER_REQUEST: 3229 trace_qemu_rdma_registration_handle_register(head.repeat); 3230 3231 reg_resp.repeat = head.repeat; 3232 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3233 3234 for (count = 0; count < head.repeat; count++) { 3235 uint64_t chunk; 3236 uint8_t *chunk_start, *chunk_end; 3237 3238 reg = ®isters[count]; 3239 network_to_register(reg); 3240 3241 reg_result = &results[count]; 3242 3243 trace_qemu_rdma_registration_handle_register_loop(count, 3244 reg->current_index, reg->key.current_addr, reg->chunks); 3245 3246 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3247 error_report("rdma: 'register' bad block index %u (vs %d)", 3248 (unsigned int)reg->current_index, 3249 rdma->local_ram_blocks.nb_blocks); 3250 ret = -ENOENT; 3251 goto out; 3252 } 3253 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3254 if 
(block->is_ram_block) { 3255 if (block->offset > reg->key.current_addr) { 3256 error_report("rdma: bad register address for block %s" 3257 " offset: %" PRIx64 " current_addr: %" PRIx64, 3258 block->block_name, block->offset, 3259 reg->key.current_addr); 3260 ret = -ERANGE; 3261 goto out; 3262 } 3263 host_addr = (block->local_host_addr + 3264 (reg->key.current_addr - block->offset)); 3265 chunk = ram_chunk_index(block->local_host_addr, 3266 (uint8_t *) host_addr); 3267 } else { 3268 chunk = reg->key.chunk; 3269 host_addr = block->local_host_addr + 3270 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3271 /* Check for particularly bad chunk value */ 3272 if (host_addr < (void *)block->local_host_addr) { 3273 error_report("rdma: bad chunk for block %s" 3274 " chunk: %" PRIx64, 3275 block->block_name, reg->key.chunk); 3276 ret = -ERANGE; 3277 goto out; 3278 } 3279 } 3280 chunk_start = ram_chunk_start(block, chunk); 3281 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3282 if (qemu_rdma_register_and_get_keys(rdma, block, 3283 (uintptr_t)host_addr, NULL, ®_result->rkey, 3284 chunk, chunk_start, chunk_end)) { 3285 error_report("cannot get rkey"); 3286 ret = -EINVAL; 3287 goto out; 3288 } 3289 3290 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3291 3292 trace_qemu_rdma_registration_handle_register_rkey( 3293 reg_result->rkey); 3294 3295 result_to_network(reg_result); 3296 } 3297 3298 ret = qemu_rdma_post_send_control(rdma, 3299 (uint8_t *) results, ®_resp); 3300 3301 if (ret < 0) { 3302 error_report("Failed to send control buffer"); 3303 goto out; 3304 } 3305 break; 3306 case RDMA_CONTROL_UNREGISTER_REQUEST: 3307 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3308 unreg_resp.repeat = head.repeat; 3309 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3310 3311 for (count = 0; count < head.repeat; count++) { 3312 reg = ®isters[count]; 3313 network_to_register(reg); 3314 3315 
trace_qemu_rdma_registration_handle_unregister_loop(count, 3316 reg->current_index, reg->key.chunk); 3317 3318 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3319 3320 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3321 block->pmr[reg->key.chunk] = NULL; 3322 3323 if (ret != 0) { 3324 perror("rdma unregistration chunk failed"); 3325 ret = -ret; 3326 goto out; 3327 } 3328 3329 rdma->total_registrations--; 3330 3331 trace_qemu_rdma_registration_handle_unregister_success( 3332 reg->key.chunk); 3333 } 3334 3335 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3336 3337 if (ret < 0) { 3338 error_report("Failed to send control buffer"); 3339 goto out; 3340 } 3341 break; 3342 case RDMA_CONTROL_REGISTER_RESULT: 3343 error_report("Invalid RESULT message at dest."); 3344 ret = -EIO; 3345 goto out; 3346 default: 3347 error_report("Unknown control message %s", control_desc[head.type]); 3348 ret = -EIO; 3349 goto out; 3350 } 3351 } while (1); 3352 out: 3353 if (ret < 0) { 3354 rdma->error_state = ret; 3355 } 3356 return ret; 3357 } 3358 3359 /* Destination: 3360 * Called via a ram_control_load_hook during the initial RAM load section which 3361 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3362 * on the source. 3363 * We've already built our local RAMBlock list, but not yet sent the list to 3364 * the source. 
3365 */ 3366 static int 3367 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) 3368 { 3369 RDMAContext *rdma = rioc->rdma; 3370 int curr; 3371 int found = -1; 3372 3373 /* Find the matching RAMBlock in our local list */ 3374 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3375 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3376 found = curr; 3377 break; 3378 } 3379 } 3380 3381 if (found == -1) { 3382 error_report("RAMBlock '%s' not found on destination", name); 3383 return -ENOENT; 3384 } 3385 3386 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3387 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3388 rdma->next_src_index++; 3389 3390 return 0; 3391 } 3392 3393 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) 3394 { 3395 switch (flags) { 3396 case RAM_CONTROL_BLOCK_REG: 3397 return rdma_block_notification_handle(opaque, data); 3398 3399 case RAM_CONTROL_HOOK: 3400 return qemu_rdma_registration_handle(f, opaque); 3401 3402 default: 3403 /* Shouldn't be called with any other values */ 3404 abort(); 3405 } 3406 } 3407 3408 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3409 uint64_t flags, void *data) 3410 { 3411 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3412 RDMAContext *rdma = rioc->rdma; 3413 3414 CHECK_ERROR_STATE(); 3415 3416 trace_qemu_rdma_registration_start(flags); 3417 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3418 qemu_fflush(f); 3419 3420 return 0; 3421 } 3422 3423 /* 3424 * Inform dest that dynamic registrations are done for now. 3425 * First, flush writes, if any. 
3426 */ 3427 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3428 uint64_t flags, void *data) 3429 { 3430 Error *local_err = NULL, **errp = &local_err; 3431 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3432 RDMAContext *rdma = rioc->rdma; 3433 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3434 int ret = 0; 3435 3436 CHECK_ERROR_STATE(); 3437 3438 qemu_fflush(f); 3439 ret = qemu_rdma_drain_cq(f, rdma); 3440 3441 if (ret < 0) { 3442 goto err; 3443 } 3444 3445 if (flags == RAM_CONTROL_SETUP) { 3446 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3447 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3448 int reg_result_idx, i, nb_dest_blocks; 3449 3450 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3451 trace_qemu_rdma_registration_stop_ram(); 3452 3453 /* 3454 * Make sure that we parallelize the pinning on both sides. 3455 * For very large guests, doing this serially takes a really 3456 * long time, so we have to 'interleave' the pinning locally 3457 * with the control messages by performing the pinning on this 3458 * side before we receive the control response from the other 3459 * side that the pinning has completed. 3460 */ 3461 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3462 ®_result_idx, rdma->pin_all ? 3463 qemu_rdma_reg_whole_ram_blocks : NULL); 3464 if (ret < 0) { 3465 ERROR(errp, "receiving remote info!"); 3466 return ret; 3467 } 3468 3469 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3470 3471 /* 3472 * The protocol uses two different sets of rkeys (mutually exclusive): 3473 * 1. One key to represent the virtual address of the entire ram block. 3474 * (dynamic chunk registration disabled - pin everything with one rkey.) 3475 * 2. One to represent individual chunks within a ram block. 3476 * (dynamic chunk registration enabled - pin individual chunks.) 
3477 * 3478 * Once the capability is successfully negotiated, the destination transmits 3479 * the keys to use (or sends them later) including the virtual addresses 3480 * and then propagates the remote ram block descriptions to his local copy. 3481 */ 3482 3483 if (local->nb_blocks != nb_dest_blocks) { 3484 ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) " 3485 "Your QEMU command line parameters are probably " 3486 "not identical on both the source and destination.", 3487 local->nb_blocks, nb_dest_blocks); 3488 rdma->error_state = -EINVAL; 3489 return -EINVAL; 3490 } 3491 3492 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3493 memcpy(rdma->dest_blocks, 3494 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3495 for (i = 0; i < nb_dest_blocks; i++) { 3496 network_to_dest_block(&rdma->dest_blocks[i]); 3497 3498 /* We require that the blocks are in the same order */ 3499 if (rdma->dest_blocks[i].length != local->block[i].length) { 3500 ERROR(errp, "Block %s/%d has a different length %" PRIu64 3501 "vs %" PRIu64, local->block[i].block_name, i, 3502 local->block[i].length, 3503 rdma->dest_blocks[i].length); 3504 rdma->error_state = -EINVAL; 3505 return -EINVAL; 3506 } 3507 local->block[i].remote_host_addr = 3508 rdma->dest_blocks[i].remote_host_addr; 3509 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3510 } 3511 } 3512 3513 trace_qemu_rdma_registration_stop(flags); 3514 3515 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3516 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3517 3518 if (ret < 0) { 3519 goto err; 3520 } 3521 3522 return 0; 3523 err: 3524 rdma->error_state = ret; 3525 return ret; 3526 } 3527 3528 static const QEMUFileHooks rdma_read_hooks = { 3529 .hook_ram_load = rdma_load_hook, 3530 }; 3531 3532 static const QEMUFileHooks rdma_write_hooks = { 3533 .before_ram_iterate = qemu_rdma_registration_start, 3534 .after_ram_iterate = qemu_rdma_registration_stop, 3535 .save_page = qemu_rdma_save_page, 
3536 }; 3537 3538 3539 static void qio_channel_rdma_finalize(Object *obj) 3540 { 3541 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3542 if (rioc->rdma) { 3543 qemu_rdma_cleanup(rioc->rdma); 3544 g_free(rioc->rdma); 3545 rioc->rdma = NULL; 3546 } 3547 } 3548 3549 static void qio_channel_rdma_class_init(ObjectClass *klass, 3550 void *class_data G_GNUC_UNUSED) 3551 { 3552 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3553 3554 ioc_klass->io_writev = qio_channel_rdma_writev; 3555 ioc_klass->io_readv = qio_channel_rdma_readv; 3556 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3557 ioc_klass->io_close = qio_channel_rdma_close; 3558 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3559 } 3560 3561 static const TypeInfo qio_channel_rdma_info = { 3562 .parent = TYPE_QIO_CHANNEL, 3563 .name = TYPE_QIO_CHANNEL_RDMA, 3564 .instance_size = sizeof(QIOChannelRDMA), 3565 .instance_finalize = qio_channel_rdma_finalize, 3566 .class_init = qio_channel_rdma_class_init, 3567 }; 3568 3569 static void qio_channel_rdma_register_types(void) 3570 { 3571 type_register_static(&qio_channel_rdma_info); 3572 } 3573 3574 type_init(qio_channel_rdma_register_types); 3575 3576 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 3577 { 3578 QIOChannelRDMA *rioc; 3579 3580 if (qemu_file_mode_is_not_valid(mode)) { 3581 return NULL; 3582 } 3583 3584 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 3585 rioc->rdma = rdma; 3586 3587 if (mode[0] == 'w') { 3588 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); 3589 qemu_file_set_hooks(rioc->file, &rdma_write_hooks); 3590 } else { 3591 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); 3592 qemu_file_set_hooks(rioc->file, &rdma_read_hooks); 3593 } 3594 3595 return rioc->file; 3596 } 3597 3598 static void rdma_accept_incoming_migration(void *opaque) 3599 { 3600 RDMAContext *rdma = opaque; 3601 int ret; 3602 QEMUFile *f; 3603 Error *local_err = NULL, **errp = &local_err; 3604 3605 
trace_qemu_rdma_accept_incoming_migration(); 3606 ret = qemu_rdma_accept(rdma); 3607 3608 if (ret) { 3609 ERROR(errp, "RDMA Migration initialization failed!"); 3610 return; 3611 } 3612 3613 trace_qemu_rdma_accept_incoming_migration_accepted(); 3614 3615 f = qemu_fopen_rdma(rdma, "rb"); 3616 if (f == NULL) { 3617 ERROR(errp, "could not qemu_fopen_rdma!"); 3618 qemu_rdma_cleanup(rdma); 3619 return; 3620 } 3621 3622 rdma->migration_started_on_destination = 1; 3623 migration_fd_process_incoming(f); 3624 } 3625 3626 void rdma_start_incoming_migration(const char *host_port, Error **errp) 3627 { 3628 int ret; 3629 RDMAContext *rdma; 3630 Error *local_err = NULL; 3631 3632 trace_rdma_start_incoming_migration(); 3633 rdma = qemu_rdma_data_init(host_port, &local_err); 3634 3635 if (rdma == NULL) { 3636 goto err; 3637 } 3638 3639 ret = qemu_rdma_dest_init(rdma, &local_err); 3640 3641 if (ret) { 3642 goto err; 3643 } 3644 3645 trace_rdma_start_incoming_migration_after_dest_init(); 3646 3647 ret = rdma_listen(rdma->listen_id, 5); 3648 3649 if (ret) { 3650 ERROR(errp, "listening on socket!"); 3651 goto err; 3652 } 3653 3654 trace_rdma_start_incoming_migration_after_rdma_listen(); 3655 3656 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3657 NULL, (void *)(intptr_t)rdma); 3658 return; 3659 err: 3660 error_propagate(errp, local_err); 3661 g_free(rdma); 3662 } 3663 3664 void rdma_start_outgoing_migration(void *opaque, 3665 const char *host_port, Error **errp) 3666 { 3667 MigrationState *s = opaque; 3668 RDMAContext *rdma = qemu_rdma_data_init(host_port, errp); 3669 int ret = 0; 3670 3671 if (rdma == NULL) { 3672 goto err; 3673 } 3674 3675 ret = qemu_rdma_source_init(rdma, errp, 3676 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]); 3677 3678 if (ret) { 3679 goto err; 3680 } 3681 3682 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3683 ret = qemu_rdma_connect(rdma, errp); 3684 3685 if (ret) { 3686 goto err; 3687 } 3688 3689 
trace_rdma_start_outgoing_migration_after_rdma_connect(); 3690 3691 s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); 3692 migrate_fd_connect(s); 3693 return; 3694 err: 3695 g_free(rdma); 3696 } 3697