1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 17 #include "qemu/osdep.h" 18 #include "qapi/error.h" 19 #include "qemu/cutils.h" 20 #include "rdma.h" 21 #include "migration.h" 22 #include "qemu-file.h" 23 #include "ram.h" 24 #include "qemu-file-channel.h" 25 #include "qemu/error-report.h" 26 #include "qemu/main-loop.h" 27 #include "qemu/module.h" 28 #include "qemu/rcu.h" 29 #include "qemu/sockets.h" 30 #include "qemu/bitmap.h" 31 #include "qemu/coroutine.h" 32 #include "exec/memory.h" 33 #include <sys/socket.h> 34 #include <netdb.h> 35 #include <arpa/inet.h> 36 #include <rdma/rdma_cma.h> 37 #include "trace.h" 38 #include "qom/object.h" 39 #include <poll.h> 40 41 /* 42 * Print and error on both the Monitor and the Log file. 43 */ 44 #define ERROR(errp, fmt, ...) \ 45 do { \ 46 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 47 if (errp && (*(errp) == NULL)) { \ 48 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 49 } \ 50 } while (0) 51 52 #define RDMA_RESOLVE_TIMEOUT_MS 10000 53 54 /* Do not merge data if larger than this. */ 55 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 56 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 57 58 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 59 60 /* 61 * This is only for non-live state being migrated. 62 * Instead of RDMA_WRITE messages, we use RDMA_SEND 63 * messages for that state, which requires a different 64 * delivery design than main memory. 65 */ 66 #define RDMA_SEND_INCREMENT 32768 67 68 /* 69 * Maximum size infiniband SEND message 70 */ 71 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 72 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 73 74 #define RDMA_CONTROL_VERSION_CURRENT 1 75 /* 76 * Capabilities for negotiation. 77 */ 78 #define RDMA_CAPABILITY_PIN_ALL 0x01 79 80 /* 81 * Add the other flags above to this list of known capabilities 82 * as they are introduced. 83 */ 84 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 85 86 #define CHECK_ERROR_STATE() \ 87 do { \ 88 if (rdma->error_state) { \ 89 if (!rdma->error_reported) { \ 90 error_report("RDMA is in an error state waiting migration" \ 91 " to abort!"); \ 92 rdma->error_reported = 1; \ 93 } \ 94 return rdma->error_state; \ 95 } \ 96 } while (0) 97 98 /* 99 * A work request ID is 64-bits and we split up these bits 100 * into 3 parts: 101 * 102 * bits 0-15 : type of control message, 2^16 103 * bits 16-29: ram block index, 2^14 104 * bits 30-63: ram block chunk number, 2^34 105 * 106 * The last two bit ranges are only used for RDMA writes, 107 * in order to track their completion and potentially 108 * also track unregistration status of the message. 109 */ 110 #define RDMA_WRID_TYPE_SHIFT 0UL 111 #define RDMA_WRID_BLOCK_SHIFT 16UL 112 #define RDMA_WRID_CHUNK_SHIFT 30UL 113 114 #define RDMA_WRID_TYPE_MASK \ 115 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 116 117 #define RDMA_WRID_BLOCK_MASK \ 118 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 119 120 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 121 122 /* 123 * RDMA migration protocol: 124 * 1. RDMA Writes (data messages, i.e. RAM) 125 * 2. IB Send/Recv (control channel messages) 126 */ 127 enum { 128 RDMA_WRID_NONE = 0, 129 RDMA_WRID_RDMA_WRITE = 1, 130 RDMA_WRID_SEND_CONTROL = 2000, 131 RDMA_WRID_RECV_CONTROL = 4000, 132 }; 133 134 static const char *wrid_desc[] = { 135 [RDMA_WRID_NONE] = "NONE", 136 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 137 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 138 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 139 }; 140 141 /* 142 * Work request IDs for IB SEND messages only (not RDMA writes). 143 * This is used by the migration protocol to transmit 144 * control messages (such as device state and registration commands) 145 * 146 * We could use more WRs, but we have enough for now. 147 */ 148 enum { 149 RDMA_WRID_READY = 0, 150 RDMA_WRID_DATA, 151 RDMA_WRID_CONTROL, 152 RDMA_WRID_MAX, 153 }; 154 155 /* 156 * SEND/RECV IB Control Messages. 157 */ 158 enum { 159 RDMA_CONTROL_NONE = 0, 160 RDMA_CONTROL_ERROR, 161 RDMA_CONTROL_READY, /* ready to receive */ 162 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 163 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 164 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 165 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 166 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 167 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 168 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 169 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 170 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 171 }; 172 173 174 /* 175 * Memory and MR structures used to represent an IB Send/Recv work request. 176 * This is *not* used for RDMA writes, only IB Send/Recv. 177 */ 178 typedef struct { 179 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 180 struct ibv_mr *control_mr; /* registration metadata */ 181 size_t control_len; /* length of the message */ 182 uint8_t *control_curr; /* start of unconsumed bytes */ 183 } RDMAWorkRequestData; 184 185 /* 186 * Negotiate RDMA capabilities during connection-setup time. 187 */ 188 typedef struct { 189 uint32_t version; 190 uint32_t flags; 191 } RDMACapabilities; 192 193 static void caps_to_network(RDMACapabilities *cap) 194 { 195 cap->version = htonl(cap->version); 196 cap->flags = htonl(cap->flags); 197 } 198 199 static void network_to_caps(RDMACapabilities *cap) 200 { 201 cap->version = ntohl(cap->version); 202 cap->flags = ntohl(cap->flags); 203 } 204 205 /* 206 * Representation of a RAMBlock from an RDMA perspective. 207 * This is not transmitted, only local. 208 * This and subsequent structures cannot be linked lists 209 * because we're using a single IB message to transmit 210 * the information. It's small anyway, so a list is overkill. 211 */ 212 typedef struct RDMALocalBlock { 213 char *block_name; 214 uint8_t *local_host_addr; /* local virtual address */ 215 uint64_t remote_host_addr; /* remote virtual address */ 216 uint64_t offset; 217 uint64_t length; 218 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 219 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 220 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 221 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 222 int index; /* which block are we */ 223 unsigned int src_index; /* (Only used on dest) */ 224 bool is_ram_block; 225 int nb_chunks; 226 unsigned long *transit_bitmap; 227 unsigned long *unregister_bitmap; 228 } RDMALocalBlock; 229 230 /* 231 * Also represents a RAMblock, but only on the dest. 232 * This gets transmitted by the dest during connection-time 233 * to the source VM and then is used to populate the 234 * corresponding RDMALocalBlock with 235 * the information needed to perform the actual RDMA. 236 */ 237 typedef struct QEMU_PACKED RDMADestBlock { 238 uint64_t remote_host_addr; 239 uint64_t offset; 240 uint64_t length; 241 uint32_t remote_rkey; 242 uint32_t padding; 243 } RDMADestBlock; 244 245 static const char *control_desc(unsigned int rdma_control) 246 { 247 static const char *strs[] = { 248 [RDMA_CONTROL_NONE] = "NONE", 249 [RDMA_CONTROL_ERROR] = "ERROR", 250 [RDMA_CONTROL_READY] = "READY", 251 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 252 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 253 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 254 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 255 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 256 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 257 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 258 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 259 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 260 }; 261 262 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 263 return "??BAD CONTROL VALUE??"; 264 } 265 266 return strs[rdma_control]; 267 } 268 269 static uint64_t htonll(uint64_t v) 270 { 271 union { uint32_t lv[2]; uint64_t llv; } u; 272 u.lv[0] = htonl(v >> 32); 273 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 274 return u.llv; 275 } 276 277 static uint64_t ntohll(uint64_t v) 278 { 279 union { uint32_t lv[2]; uint64_t llv; } u; 280 u.llv = v; 281 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 282 } 283 284 static void dest_block_to_network(RDMADestBlock *db) 285 { 286 db->remote_host_addr = htonll(db->remote_host_addr); 287 db->offset = htonll(db->offset); 288 db->length = htonll(db->length); 289 db->remote_rkey = htonl(db->remote_rkey); 290 } 291 292 static void network_to_dest_block(RDMADestBlock *db) 293 { 294 db->remote_host_addr = ntohll(db->remote_host_addr); 295 db->offset = ntohll(db->offset); 296 db->length = ntohll(db->length); 297 db->remote_rkey = ntohl(db->remote_rkey); 298 } 299 300 /* 301 * Virtual address of the above structures used for transmitting 302 * the RAMBlock descriptions at connection-time. 303 * This structure is *not* transmitted. 304 */ 305 typedef struct RDMALocalBlocks { 306 int nb_blocks; 307 bool init; /* main memory init complete */ 308 RDMALocalBlock *block; 309 } RDMALocalBlocks; 310 311 /* 312 * Main data structure for RDMA state. 313 * While there is only one copy of this structure being allocated right now, 314 * this is the place where one would start if you wanted to consider 315 * having more than one RDMA connection open at the same time. 316 */ 317 typedef struct RDMAContext { 318 char *host; 319 int port; 320 char *host_port; 321 322 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 323 324 /* 325 * This is used by *_exchange_send() to figure out whether or not 326 * the initial "READY" message has already been received or not. 327 * This is because other functions may potentially poll() and detect 328 * the READY message before send() does, in which case we need to 329 * know if it completed. 330 */ 331 int control_ready_expected; 332 333 /* number of outstanding writes */ 334 int nb_sent; 335 336 /* store info about current buffer so that we can 337 merge it with future sends */ 338 uint64_t current_addr; 339 uint64_t current_length; 340 /* index of ram block the current buffer belongs to */ 341 int current_index; 342 /* index of the chunk in the current ram block */ 343 int current_chunk; 344 345 bool pin_all; 346 347 /* 348 * infiniband-specific variables for opening the device 349 * and maintaining connection state and so forth. 350 * 351 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 352 * cm_id->verbs, cm_id->channel, and cm_id->qp. 353 */ 354 struct rdma_cm_id *cm_id; /* connection manager ID */ 355 struct rdma_cm_id *listen_id; 356 bool connected; 357 358 struct ibv_context *verbs; 359 struct rdma_event_channel *channel; 360 struct ibv_qp *qp; /* queue pair */ 361 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */ 362 struct ibv_comp_channel *send_comp_channel; /* send completion channel */ 363 struct ibv_pd *pd; /* protection domain */ 364 struct ibv_cq *recv_cq; /* recvieve completion queue */ 365 struct ibv_cq *send_cq; /* send completion queue */ 366 367 /* 368 * If a previous write failed (perhaps because of a failed 369 * memory registration, then do not attempt any future work 370 * and remember the error state. 371 */ 372 int error_state; 373 int error_reported; 374 int received_error; 375 376 /* 377 * Description of ram blocks used throughout the code. 378 */ 379 RDMALocalBlocks local_ram_blocks; 380 RDMADestBlock *dest_blocks; 381 382 /* Index of the next RAMBlock received during block registration */ 383 unsigned int next_src_index; 384 385 /* 386 * Migration on *destination* started. 387 * Then use coroutine yield function. 388 * Source runs in a thread, so we don't care. 389 */ 390 int migration_started_on_destination; 391 392 int total_registrations; 393 int total_writes; 394 395 int unregister_current, unregister_next; 396 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 397 398 GHashTable *blockmap; 399 400 /* the RDMAContext for return path */ 401 struct RDMAContext *return_path; 402 bool is_return_path; 403 } RDMAContext; 404 405 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 406 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 407 408 409 410 struct QIOChannelRDMA { 411 QIOChannel parent; 412 RDMAContext *rdmain; 413 RDMAContext *rdmaout; 414 QEMUFile *file; 415 bool blocking; /* XXX we don't actually honour this yet */ 416 }; 417 418 /* 419 * Main structure for IB Send/Recv control messages. 420 * This gets prepended at the beginning of every Send/Recv. 421 */ 422 typedef struct QEMU_PACKED { 423 uint32_t len; /* Total length of data portion */ 424 uint32_t type; /* which control command to perform */ 425 uint32_t repeat; /* number of commands in data portion of same type */ 426 uint32_t padding; 427 } RDMAControlHeader; 428 429 static void control_to_network(RDMAControlHeader *control) 430 { 431 control->type = htonl(control->type); 432 control->len = htonl(control->len); 433 control->repeat = htonl(control->repeat); 434 } 435 436 static void network_to_control(RDMAControlHeader *control) 437 { 438 control->type = ntohl(control->type); 439 control->len = ntohl(control->len); 440 control->repeat = ntohl(control->repeat); 441 } 442 443 /* 444 * Register a single Chunk. 445 * Information sent by the source VM to inform the dest 446 * to register an single chunk of memory before we can perform 447 * the actual RDMA operation. 448 */ 449 typedef struct QEMU_PACKED { 450 union QEMU_PACKED { 451 uint64_t current_addr; /* offset into the ram_addr_t space */ 452 uint64_t chunk; /* chunk to lookup if unregistering */ 453 } key; 454 uint32_t current_index; /* which ramblock the chunk belongs to */ 455 uint32_t padding; 456 uint64_t chunks; /* how many sequential chunks to register */ 457 } RDMARegister; 458 459 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 460 { 461 RDMALocalBlock *local_block; 462 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 463 464 if (local_block->is_ram_block) { 465 /* 466 * current_addr as passed in is an address in the local ram_addr_t 467 * space, we need to translate this for the destination 468 */ 469 reg->key.current_addr -= local_block->offset; 470 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 471 } 472 reg->key.current_addr = htonll(reg->key.current_addr); 473 reg->current_index = htonl(reg->current_index); 474 reg->chunks = htonll(reg->chunks); 475 } 476 477 static void network_to_register(RDMARegister *reg) 478 { 479 reg->key.current_addr = ntohll(reg->key.current_addr); 480 reg->current_index = ntohl(reg->current_index); 481 reg->chunks = ntohll(reg->chunks); 482 } 483 484 typedef struct QEMU_PACKED { 485 uint32_t value; /* if zero, we will madvise() */ 486 uint32_t block_idx; /* which ram block index */ 487 uint64_t offset; /* Address in remote ram_addr_t space */ 488 uint64_t length; /* length of the chunk */ 489 } RDMACompress; 490 491 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 492 { 493 comp->value = htonl(comp->value); 494 /* 495 * comp->offset as passed in is an address in the local ram_addr_t 496 * space, we need to translate this for the destination 497 */ 498 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 499 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 500 comp->block_idx = htonl(comp->block_idx); 501 comp->offset = htonll(comp->offset); 502 comp->length = htonll(comp->length); 503 } 504 505 static void network_to_compress(RDMACompress *comp) 506 { 507 comp->value = ntohl(comp->value); 508 comp->block_idx = ntohl(comp->block_idx); 509 comp->offset = ntohll(comp->offset); 510 comp->length = ntohll(comp->length); 511 } 512 513 /* 514 * The result of the dest's memory registration produces an "rkey" 515 * which the source VM must reference in order to perform 516 * the RDMA operation. 517 */ 518 typedef struct QEMU_PACKED { 519 uint32_t rkey; 520 uint32_t padding; 521 uint64_t host_addr; 522 } RDMARegisterResult; 523 524 static void result_to_network(RDMARegisterResult *result) 525 { 526 result->rkey = htonl(result->rkey); 527 result->host_addr = htonll(result->host_addr); 528 }; 529 530 static void network_to_result(RDMARegisterResult *result) 531 { 532 result->rkey = ntohl(result->rkey); 533 result->host_addr = ntohll(result->host_addr); 534 }; 535 536 const char *print_wrid(int wrid); 537 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 538 uint8_t *data, RDMAControlHeader *resp, 539 int *resp_idx, 540 int (*callback)(RDMAContext *rdma)); 541 542 static inline uint64_t ram_chunk_index(const uint8_t *start, 543 const uint8_t *host) 544 { 545 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 546 } 547 548 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 549 uint64_t i) 550 { 551 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 552 (i << RDMA_REG_CHUNK_SHIFT)); 553 } 554 555 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 556 uint64_t i) 557 { 558 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 559 (1UL << RDMA_REG_CHUNK_SHIFT); 560 561 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 562 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 563 } 564 565 return result; 566 } 567 568 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 569 void *host_addr, 570 ram_addr_t block_offset, uint64_t length) 571 { 572 RDMALocalBlocks *local = &rdma->local_ram_blocks; 573 RDMALocalBlock *block; 574 RDMALocalBlock *old = local->block; 575 576 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 577 578 if (local->nb_blocks) { 579 int x; 580 581 if (rdma->blockmap) { 582 for (x = 0; x < local->nb_blocks; x++) { 583 g_hash_table_remove(rdma->blockmap, 584 (void *)(uintptr_t)old[x].offset); 585 g_hash_table_insert(rdma->blockmap, 586 (void *)(uintptr_t)old[x].offset, 587 &local->block[x]); 588 } 589 } 590 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 591 g_free(old); 592 } 593 594 block = &local->block[local->nb_blocks]; 595 596 block->block_name = g_strdup(block_name); 597 block->local_host_addr = host_addr; 598 block->offset = block_offset; 599 block->length = length; 600 block->index = local->nb_blocks; 601 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 602 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 603 block->transit_bitmap = bitmap_new(block->nb_chunks); 604 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 605 block->unregister_bitmap = bitmap_new(block->nb_chunks); 606 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 607 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 608 609 block->is_ram_block = local->init ? false : true; 610 611 if (rdma->blockmap) { 612 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 613 } 614 615 trace_rdma_add_block(block_name, local->nb_blocks, 616 (uintptr_t) block->local_host_addr, 617 block->offset, block->length, 618 (uintptr_t) (block->local_host_addr + block->length), 619 BITS_TO_LONGS(block->nb_chunks) * 620 sizeof(unsigned long) * 8, 621 block->nb_chunks); 622 623 local->nb_blocks++; 624 625 return 0; 626 } 627 628 /* 629 * Memory regions need to be registered with the device and queue pairs setup 630 * in advanced before the migration starts. This tells us where the RAM blocks 631 * are so that we can register them individually. 632 */ 633 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 634 { 635 const char *block_name = qemu_ram_get_idstr(rb); 636 void *host_addr = qemu_ram_get_host_addr(rb); 637 ram_addr_t block_offset = qemu_ram_get_offset(rb); 638 ram_addr_t length = qemu_ram_get_used_length(rb); 639 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 640 } 641 642 /* 643 * Identify the RAMBlocks and their quantity. They will be references to 644 * identify chunk boundaries inside each RAMBlock and also be referenced 645 * during dynamic page registration. 646 */ 647 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 648 { 649 RDMALocalBlocks *local = &rdma->local_ram_blocks; 650 int ret; 651 652 assert(rdma->blockmap == NULL); 653 memset(local, 0, sizeof *local); 654 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 655 if (ret) { 656 return ret; 657 } 658 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 659 rdma->dest_blocks = g_new0(RDMADestBlock, 660 rdma->local_ram_blocks.nb_blocks); 661 local->init = true; 662 return 0; 663 } 664 665 /* 666 * Note: If used outside of cleanup, the caller must ensure that the destination 667 * block structures are also updated 668 */ 669 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 670 { 671 RDMALocalBlocks *local = &rdma->local_ram_blocks; 672 RDMALocalBlock *old = local->block; 673 int x; 674 675 if (rdma->blockmap) { 676 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 677 } 678 if (block->pmr) { 679 int j; 680 681 for (j = 0; j < block->nb_chunks; j++) { 682 if (!block->pmr[j]) { 683 continue; 684 } 685 ibv_dereg_mr(block->pmr[j]); 686 rdma->total_registrations--; 687 } 688 g_free(block->pmr); 689 block->pmr = NULL; 690 } 691 692 if (block->mr) { 693 ibv_dereg_mr(block->mr); 694 rdma->total_registrations--; 695 block->mr = NULL; 696 } 697 698 g_free(block->transit_bitmap); 699 block->transit_bitmap = NULL; 700 701 g_free(block->unregister_bitmap); 702 block->unregister_bitmap = NULL; 703 704 g_free(block->remote_keys); 705 block->remote_keys = NULL; 706 707 g_free(block->block_name); 708 block->block_name = NULL; 709 710 if (rdma->blockmap) { 711 for (x = 0; x < local->nb_blocks; x++) { 712 g_hash_table_remove(rdma->blockmap, 713 (void *)(uintptr_t)old[x].offset); 714 } 715 } 716 717 if (local->nb_blocks > 1) { 718 719 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 720 721 if (block->index) { 722 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 723 } 724 725 if (block->index < (local->nb_blocks - 1)) { 726 memcpy(local->block + block->index, old + (block->index + 1), 727 sizeof(RDMALocalBlock) * 728 (local->nb_blocks - (block->index + 1))); 729 for (x = block->index; x < local->nb_blocks - 1; x++) { 730 local->block[x].index--; 731 } 732 } 733 } else { 734 assert(block == local->block); 735 local->block = NULL; 736 } 737 738 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 739 block->offset, block->length, 740 (uintptr_t)(block->local_host_addr + block->length), 741 BITS_TO_LONGS(block->nb_chunks) * 742 sizeof(unsigned long) * 8, block->nb_chunks); 743 744 g_free(old); 745 746 local->nb_blocks--; 747 748 if (local->nb_blocks && rdma->blockmap) { 749 for (x = 0; x < local->nb_blocks; x++) { 750 g_hash_table_insert(rdma->blockmap, 751 (void *)(uintptr_t)local->block[x].offset, 752 &local->block[x]); 753 } 754 } 755 756 return 0; 757 } 758 759 /* 760 * Put in the log file which RDMA device was opened and the details 761 * associated with that device. 762 */ 763 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 764 { 765 struct ibv_port_attr port; 766 767 if (ibv_query_port(verbs, 1, &port)) { 768 error_report("Failed to query port information"); 769 return; 770 } 771 772 printf("%s RDMA Device opened: kernel name %s " 773 "uverbs device name %s, " 774 "infiniband_verbs class device path %s, " 775 "infiniband class device path %s, " 776 "transport: (%d) %s\n", 777 who, 778 verbs->device->name, 779 verbs->device->dev_name, 780 verbs->device->dev_path, 781 verbs->device->ibdev_path, 782 port.link_layer, 783 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 784 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 785 ? "Ethernet" : "Unknown")); 786 } 787 788 /* 789 * Put in the log file the RDMA gid addressing information, 790 * useful for folks who have trouble understanding the 791 * RDMA device hierarchy in the kernel. 792 */ 793 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 794 { 795 char sgid[33]; 796 char dgid[33]; 797 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 798 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 799 trace_qemu_rdma_dump_gid(who, sgid, dgid); 800 } 801 802 /* 803 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 804 * We will try the next addrinfo struct, and fail if there are 805 * no other valid addresses to bind against. 806 * 807 * If user is listening on '[::]', then we will not have a opened a device 808 * yet and have no way of verifying if the device is RoCE or not. 809 * 810 * In this case, the source VM will throw an error for ALL types of 811 * connections (both IPv4 and IPv6) if the destination machine does not have 812 * a regular infiniband network available for use. 813 * 814 * The only way to guarantee that an error is thrown for broken kernels is 815 * for the management software to choose a *specific* interface at bind time 816 * and validate what time of hardware it is. 817 * 818 * Unfortunately, this puts the user in a fix: 819 * 820 * If the source VM connects with an IPv4 address without knowing that the 821 * destination has bound to '[::]' the migration will unconditionally fail 822 * unless the management software is explicitly listening on the IPv4 823 * address while using a RoCE-based device. 824 * 825 * If the source VM connects with an IPv6 address, then we're OK because we can 826 * throw an error on the source (and similarly on the destination). 827 * 828 * But in mixed environments, this will be broken for a while until it is fixed 829 * inside linux. 830 * 831 * We do provide a *tiny* bit of help in this function: We can list all of the 832 * devices in the system and check to see if all the devices are RoCE or 833 * Infiniband. 834 * 835 * If we detect that we have a *pure* RoCE environment, then we can safely 836 * thrown an error even if the management software has specified '[::]' as the 837 * bind address. 838 * 839 * However, if there is are multiple hetergeneous devices, then we cannot make 840 * this assumption and the user just has to be sure they know what they are 841 * doing. 842 * 843 * Patches are being reviewed on linux-rdma. 844 */ 845 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 846 { 847 /* This bug only exists in linux, to our knowledge. */ 848 #ifdef CONFIG_LINUX 849 struct ibv_port_attr port_attr; 850 851 /* 852 * Verbs are only NULL if management has bound to '[::]'. 853 * 854 * Let's iterate through all the devices and see if there any pure IB 855 * devices (non-ethernet). 856 * 857 * If not, then we can safely proceed with the migration. 858 * Otherwise, there are no guarantees until the bug is fixed in linux. 859 */ 860 if (!verbs) { 861 int num_devices, x; 862 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 863 bool roce_found = false; 864 bool ib_found = false; 865 866 for (x = 0; x < num_devices; x++) { 867 verbs = ibv_open_device(dev_list[x]); 868 if (!verbs) { 869 if (errno == EPERM) { 870 continue; 871 } else { 872 return -EINVAL; 873 } 874 } 875 876 if (ibv_query_port(verbs, 1, &port_attr)) { 877 ibv_close_device(verbs); 878 ERROR(errp, "Could not query initial IB port"); 879 return -EINVAL; 880 } 881 882 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 883 ib_found = true; 884 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 885 roce_found = true; 886 } 887 888 ibv_close_device(verbs); 889 890 } 891 892 if (roce_found) { 893 if (ib_found) { 894 fprintf(stderr, "WARN: migrations may fail:" 895 " IPv6 over RoCE / iWARP in linux" 896 " is broken. But since you appear to have a" 897 " mixed RoCE / IB environment, be sure to only" 898 " migrate over the IB fabric until the kernel " 899 " fixes the bug.\n"); 900 } else { 901 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 902 " and your management software has specified '[::]'" 903 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 904 return -ENONET; 905 } 906 } 907 908 return 0; 909 } 910 911 /* 912 * If we have a verbs context, that means that some other than '[::]' was 913 * used by the management software for binding. In which case we can 914 * actually warn the user about a potentially broken kernel. 915 */ 916 917 /* IB ports start with 1, not 0 */ 918 if (ibv_query_port(verbs, 1, &port_attr)) { 919 ERROR(errp, "Could not query initial IB port"); 920 return -EINVAL; 921 } 922 923 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 924 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 925 "(but patches on linux-rdma in progress)"); 926 return -ENONET; 927 } 928 929 #endif 930 931 return 0; 932 } 933 934 /* 935 * Figure out which RDMA device corresponds to the requested IP hostname 936 * Also create the initial connection manager identifiers for opening 937 * the connection. 938 */ 939 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 940 { 941 int ret; 942 struct rdma_addrinfo *res; 943 char port_str[16]; 944 struct rdma_cm_event *cm_event; 945 char ip[40] = "unknown"; 946 struct rdma_addrinfo *e; 947 948 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 949 ERROR(errp, "RDMA hostname has not been set"); 950 return -EINVAL; 951 } 952 953 /* create CM channel */ 954 rdma->channel = rdma_create_event_channel(); 955 if (!rdma->channel) { 956 ERROR(errp, "could not create CM channel"); 957 return -EINVAL; 958 } 959 960 /* create CM id */ 961 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 962 if (ret) { 963 ERROR(errp, "could not create channel id"); 964 goto err_resolve_create_id; 965 } 966 967 snprintf(port_str, 16, "%d", rdma->port); 968 port_str[15] = '\0'; 969 970 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 971 if (ret < 0) { 972 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 973 goto err_resolve_get_addr; 974 } 975 976 for (e = res; e != NULL; e = e->ai_next) { 977 inet_ntop(e->ai_family, 978 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 979 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 980 981 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 982 RDMA_RESOLVE_TIMEOUT_MS); 983 if (!ret) { 984 if (e->ai_family == AF_INET6) { 985 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 986 if (ret) { 987 continue; 988 } 989 } 990 goto route; 991 } 992 } 993 994 rdma_freeaddrinfo(res); 995 ERROR(errp, "could not resolve address %s", rdma->host); 996 goto err_resolve_get_addr; 997 998 route: 999 rdma_freeaddrinfo(res); 1000 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 1001 1002 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1003 if (ret) { 1004 ERROR(errp, "could not perform event_addr_resolved"); 1005 goto err_resolve_get_addr; 1006 } 1007 1008 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 1009 ERROR(errp, "result not equal to event_addr_resolved %s", 1010 rdma_event_str(cm_event->event)); 1011 error_report("rdma_resolve_addr"); 1012 rdma_ack_cm_event(cm_event); 1013 ret = -EINVAL; 1014 goto err_resolve_get_addr; 1015 } 1016 rdma_ack_cm_event(cm_event); 1017 1018 /* resolve route */ 1019 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1020 if (ret) { 1021 ERROR(errp, "could not resolve rdma route"); 1022 goto err_resolve_get_addr; 1023 } 1024 1025 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1026 if (ret) { 1027 ERROR(errp, "could not perform event_route_resolved"); 1028 goto err_resolve_get_addr; 1029 } 1030 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1031 ERROR(errp, "result not equal to event_route_resolved: %s", 1032 rdma_event_str(cm_event->event)); 1033 rdma_ack_cm_event(cm_event); 1034 ret = -EINVAL; 1035 goto err_resolve_get_addr; 1036 } 1037 rdma_ack_cm_event(cm_event); 1038 rdma->verbs = rdma->cm_id->verbs; 1039 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1040 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1041 return 0; 1042 1043 err_resolve_get_addr: 1044 rdma_destroy_id(rdma->cm_id); 1045 rdma->cm_id = NULL; 1046 err_resolve_create_id: 1047 rdma_destroy_event_channel(rdma->channel); 1048 rdma->channel = NULL; 1049 return ret; 1050 } 1051 1052 /* 1053 * Create protection domain and completion queues 1054 */ 1055 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1056 { 1057 /* allocate pd */ 1058 rdma->pd = ibv_alloc_pd(rdma->verbs); 1059 if (!rdma->pd) { 1060 error_report("failed to allocate protection domain"); 1061 return -1; 1062 } 1063 1064 /* create receive completion channel */ 1065 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs); 1066 if (!rdma->recv_comp_channel) { 1067 error_report("failed to allocate receive completion channel"); 1068 goto err_alloc_pd_cq; 1069 } 1070 1071 /* 1072 * Completion queue can be filled by read work requests. 1073 */ 1074 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1075 NULL, rdma->recv_comp_channel, 0); 1076 if (!rdma->recv_cq) { 1077 error_report("failed to allocate receive completion queue"); 1078 goto err_alloc_pd_cq; 1079 } 1080 1081 /* create send completion channel */ 1082 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs); 1083 if (!rdma->send_comp_channel) { 1084 error_report("failed to allocate send completion channel"); 1085 goto err_alloc_pd_cq; 1086 } 1087 1088 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1089 NULL, rdma->send_comp_channel, 0); 1090 if (!rdma->send_cq) { 1091 error_report("failed to allocate send completion queue"); 1092 goto err_alloc_pd_cq; 1093 } 1094 1095 return 0; 1096 1097 err_alloc_pd_cq: 1098 if (rdma->pd) { 1099 ibv_dealloc_pd(rdma->pd); 1100 } 1101 if (rdma->recv_comp_channel) { 1102 ibv_destroy_comp_channel(rdma->recv_comp_channel); 1103 } 1104 if (rdma->send_comp_channel) { 1105 ibv_destroy_comp_channel(rdma->send_comp_channel); 1106 } 1107 if (rdma->recv_cq) { 1108 ibv_destroy_cq(rdma->recv_cq); 1109 rdma->recv_cq = NULL; 1110 } 1111 rdma->pd = NULL; 1112 rdma->recv_comp_channel = NULL; 1113 rdma->send_comp_channel = NULL; 1114 return -1; 1115 1116 } 1117 1118 /* 1119 * Create queue pairs. 1120 */ 1121 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1122 { 1123 struct ibv_qp_init_attr attr = { 0 }; 1124 int ret; 1125 1126 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1127 attr.cap.max_recv_wr = 3; 1128 attr.cap.max_send_sge = 1; 1129 attr.cap.max_recv_sge = 1; 1130 attr.send_cq = rdma->send_cq; 1131 attr.recv_cq = rdma->recv_cq; 1132 attr.qp_type = IBV_QPT_RC; 1133 1134 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1135 if (ret) { 1136 return -1; 1137 } 1138 1139 rdma->qp = rdma->cm_id->qp; 1140 return 0; 1141 } 1142 1143 /* Check whether On-Demand Paging is supported by RDAM device */ 1144 static bool rdma_support_odp(struct ibv_context *dev) 1145 { 1146 struct ibv_device_attr_ex attr = {0}; 1147 int ret = ibv_query_device_ex(dev, NULL, &attr); 1148 if (ret) { 1149 return false; 1150 } 1151 1152 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) { 1153 return true; 1154 } 1155 1156 return false; 1157 } 1158 1159 /* 1160 * ibv_advise_mr to avoid RNR NAK error as far as possible. 1161 * The responder mr registering with ODP will sent RNR NAK back to 1162 * the requester in the face of the page fault. 1163 */ 1164 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr, 1165 uint32_t len, uint32_t lkey, 1166 const char *name, bool wr) 1167 { 1168 #ifdef HAVE_IBV_ADVISE_MR 1169 int ret; 1170 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE : 1171 IBV_ADVISE_MR_ADVICE_PREFETCH; 1172 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len}; 1173 1174 ret = ibv_advise_mr(pd, advice, 1175 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1); 1176 /* ignore the error */ 1177 if (ret) { 1178 trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno)); 1179 } else { 1180 trace_qemu_rdma_advise_mr(name, len, addr, "successed"); 1181 } 1182 #endif 1183 } 1184 1185 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1186 { 1187 int i; 1188 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1189 1190 for (i = 0; i < local->nb_blocks; i++) { 1191 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; 1192 1193 local->block[i].mr = 1194 ibv_reg_mr(rdma->pd, 1195 local->block[i].local_host_addr, 1196 local->block[i].length, access 1197 ); 1198 1199 if (!local->block[i].mr && 1200 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1201 access |= IBV_ACCESS_ON_DEMAND; 1202 /* register ODP mr */ 1203 local->block[i].mr = 1204 ibv_reg_mr(rdma->pd, 1205 local->block[i].local_host_addr, 1206 local->block[i].length, access); 1207 trace_qemu_rdma_register_odp_mr(local->block[i].block_name); 1208 1209 if (local->block[i].mr) { 1210 qemu_rdma_advise_prefetch_mr(rdma->pd, 1211 (uintptr_t)local->block[i].local_host_addr, 1212 local->block[i].length, 1213 local->block[i].mr->lkey, 1214 local->block[i].block_name, 1215 true); 1216 } 1217 } 1218 1219 if (!local->block[i].mr) { 1220 perror("Failed to register local dest ram block!"); 1221 break; 1222 } 1223 rdma->total_registrations++; 1224 } 1225 1226 if (i >= local->nb_blocks) { 1227 return 0; 1228 } 1229 1230 for (i--; i >= 0; i--) { 1231 ibv_dereg_mr(local->block[i].mr); 1232 local->block[i].mr = NULL; 1233 rdma->total_registrations--; 1234 } 1235 1236 return -1; 1237 1238 } 1239 1240 /* 1241 * Find the ram block that corresponds to the page requested to be 1242 * transmitted by QEMU. 1243 * 1244 * Once the block is found, also identify which 'chunk' within that 1245 * block that the page belongs to. 1246 * 1247 * This search cannot fail or the migration will fail. 1248 */ 1249 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1250 uintptr_t block_offset, 1251 uint64_t offset, 1252 uint64_t length, 1253 uint64_t *block_index, 1254 uint64_t *chunk_index) 1255 { 1256 uint64_t current_addr = block_offset + offset; 1257 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1258 (void *) block_offset); 1259 assert(block); 1260 assert(current_addr >= block->offset); 1261 assert((current_addr + length) <= (block->offset + block->length)); 1262 1263 *block_index = block->index; 1264 *chunk_index = ram_chunk_index(block->local_host_addr, 1265 block->local_host_addr + (current_addr - block->offset)); 1266 1267 return 0; 1268 } 1269 1270 /* 1271 * Register a chunk with IB. If the chunk was already registered 1272 * previously, then skip. 1273 * 1274 * Also return the keys associated with the registration needed 1275 * to perform the actual RDMA operation. 1276 */ 1277 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1278 RDMALocalBlock *block, uintptr_t host_addr, 1279 uint32_t *lkey, uint32_t *rkey, int chunk, 1280 uint8_t *chunk_start, uint8_t *chunk_end) 1281 { 1282 if (block->mr) { 1283 if (lkey) { 1284 *lkey = block->mr->lkey; 1285 } 1286 if (rkey) { 1287 *rkey = block->mr->rkey; 1288 } 1289 return 0; 1290 } 1291 1292 /* allocate memory to store chunk MRs */ 1293 if (!block->pmr) { 1294 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1295 } 1296 1297 /* 1298 * If 'rkey', then we're the destination, so grant access to the source. 1299 * 1300 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1301 */ 1302 if (!block->pmr[chunk]) { 1303 uint64_t len = chunk_end - chunk_start; 1304 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE : 1305 0; 1306 1307 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1308 1309 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1310 if (!block->pmr[chunk] && 1311 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1312 access |= IBV_ACCESS_ON_DEMAND; 1313 /* register ODP mr */ 1314 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1315 trace_qemu_rdma_register_odp_mr(block->block_name); 1316 1317 if (block->pmr[chunk]) { 1318 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start, 1319 len, block->pmr[chunk]->lkey, 1320 block->block_name, rkey); 1321 1322 } 1323 } 1324 } 1325 if (!block->pmr[chunk]) { 1326 perror("Failed to register chunk!"); 1327 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1328 " start %" PRIuPTR " end %" PRIuPTR 1329 " host %" PRIuPTR 1330 " local %" PRIuPTR " registrations: %d\n", 1331 block->index, chunk, (uintptr_t)chunk_start, 1332 (uintptr_t)chunk_end, host_addr, 1333 (uintptr_t)block->local_host_addr, 1334 rdma->total_registrations); 1335 return -1; 1336 } 1337 rdma->total_registrations++; 1338 1339 if (lkey) { 1340 *lkey = block->pmr[chunk]->lkey; 1341 } 1342 if (rkey) { 1343 *rkey = block->pmr[chunk]->rkey; 1344 } 1345 return 0; 1346 } 1347 1348 /* 1349 * Register (at connection time) the memory used for control 1350 * channel messages. 1351 */ 1352 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1353 { 1354 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1355 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1356 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1357 if (rdma->wr_data[idx].control_mr) { 1358 rdma->total_registrations++; 1359 return 0; 1360 } 1361 error_report("qemu_rdma_reg_control failed"); 1362 return -1; 1363 } 1364 1365 const char *print_wrid(int wrid) 1366 { 1367 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1368 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1369 } 1370 return wrid_desc[wrid]; 1371 } 1372 1373 /* 1374 * RDMA requires memory registration (mlock/pinning), but this is not good for 1375 * overcommitment. 1376 * 1377 * In preparation for the future where LRU information or workload-specific 1378 * writable writable working set memory access behavior is available to QEMU 1379 * it would be nice to have in place the ability to UN-register/UN-pin 1380 * particular memory regions from the RDMA hardware when it is determine that 1381 * those regions of memory will likely not be accessed again in the near future. 1382 * 1383 * While we do not yet have such information right now, the following 1384 * compile-time option allows us to perform a non-optimized version of this 1385 * behavior. 1386 * 1387 * By uncommenting this option, you will cause *all* RDMA transfers to be 1388 * unregistered immediately after the transfer completes on both sides of the 1389 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode. 1390 * 1391 * This will have a terrible impact on migration performance, so until future 1392 * workload information or LRU information is available, do not attempt to use 1393 * this feature except for basic testing. 1394 */ 1395 /* #define RDMA_UNREGISTRATION_EXAMPLE */ 1396 1397 /* 1398 * Perform a non-optimized memory unregistration after every transfer 1399 * for demonstration purposes, only if pin-all is not requested. 1400 * 1401 * Potential optimizations: 1402 * 1. Start a new thread to run this function continuously 1403 - for bit clearing 1404 - and for receipt of unregister messages 1405 * 2. Use an LRU. 1406 * 3. Use workload hints. 1407 */ 1408 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1409 { 1410 while (rdma->unregistrations[rdma->unregister_current]) { 1411 int ret; 1412 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1413 uint64_t chunk = 1414 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1415 uint64_t index = 1416 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1417 RDMALocalBlock *block = 1418 &(rdma->local_ram_blocks.block[index]); 1419 RDMARegister reg = { .current_index = index }; 1420 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1421 }; 1422 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1423 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1424 .repeat = 1, 1425 }; 1426 1427 trace_qemu_rdma_unregister_waiting_proc(chunk, 1428 rdma->unregister_current); 1429 1430 rdma->unregistrations[rdma->unregister_current] = 0; 1431 rdma->unregister_current++; 1432 1433 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1434 rdma->unregister_current = 0; 1435 } 1436 1437 1438 /* 1439 * Unregistration is speculative (because migration is single-threaded 1440 * and we cannot break the protocol's inifinband message ordering). 1441 * Thus, if the memory is currently being used for transmission, 1442 * then abort the attempt to unregister and try again 1443 * later the next time a completion is received for this memory. 1444 */ 1445 clear_bit(chunk, block->unregister_bitmap); 1446 1447 if (test_bit(chunk, block->transit_bitmap)) { 1448 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1449 continue; 1450 } 1451 1452 trace_qemu_rdma_unregister_waiting_send(chunk); 1453 1454 ret = ibv_dereg_mr(block->pmr[chunk]); 1455 block->pmr[chunk] = NULL; 1456 block->remote_keys[chunk] = 0; 1457 1458 if (ret != 0) { 1459 perror("unregistration chunk failed"); 1460 return -ret; 1461 } 1462 rdma->total_registrations--; 1463 1464 reg.key.chunk = chunk; 1465 register_to_network(rdma, ®); 1466 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1467 &resp, NULL, NULL); 1468 if (ret < 0) { 1469 return ret; 1470 } 1471 1472 trace_qemu_rdma_unregister_waiting_complete(chunk); 1473 } 1474 1475 return 0; 1476 } 1477 1478 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1479 uint64_t chunk) 1480 { 1481 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1482 1483 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1484 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1485 1486 return result; 1487 } 1488 1489 /* 1490 * Set bit for unregistration in the next iteration. 1491 * We cannot transmit right here, but will unpin later. 1492 */ 1493 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1494 uint64_t chunk, uint64_t wr_id) 1495 { 1496 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1497 error_report("rdma migration: queue is full"); 1498 } else { 1499 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1500 1501 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1502 trace_qemu_rdma_signal_unregister_append(chunk, 1503 rdma->unregister_next); 1504 1505 rdma->unregistrations[rdma->unregister_next++] = 1506 qemu_rdma_make_wrid(wr_id, index, chunk); 1507 1508 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1509 rdma->unregister_next = 0; 1510 } 1511 } else { 1512 trace_qemu_rdma_signal_unregister_already(chunk); 1513 } 1514 } 1515 } 1516 1517 /* 1518 * Consult the connection manager to see a work request 1519 * (of any kind) has completed. 1520 * Return the work request ID that completed. 1521 */ 1522 static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq, 1523 uint64_t *wr_id_out, uint32_t *byte_len) 1524 { 1525 int ret; 1526 struct ibv_wc wc; 1527 uint64_t wr_id; 1528 1529 ret = ibv_poll_cq(cq, 1, &wc); 1530 1531 if (!ret) { 1532 *wr_id_out = RDMA_WRID_NONE; 1533 return 0; 1534 } 1535 1536 if (ret < 0) { 1537 error_report("ibv_poll_cq return %d", ret); 1538 return ret; 1539 } 1540 1541 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1542 1543 if (wc.status != IBV_WC_SUCCESS) { 1544 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1545 wc.status, ibv_wc_status_str(wc.status)); 1546 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1547 1548 return -1; 1549 } 1550 1551 if (rdma->control_ready_expected && 1552 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1553 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1554 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1555 rdma->control_ready_expected = 0; 1556 } 1557 1558 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1559 uint64_t chunk = 1560 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1561 uint64_t index = 1562 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1563 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1564 1565 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1566 index, chunk, block->local_host_addr, 1567 (void *)(uintptr_t)block->remote_host_addr); 1568 1569 clear_bit(chunk, block->transit_bitmap); 1570 1571 if (rdma->nb_sent > 0) { 1572 rdma->nb_sent--; 1573 } 1574 1575 if (!rdma->pin_all) { 1576 /* 1577 * FYI: If one wanted to signal a specific chunk to be unregistered 1578 * using LRU or workload-specific information, this is the function 1579 * you would call to do so. That chunk would then get asynchronously 1580 * unregistered later. 1581 */ 1582 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1583 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1584 #endif 1585 } 1586 } else { 1587 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1588 } 1589 1590 *wr_id_out = wc.wr_id; 1591 if (byte_len) { 1592 *byte_len = wc.byte_len; 1593 } 1594 1595 return 0; 1596 } 1597 1598 /* Wait for activity on the completion channel. 1599 * Returns 0 on success, none-0 on error. 1600 */ 1601 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma, 1602 struct ibv_comp_channel *comp_channel) 1603 { 1604 struct rdma_cm_event *cm_event; 1605 int ret = -1; 1606 1607 /* 1608 * Coroutine doesn't start until migration_fd_process_incoming() 1609 * so don't yield unless we know we're running inside of a coroutine. 1610 */ 1611 if (rdma->migration_started_on_destination && 1612 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1613 yield_until_fd_readable(comp_channel->fd); 1614 } else { 1615 /* This is the source side, we're in a separate thread 1616 * or destination prior to migration_fd_process_incoming() 1617 * after postcopy, the destination also in a separate thread. 1618 * we can't yield; so we have to poll the fd. 1619 * But we need to be able to handle 'cancel' or an error 1620 * without hanging forever. 1621 */ 1622 while (!rdma->error_state && !rdma->received_error) { 1623 GPollFD pfds[2]; 1624 pfds[0].fd = comp_channel->fd; 1625 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1626 pfds[0].revents = 0; 1627 1628 pfds[1].fd = rdma->channel->fd; 1629 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1630 pfds[1].revents = 0; 1631 1632 /* 0.1s timeout, should be fine for a 'cancel' */ 1633 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1634 case 2: 1635 case 1: /* fd active */ 1636 if (pfds[0].revents) { 1637 return 0; 1638 } 1639 1640 if (pfds[1].revents) { 1641 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1642 if (ret) { 1643 error_report("failed to get cm event while wait " 1644 "completion channel"); 1645 return -EPIPE; 1646 } 1647 1648 error_report("receive cm event while wait comp channel," 1649 "cm event is %d", cm_event->event); 1650 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1651 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1652 rdma_ack_cm_event(cm_event); 1653 return -EPIPE; 1654 } 1655 rdma_ack_cm_event(cm_event); 1656 } 1657 break; 1658 1659 case 0: /* Timeout, go around again */ 1660 break; 1661 1662 default: /* Error of some type - 1663 * I don't trust errno from qemu_poll_ns 1664 */ 1665 error_report("%s: poll failed", __func__); 1666 return -EPIPE; 1667 } 1668 1669 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1670 /* Bail out and let the cancellation happen */ 1671 return -EPIPE; 1672 } 1673 } 1674 } 1675 1676 if (rdma->received_error) { 1677 return -EPIPE; 1678 } 1679 return rdma->error_state; 1680 } 1681 1682 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid) 1683 { 1684 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel : 1685 rdma->recv_comp_channel; 1686 } 1687 1688 static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid) 1689 { 1690 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq; 1691 } 1692 1693 /* 1694 * Block until the next work request has completed. 1695 * 1696 * First poll to see if a work request has already completed, 1697 * otherwise block. 1698 * 1699 * If we encounter completed work requests for IDs other than 1700 * the one we're interested in, then that's generally an error. 1701 * 1702 * The only exception is actual RDMA Write completions. These 1703 * completions only need to be recorded, but do not actually 1704 * need further processing. 1705 */ 1706 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1707 uint32_t *byte_len) 1708 { 1709 int num_cq_events = 0, ret = 0; 1710 struct ibv_cq *cq; 1711 void *cq_ctx; 1712 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1713 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested); 1714 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested); 1715 1716 if (ibv_req_notify_cq(poll_cq, 0)) { 1717 return -1; 1718 } 1719 /* poll cq first */ 1720 while (wr_id != wrid_requested) { 1721 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1722 if (ret < 0) { 1723 return ret; 1724 } 1725 1726 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1727 1728 if (wr_id == RDMA_WRID_NONE) { 1729 break; 1730 } 1731 if (wr_id != wrid_requested) { 1732 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1733 wrid_requested, print_wrid(wr_id), wr_id); 1734 } 1735 } 1736 1737 if (wr_id == wrid_requested) { 1738 return 0; 1739 } 1740 1741 while (1) { 1742 ret = qemu_rdma_wait_comp_channel(rdma, ch); 1743 if (ret) { 1744 goto err_block_for_wrid; 1745 } 1746 1747 ret = ibv_get_cq_event(ch, &cq, &cq_ctx); 1748 if (ret) { 1749 perror("ibv_get_cq_event"); 1750 goto err_block_for_wrid; 1751 } 1752 1753 num_cq_events++; 1754 1755 ret = -ibv_req_notify_cq(cq, 0); 1756 if (ret) { 1757 goto err_block_for_wrid; 1758 } 1759 1760 while (wr_id != wrid_requested) { 1761 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1762 if (ret < 0) { 1763 goto err_block_for_wrid; 1764 } 1765 1766 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1767 1768 if (wr_id == RDMA_WRID_NONE) { 1769 break; 1770 } 1771 if (wr_id != wrid_requested) { 1772 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1773 wrid_requested, print_wrid(wr_id), wr_id); 1774 } 1775 } 1776 1777 if (wr_id == wrid_requested) { 1778 goto success_block_for_wrid; 1779 } 1780 } 1781 1782 success_block_for_wrid: 1783 if (num_cq_events) { 1784 ibv_ack_cq_events(cq, num_cq_events); 1785 } 1786 return 0; 1787 1788 err_block_for_wrid: 1789 if (num_cq_events) { 1790 ibv_ack_cq_events(cq, num_cq_events); 1791 } 1792 1793 rdma->error_state = ret; 1794 return ret; 1795 } 1796 1797 /* 1798 * Post a SEND message work request for the control channel 1799 * containing some data and block until the post completes. 1800 */ 1801 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1802 RDMAControlHeader *head) 1803 { 1804 int ret = 0; 1805 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1806 struct ibv_send_wr *bad_wr; 1807 struct ibv_sge sge = { 1808 .addr = (uintptr_t)(wr->control), 1809 .length = head->len + sizeof(RDMAControlHeader), 1810 .lkey = wr->control_mr->lkey, 1811 }; 1812 struct ibv_send_wr send_wr = { 1813 .wr_id = RDMA_WRID_SEND_CONTROL, 1814 .opcode = IBV_WR_SEND, 1815 .send_flags = IBV_SEND_SIGNALED, 1816 .sg_list = &sge, 1817 .num_sge = 1, 1818 }; 1819 1820 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1821 1822 /* 1823 * We don't actually need to do a memcpy() in here if we used 1824 * the "sge" properly, but since we're only sending control messages 1825 * (not RAM in a performance-critical path), then its OK for now. 1826 * 1827 * The copy makes the RDMAControlHeader simpler to manipulate 1828 * for the time being. 1829 */ 1830 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1831 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1832 control_to_network((void *) wr->control); 1833 1834 if (buf) { 1835 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1836 } 1837 1838 1839 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1840 1841 if (ret > 0) { 1842 error_report("Failed to use post IB SEND for control"); 1843 return -ret; 1844 } 1845 1846 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1847 if (ret < 0) { 1848 error_report("rdma migration: send polling control error"); 1849 } 1850 1851 return ret; 1852 } 1853 1854 /* 1855 * Post a RECV work request in anticipation of some future receipt 1856 * of data on the control channel. 1857 */ 1858 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1859 { 1860 struct ibv_recv_wr *bad_wr; 1861 struct ibv_sge sge = { 1862 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1863 .length = RDMA_CONTROL_MAX_BUFFER, 1864 .lkey = rdma->wr_data[idx].control_mr->lkey, 1865 }; 1866 1867 struct ibv_recv_wr recv_wr = { 1868 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1869 .sg_list = &sge, 1870 .num_sge = 1, 1871 }; 1872 1873 1874 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1875 return -1; 1876 } 1877 1878 return 0; 1879 } 1880 1881 /* 1882 * Block and wait for a RECV control channel message to arrive. 1883 */ 1884 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1885 RDMAControlHeader *head, int expecting, int idx) 1886 { 1887 uint32_t byte_len; 1888 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1889 &byte_len); 1890 1891 if (ret < 0) { 1892 error_report("rdma migration: recv polling control error!"); 1893 return ret; 1894 } 1895 1896 network_to_control((void *) rdma->wr_data[idx].control); 1897 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1898 1899 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1900 1901 if (expecting == RDMA_CONTROL_NONE) { 1902 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1903 head->type); 1904 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1905 error_report("Was expecting a %s (%d) control message" 1906 ", but got: %s (%d), length: %d", 1907 control_desc(expecting), expecting, 1908 control_desc(head->type), head->type, head->len); 1909 if (head->type == RDMA_CONTROL_ERROR) { 1910 rdma->received_error = true; 1911 } 1912 return -EIO; 1913 } 1914 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1915 error_report("too long length: %d", head->len); 1916 return -EINVAL; 1917 } 1918 if (sizeof(*head) + head->len != byte_len) { 1919 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1920 return -EINVAL; 1921 } 1922 1923 return 0; 1924 } 1925 1926 /* 1927 * When a RECV work request has completed, the work request's 1928 * buffer is pointed at the header. 1929 * 1930 * This will advance the pointer to the data portion 1931 * of the control message of the work request's buffer that 1932 * was populated after the work request finished. 1933 */ 1934 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1935 RDMAControlHeader *head) 1936 { 1937 rdma->wr_data[idx].control_len = head->len; 1938 rdma->wr_data[idx].control_curr = 1939 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1940 } 1941 1942 /* 1943 * This is an 'atomic' high-level operation to deliver a single, unified 1944 * control-channel message. 1945 * 1946 * Additionally, if the user is expecting some kind of reply to this message, 1947 * they can request a 'resp' response message be filled in by posting an 1948 * additional work request on behalf of the user and waiting for an additional 1949 * completion. 1950 * 1951 * The extra (optional) response is used during registration to us from having 1952 * to perform an *additional* exchange of message just to provide a response by 1953 * instead piggy-backing on the acknowledgement. 1954 */ 1955 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1956 uint8_t *data, RDMAControlHeader *resp, 1957 int *resp_idx, 1958 int (*callback)(RDMAContext *rdma)) 1959 { 1960 int ret = 0; 1961 1962 /* 1963 * Wait until the dest is ready before attempting to deliver the message 1964 * by waiting for a READY message. 1965 */ 1966 if (rdma->control_ready_expected) { 1967 RDMAControlHeader resp; 1968 ret = qemu_rdma_exchange_get_response(rdma, 1969 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1970 if (ret < 0) { 1971 return ret; 1972 } 1973 } 1974 1975 /* 1976 * If the user is expecting a response, post a WR in anticipation of it. 1977 */ 1978 if (resp) { 1979 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1980 if (ret) { 1981 error_report("rdma migration: error posting" 1982 " extra control recv for anticipated result!"); 1983 return ret; 1984 } 1985 } 1986 1987 /* 1988 * Post a WR to replace the one we just consumed for the READY message. 1989 */ 1990 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1991 if (ret) { 1992 error_report("rdma migration: error posting first control recv!"); 1993 return ret; 1994 } 1995 1996 /* 1997 * Deliver the control message that was requested. 1998 */ 1999 ret = qemu_rdma_post_send_control(rdma, data, head); 2000 2001 if (ret < 0) { 2002 error_report("Failed to send control buffer!"); 2003 return ret; 2004 } 2005 2006 /* 2007 * If we're expecting a response, block and wait for it. 2008 */ 2009 if (resp) { 2010 if (callback) { 2011 trace_qemu_rdma_exchange_send_issue_callback(); 2012 ret = callback(rdma); 2013 if (ret < 0) { 2014 return ret; 2015 } 2016 } 2017 2018 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 2019 ret = qemu_rdma_exchange_get_response(rdma, resp, 2020 resp->type, RDMA_WRID_DATA); 2021 2022 if (ret < 0) { 2023 return ret; 2024 } 2025 2026 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 2027 if (resp_idx) { 2028 *resp_idx = RDMA_WRID_DATA; 2029 } 2030 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 2031 } 2032 2033 rdma->control_ready_expected = 1; 2034 2035 return 0; 2036 } 2037 2038 /* 2039 * This is an 'atomic' high-level operation to receive a single, unified 2040 * control-channel message. 2041 */ 2042 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 2043 int expecting) 2044 { 2045 RDMAControlHeader ready = { 2046 .len = 0, 2047 .type = RDMA_CONTROL_READY, 2048 .repeat = 1, 2049 }; 2050 int ret; 2051 2052 /* 2053 * Inform the source that we're ready to receive a message. 2054 */ 2055 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 2056 2057 if (ret < 0) { 2058 error_report("Failed to send control buffer!"); 2059 return ret; 2060 } 2061 2062 /* 2063 * Block and wait for the message. 2064 */ 2065 ret = qemu_rdma_exchange_get_response(rdma, head, 2066 expecting, RDMA_WRID_READY); 2067 2068 if (ret < 0) { 2069 return ret; 2070 } 2071 2072 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 2073 2074 /* 2075 * Post a new RECV work request to replace the one we just consumed. 2076 */ 2077 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2078 if (ret) { 2079 error_report("rdma migration: error posting second control recv!"); 2080 return ret; 2081 } 2082 2083 return 0; 2084 } 2085 2086 /* 2087 * Write an actual chunk of memory using RDMA. 2088 * 2089 * If we're using dynamic registration on the dest-side, we have to 2090 * send a registration command first. 2091 */ 2092 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 2093 int current_index, uint64_t current_addr, 2094 uint64_t length) 2095 { 2096 struct ibv_sge sge; 2097 struct ibv_send_wr send_wr = { 0 }; 2098 struct ibv_send_wr *bad_wr; 2099 int reg_result_idx, ret, count = 0; 2100 uint64_t chunk, chunks; 2101 uint8_t *chunk_start, *chunk_end; 2102 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 2103 RDMARegister reg; 2104 RDMARegisterResult *reg_result; 2105 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 2106 RDMAControlHeader head = { .len = sizeof(RDMARegister), 2107 .type = RDMA_CONTROL_REGISTER_REQUEST, 2108 .repeat = 1, 2109 }; 2110 2111 retry: 2112 sge.addr = (uintptr_t)(block->local_host_addr + 2113 (current_addr - block->offset)); 2114 sge.length = length; 2115 2116 chunk = ram_chunk_index(block->local_host_addr, 2117 (uint8_t *)(uintptr_t)sge.addr); 2118 chunk_start = ram_chunk_start(block, chunk); 2119 2120 if (block->is_ram_block) { 2121 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2122 2123 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2124 chunks--; 2125 } 2126 } else { 2127 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2128 2129 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2130 chunks--; 2131 } 2132 } 2133 2134 trace_qemu_rdma_write_one_top(chunks + 1, 2135 (chunks + 1) * 2136 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2137 2138 chunk_end = ram_chunk_end(block, chunk + chunks); 2139 2140 if (!rdma->pin_all) { 2141 #ifdef RDMA_UNREGISTRATION_EXAMPLE 2142 qemu_rdma_unregister_waiting(rdma); 2143 #endif 2144 } 2145 2146 while (test_bit(chunk, block->transit_bitmap)) { 2147 (void)count; 2148 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2149 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2150 2151 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2152 2153 if (ret < 0) { 2154 error_report("Failed to Wait for previous write to complete " 2155 "block %d chunk %" PRIu64 2156 " current %" PRIu64 " len %" PRIu64 " %d", 2157 current_index, chunk, sge.addr, length, rdma->nb_sent); 2158 return ret; 2159 } 2160 } 2161 2162 if (!rdma->pin_all || !block->is_ram_block) { 2163 if (!block->remote_keys[chunk]) { 2164 /* 2165 * This chunk has not yet been registered, so first check to see 2166 * if the entire chunk is zero. If so, tell the other size to 2167 * memset() + madvise() the entire chunk without RDMA. 2168 */ 2169 2170 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 2171 RDMACompress comp = { 2172 .offset = current_addr, 2173 .value = 0, 2174 .block_idx = current_index, 2175 .length = length, 2176 }; 2177 2178 head.len = sizeof(comp); 2179 head.type = RDMA_CONTROL_COMPRESS; 2180 2181 trace_qemu_rdma_write_one_zero(chunk, sge.length, 2182 current_index, current_addr); 2183 2184 compress_to_network(rdma, &comp); 2185 ret = qemu_rdma_exchange_send(rdma, &head, 2186 (uint8_t *) &comp, NULL, NULL, NULL); 2187 2188 if (ret < 0) { 2189 return -EIO; 2190 } 2191 2192 acct_update_position(f, sge.length, true); 2193 2194 return 1; 2195 } 2196 2197 /* 2198 * Otherwise, tell other side to register. 2199 */ 2200 reg.current_index = current_index; 2201 if (block->is_ram_block) { 2202 reg.key.current_addr = current_addr; 2203 } else { 2204 reg.key.chunk = chunk; 2205 } 2206 reg.chunks = chunks; 2207 2208 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 2209 current_addr); 2210 2211 register_to_network(rdma, ®); 2212 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 2213 &resp, ®_result_idx, NULL); 2214 if (ret < 0) { 2215 return ret; 2216 } 2217 2218 /* try to overlap this single registration with the one we sent. */ 2219 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2220 &sge.lkey, NULL, chunk, 2221 chunk_start, chunk_end)) { 2222 error_report("cannot get lkey"); 2223 return -EINVAL; 2224 } 2225 2226 reg_result = (RDMARegisterResult *) 2227 rdma->wr_data[reg_result_idx].control_curr; 2228 2229 network_to_result(reg_result); 2230 2231 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2232 reg_result->rkey, chunk); 2233 2234 block->remote_keys[chunk] = reg_result->rkey; 2235 block->remote_host_addr = reg_result->host_addr; 2236 } else { 2237 /* already registered before */ 2238 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2239 &sge.lkey, NULL, chunk, 2240 chunk_start, chunk_end)) { 2241 error_report("cannot get lkey!"); 2242 return -EINVAL; 2243 } 2244 } 2245 2246 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2247 } else { 2248 send_wr.wr.rdma.rkey = block->remote_rkey; 2249 2250 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2251 &sge.lkey, NULL, chunk, 2252 chunk_start, chunk_end)) { 2253 error_report("cannot get lkey!"); 2254 return -EINVAL; 2255 } 2256 } 2257 2258 /* 2259 * Encode the ram block index and chunk within this wrid. 2260 * We will use this information at the time of completion 2261 * to figure out which bitmap to check against and then which 2262 * chunk in the bitmap to look for. 2263 */ 2264 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2265 current_index, chunk); 2266 2267 send_wr.opcode = IBV_WR_RDMA_WRITE; 2268 send_wr.send_flags = IBV_SEND_SIGNALED; 2269 send_wr.sg_list = &sge; 2270 send_wr.num_sge = 1; 2271 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2272 (current_addr - block->offset); 2273 2274 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2275 sge.length); 2276 2277 /* 2278 * ibv_post_send() does not return negative error numbers, 2279 * per the specification they are positive - no idea why. 2280 */ 2281 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2282 2283 if (ret == ENOMEM) { 2284 trace_qemu_rdma_write_one_queue_full(); 2285 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2286 if (ret < 0) { 2287 error_report("rdma migration: failed to make " 2288 "room in full send queue! %d", ret); 2289 return ret; 2290 } 2291 2292 goto retry; 2293 2294 } else if (ret > 0) { 2295 perror("rdma migration: post rdma write failed"); 2296 return -ret; 2297 } 2298 2299 set_bit(chunk, block->transit_bitmap); 2300 acct_update_position(f, sge.length, false); 2301 rdma->total_writes++; 2302 2303 return 0; 2304 } 2305 2306 /* 2307 * Push out any unwritten RDMA operations. 2308 * 2309 * We support sending out multiple chunks at the same time. 2310 * Not all of them need to get signaled in the completion queue. 2311 */ 2312 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2313 { 2314 int ret; 2315 2316 if (!rdma->current_length) { 2317 return 0; 2318 } 2319 2320 ret = qemu_rdma_write_one(f, rdma, 2321 rdma->current_index, rdma->current_addr, rdma->current_length); 2322 2323 if (ret < 0) { 2324 return ret; 2325 } 2326 2327 if (ret == 0) { 2328 rdma->nb_sent++; 2329 trace_qemu_rdma_write_flush(rdma->nb_sent); 2330 } 2331 2332 rdma->current_length = 0; 2333 rdma->current_addr = 0; 2334 2335 return 0; 2336 } 2337 2338 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2339 uint64_t offset, uint64_t len) 2340 { 2341 RDMALocalBlock *block; 2342 uint8_t *host_addr; 2343 uint8_t *chunk_end; 2344 2345 if (rdma->current_index < 0) { 2346 return 0; 2347 } 2348 2349 if (rdma->current_chunk < 0) { 2350 return 0; 2351 } 2352 2353 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2354 host_addr = block->local_host_addr + (offset - block->offset); 2355 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2356 2357 if (rdma->current_length == 0) { 2358 return 0; 2359 } 2360 2361 /* 2362 * Only merge into chunk sequentially. 2363 */ 2364 if (offset != (rdma->current_addr + rdma->current_length)) { 2365 return 0; 2366 } 2367 2368 if (offset < block->offset) { 2369 return 0; 2370 } 2371 2372 if ((offset + len) > (block->offset + block->length)) { 2373 return 0; 2374 } 2375 2376 if ((host_addr + len) > chunk_end) { 2377 return 0; 2378 } 2379 2380 return 1; 2381 } 2382 2383 /* 2384 * We're not actually writing here, but doing three things: 2385 * 2386 * 1. Identify the chunk the buffer belongs to. 2387 * 2. If the chunk is full or the buffer doesn't belong to the current 2388 * chunk, then start a new chunk and flush() the old chunk. 2389 * 3. To keep the hardware busy, we also group chunks into batches 2390 * and only require that a batch gets acknowledged in the completion 2391 * queue instead of each individual chunk. 2392 */ 2393 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2394 uint64_t block_offset, uint64_t offset, 2395 uint64_t len) 2396 { 2397 uint64_t current_addr = block_offset + offset; 2398 uint64_t index = rdma->current_index; 2399 uint64_t chunk = rdma->current_chunk; 2400 int ret; 2401 2402 /* If we cannot merge it, we flush the current buffer first. */ 2403 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2404 ret = qemu_rdma_write_flush(f, rdma); 2405 if (ret) { 2406 return ret; 2407 } 2408 rdma->current_length = 0; 2409 rdma->current_addr = current_addr; 2410 2411 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2412 offset, len, &index, &chunk); 2413 if (ret) { 2414 error_report("ram block search failed"); 2415 return ret; 2416 } 2417 rdma->current_index = index; 2418 rdma->current_chunk = chunk; 2419 } 2420 2421 /* merge it */ 2422 rdma->current_length += len; 2423 2424 /* flush it if buffer is too large */ 2425 if (rdma->current_length >= RDMA_MERGE_MAX) { 2426 return qemu_rdma_write_flush(f, rdma); 2427 } 2428 2429 return 0; 2430 } 2431 2432 static void qemu_rdma_cleanup(RDMAContext *rdma) 2433 { 2434 int idx; 2435 2436 if (rdma->cm_id && rdma->connected) { 2437 if ((rdma->error_state || 2438 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2439 !rdma->received_error) { 2440 RDMAControlHeader head = { .len = 0, 2441 .type = RDMA_CONTROL_ERROR, 2442 .repeat = 1, 2443 }; 2444 error_report("Early error. Sending error."); 2445 qemu_rdma_post_send_control(rdma, NULL, &head); 2446 } 2447 2448 rdma_disconnect(rdma->cm_id); 2449 trace_qemu_rdma_cleanup_disconnect(); 2450 rdma->connected = false; 2451 } 2452 2453 if (rdma->channel) { 2454 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2455 } 2456 g_free(rdma->dest_blocks); 2457 rdma->dest_blocks = NULL; 2458 2459 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2460 if (rdma->wr_data[idx].control_mr) { 2461 rdma->total_registrations--; 2462 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2463 } 2464 rdma->wr_data[idx].control_mr = NULL; 2465 } 2466 2467 if (rdma->local_ram_blocks.block) { 2468 while (rdma->local_ram_blocks.nb_blocks) { 2469 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2470 } 2471 } 2472 2473 if (rdma->qp) { 2474 rdma_destroy_qp(rdma->cm_id); 2475 rdma->qp = NULL; 2476 } 2477 if (rdma->recv_cq) { 2478 ibv_destroy_cq(rdma->recv_cq); 2479 rdma->recv_cq = NULL; 2480 } 2481 if (rdma->send_cq) { 2482 ibv_destroy_cq(rdma->send_cq); 2483 rdma->send_cq = NULL; 2484 } 2485 if (rdma->recv_comp_channel) { 2486 ibv_destroy_comp_channel(rdma->recv_comp_channel); 2487 rdma->recv_comp_channel = NULL; 2488 } 2489 if (rdma->send_comp_channel) { 2490 ibv_destroy_comp_channel(rdma->send_comp_channel); 2491 rdma->send_comp_channel = NULL; 2492 } 2493 if (rdma->pd) { 2494 ibv_dealloc_pd(rdma->pd); 2495 rdma->pd = NULL; 2496 } 2497 if (rdma->cm_id) { 2498 rdma_destroy_id(rdma->cm_id); 2499 rdma->cm_id = NULL; 2500 } 2501 2502 /* the destination side, listen_id and channel is shared */ 2503 if (rdma->listen_id) { 2504 if (!rdma->is_return_path) { 2505 rdma_destroy_id(rdma->listen_id); 2506 } 2507 rdma->listen_id = NULL; 2508 2509 if (rdma->channel) { 2510 if (!rdma->is_return_path) { 2511 rdma_destroy_event_channel(rdma->channel); 2512 } 2513 rdma->channel = NULL; 2514 } 2515 } 2516 2517 if (rdma->channel) { 2518 rdma_destroy_event_channel(rdma->channel); 2519 rdma->channel = NULL; 2520 } 2521 g_free(rdma->host); 2522 g_free(rdma->host_port); 2523 rdma->host = NULL; 2524 rdma->host_port = NULL; 2525 } 2526 2527 2528 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2529 { 2530 int ret, idx; 2531 Error *local_err = NULL, **temp = &local_err; 2532 2533 /* 2534 * Will be validated against destination's actual capabilities 2535 * after the connect() completes. 2536 */ 2537 rdma->pin_all = pin_all; 2538 2539 ret = qemu_rdma_resolve_host(rdma, temp); 2540 if (ret) { 2541 goto err_rdma_source_init; 2542 } 2543 2544 ret = qemu_rdma_alloc_pd_cq(rdma); 2545 if (ret) { 2546 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2547 " limits may be too low. Please check $ ulimit -a # and " 2548 "search for 'ulimit -l' in the output"); 2549 goto err_rdma_source_init; 2550 } 2551 2552 ret = qemu_rdma_alloc_qp(rdma); 2553 if (ret) { 2554 ERROR(temp, "rdma migration: error allocating qp!"); 2555 goto err_rdma_source_init; 2556 } 2557 2558 ret = qemu_rdma_init_ram_blocks(rdma); 2559 if (ret) { 2560 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2561 goto err_rdma_source_init; 2562 } 2563 2564 /* Build the hash that maps from offset to RAMBlock */ 2565 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2566 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2567 g_hash_table_insert(rdma->blockmap, 2568 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2569 &rdma->local_ram_blocks.block[idx]); 2570 } 2571 2572 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2573 ret = qemu_rdma_reg_control(rdma, idx); 2574 if (ret) { 2575 ERROR(temp, "rdma migration: error registering %d control!", 2576 idx); 2577 goto err_rdma_source_init; 2578 } 2579 } 2580 2581 return 0; 2582 2583 err_rdma_source_init: 2584 error_propagate(errp, local_err); 2585 qemu_rdma_cleanup(rdma); 2586 return -1; 2587 } 2588 2589 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2590 struct rdma_cm_event **cm_event, 2591 long msec, Error **errp) 2592 { 2593 int ret; 2594 struct pollfd poll_fd = { 2595 .fd = rdma->channel->fd, 2596 .events = POLLIN, 2597 .revents = 0 2598 }; 2599 2600 do { 2601 ret = poll(&poll_fd, 1, msec); 2602 } while (ret < 0 && errno == EINTR); 2603 2604 if (ret == 0) { 2605 ERROR(errp, "poll cm event timeout"); 2606 return -1; 2607 } else if (ret < 0) { 2608 ERROR(errp, "failed to poll cm event, errno=%i", errno); 2609 return -1; 2610 } else if (poll_fd.revents & POLLIN) { 2611 return rdma_get_cm_event(rdma->channel, cm_event); 2612 } else { 2613 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents); 2614 return -1; 2615 } 2616 } 2617 2618 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path) 2619 { 2620 RDMACapabilities cap = { 2621 .version = RDMA_CONTROL_VERSION_CURRENT, 2622 .flags = 0, 2623 }; 2624 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2625 .retry_count = 5, 2626 .private_data = &cap, 2627 .private_data_len = sizeof(cap), 2628 }; 2629 struct rdma_cm_event *cm_event; 2630 int ret; 2631 2632 /* 2633 * Only negotiate the capability with destination if the user 2634 * on the source first requested the capability. 2635 */ 2636 if (rdma->pin_all) { 2637 trace_qemu_rdma_connect_pin_all_requested(); 2638 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2639 } 2640 2641 caps_to_network(&cap); 2642 2643 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2644 if (ret) { 2645 ERROR(errp, "posting second control recv"); 2646 goto err_rdma_source_connect; 2647 } 2648 2649 ret = rdma_connect(rdma->cm_id, &conn_param); 2650 if (ret) { 2651 perror("rdma_connect"); 2652 ERROR(errp, "connecting to destination!"); 2653 goto err_rdma_source_connect; 2654 } 2655 2656 if (return_path) { 2657 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2658 } else { 2659 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2660 } 2661 if (ret) { 2662 perror("rdma_get_cm_event after rdma_connect"); 2663 ERROR(errp, "connecting to destination!"); 2664 goto err_rdma_source_connect; 2665 } 2666 2667 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2668 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2669 ERROR(errp, "connecting to destination!"); 2670 rdma_ack_cm_event(cm_event); 2671 goto err_rdma_source_connect; 2672 } 2673 rdma->connected = true; 2674 2675 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2676 network_to_caps(&cap); 2677 2678 /* 2679 * Verify that the *requested* capabilities are supported by the destination 2680 * and disable them otherwise. 2681 */ 2682 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2683 ERROR(errp, "Server cannot support pinning all memory. " 2684 "Will register memory dynamically."); 2685 rdma->pin_all = false; 2686 } 2687 2688 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2689 2690 rdma_ack_cm_event(cm_event); 2691 2692 rdma->control_ready_expected = 1; 2693 rdma->nb_sent = 0; 2694 return 0; 2695 2696 err_rdma_source_connect: 2697 qemu_rdma_cleanup(rdma); 2698 return -1; 2699 } 2700 2701 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2702 { 2703 int ret, idx; 2704 struct rdma_cm_id *listen_id; 2705 char ip[40] = "unknown"; 2706 struct rdma_addrinfo *res, *e; 2707 char port_str[16]; 2708 int reuse = 1; 2709 2710 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2711 rdma->wr_data[idx].control_len = 0; 2712 rdma->wr_data[idx].control_curr = NULL; 2713 } 2714 2715 if (!rdma->host || !rdma->host[0]) { 2716 ERROR(errp, "RDMA host is not set!"); 2717 rdma->error_state = -EINVAL; 2718 return -1; 2719 } 2720 /* create CM channel */ 2721 rdma->channel = rdma_create_event_channel(); 2722 if (!rdma->channel) { 2723 ERROR(errp, "could not create rdma event channel"); 2724 rdma->error_state = -EINVAL; 2725 return -1; 2726 } 2727 2728 /* create CM id */ 2729 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2730 if (ret) { 2731 ERROR(errp, "could not create cm_id!"); 2732 goto err_dest_init_create_listen_id; 2733 } 2734 2735 snprintf(port_str, 16, "%d", rdma->port); 2736 port_str[15] = '\0'; 2737 2738 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2739 if (ret < 0) { 2740 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2741 goto err_dest_init_bind_addr; 2742 } 2743 2744 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, 2745 &reuse, sizeof reuse); 2746 if (ret) { 2747 ERROR(errp, "Error: could not set REUSEADDR option"); 2748 goto err_dest_init_bind_addr; 2749 } 2750 for (e = res; e != NULL; e = e->ai_next) { 2751 inet_ntop(e->ai_family, 2752 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2753 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2754 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2755 if (ret) { 2756 continue; 2757 } 2758 if (e->ai_family == AF_INET6) { 2759 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2760 if (ret) { 2761 continue; 2762 } 2763 } 2764 break; 2765 } 2766 2767 rdma_freeaddrinfo(res); 2768 if (!e) { 2769 ERROR(errp, "Error: could not rdma_bind_addr!"); 2770 goto err_dest_init_bind_addr; 2771 } 2772 2773 rdma->listen_id = listen_id; 2774 qemu_rdma_dump_gid("dest_init", listen_id); 2775 return 0; 2776 2777 err_dest_init_bind_addr: 2778 rdma_destroy_id(listen_id); 2779 err_dest_init_create_listen_id: 2780 rdma_destroy_event_channel(rdma->channel); 2781 rdma->channel = NULL; 2782 rdma->error_state = ret; 2783 return ret; 2784 2785 } 2786 2787 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2788 RDMAContext *rdma) 2789 { 2790 int idx; 2791 2792 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2793 rdma_return_path->wr_data[idx].control_len = 0; 2794 rdma_return_path->wr_data[idx].control_curr = NULL; 2795 } 2796 2797 /*the CM channel and CM id is shared*/ 2798 rdma_return_path->channel = rdma->channel; 2799 rdma_return_path->listen_id = rdma->listen_id; 2800 2801 rdma->return_path = rdma_return_path; 2802 rdma_return_path->return_path = rdma; 2803 rdma_return_path->is_return_path = true; 2804 } 2805 2806 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2807 { 2808 RDMAContext *rdma = NULL; 2809 InetSocketAddress *addr; 2810 2811 if (host_port) { 2812 rdma = g_new0(RDMAContext, 1); 2813 rdma->current_index = -1; 2814 rdma->current_chunk = -1; 2815 2816 addr = g_new(InetSocketAddress, 1); 2817 if (!inet_parse(addr, host_port, NULL)) { 2818 rdma->port = atoi(addr->port); 2819 rdma->host = g_strdup(addr->host); 2820 rdma->host_port = g_strdup(host_port); 2821 } else { 2822 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2823 g_free(rdma); 2824 rdma = NULL; 2825 } 2826 2827 qapi_free_InetSocketAddress(addr); 2828 } 2829 2830 return rdma; 2831 } 2832 2833 /* 2834 * QEMUFile interface to the control channel. 2835 * SEND messages for control only. 2836 * VM's ram is handled with regular RDMA messages. 2837 */ 2838 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2839 const struct iovec *iov, 2840 size_t niov, 2841 int *fds, 2842 size_t nfds, 2843 int flags, 2844 Error **errp) 2845 { 2846 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2847 QEMUFile *f = rioc->file; 2848 RDMAContext *rdma; 2849 int ret; 2850 ssize_t done = 0; 2851 size_t i; 2852 size_t len = 0; 2853 2854 RCU_READ_LOCK_GUARD(); 2855 rdma = qatomic_rcu_read(&rioc->rdmaout); 2856 2857 if (!rdma) { 2858 return -EIO; 2859 } 2860 2861 CHECK_ERROR_STATE(); 2862 2863 /* 2864 * Push out any writes that 2865 * we're queued up for VM's ram. 2866 */ 2867 ret = qemu_rdma_write_flush(f, rdma); 2868 if (ret < 0) { 2869 rdma->error_state = ret; 2870 return ret; 2871 } 2872 2873 for (i = 0; i < niov; i++) { 2874 size_t remaining = iov[i].iov_len; 2875 uint8_t * data = (void *)iov[i].iov_base; 2876 while (remaining) { 2877 RDMAControlHeader head; 2878 2879 len = MIN(remaining, RDMA_SEND_INCREMENT); 2880 remaining -= len; 2881 2882 head.len = len; 2883 head.type = RDMA_CONTROL_QEMU_FILE; 2884 2885 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2886 2887 if (ret < 0) { 2888 rdma->error_state = ret; 2889 return ret; 2890 } 2891 2892 data += len; 2893 done += len; 2894 } 2895 } 2896 2897 return done; 2898 } 2899 2900 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2901 size_t size, int idx) 2902 { 2903 size_t len = 0; 2904 2905 if (rdma->wr_data[idx].control_len) { 2906 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2907 2908 len = MIN(size, rdma->wr_data[idx].control_len); 2909 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2910 rdma->wr_data[idx].control_curr += len; 2911 rdma->wr_data[idx].control_len -= len; 2912 } 2913 2914 return len; 2915 } 2916 2917 /* 2918 * QEMUFile interface to the control channel. 2919 * RDMA links don't use bytestreams, so we have to 2920 * return bytes to QEMUFile opportunistically. 2921 */ 2922 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2923 const struct iovec *iov, 2924 size_t niov, 2925 int **fds, 2926 size_t *nfds, 2927 Error **errp) 2928 { 2929 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2930 RDMAContext *rdma; 2931 RDMAControlHeader head; 2932 int ret = 0; 2933 ssize_t i; 2934 size_t done = 0; 2935 2936 RCU_READ_LOCK_GUARD(); 2937 rdma = qatomic_rcu_read(&rioc->rdmain); 2938 2939 if (!rdma) { 2940 return -EIO; 2941 } 2942 2943 CHECK_ERROR_STATE(); 2944 2945 for (i = 0; i < niov; i++) { 2946 size_t want = iov[i].iov_len; 2947 uint8_t *data = (void *)iov[i].iov_base; 2948 2949 /* 2950 * First, we hold on to the last SEND message we 2951 * were given and dish out the bytes until we run 2952 * out of bytes. 2953 */ 2954 ret = qemu_rdma_fill(rdma, data, want, 0); 2955 done += ret; 2956 want -= ret; 2957 /* Got what we needed, so go to next iovec */ 2958 if (want == 0) { 2959 continue; 2960 } 2961 2962 /* If we got any data so far, then don't wait 2963 * for more, just return what we have */ 2964 if (done > 0) { 2965 break; 2966 } 2967 2968 2969 /* We've got nothing at all, so lets wait for 2970 * more to arrive 2971 */ 2972 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2973 2974 if (ret < 0) { 2975 rdma->error_state = ret; 2976 return ret; 2977 } 2978 2979 /* 2980 * SEND was received with new bytes, now try again. 2981 */ 2982 ret = qemu_rdma_fill(rdma, data, want, 0); 2983 done += ret; 2984 want -= ret; 2985 2986 /* Still didn't get enough, so lets just return */ 2987 if (want) { 2988 if (done == 0) { 2989 return QIO_CHANNEL_ERR_BLOCK; 2990 } else { 2991 break; 2992 } 2993 } 2994 } 2995 return done; 2996 } 2997 2998 /* 2999 * Block until all the outstanding chunks have been delivered by the hardware. 3000 */ 3001 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 3002 { 3003 int ret; 3004 3005 if (qemu_rdma_write_flush(f, rdma) < 0) { 3006 return -EIO; 3007 } 3008 3009 while (rdma->nb_sent) { 3010 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 3011 if (ret < 0) { 3012 error_report("rdma migration: complete polling error!"); 3013 return -EIO; 3014 } 3015 } 3016 3017 qemu_rdma_unregister_waiting(rdma); 3018 3019 return 0; 3020 } 3021 3022 3023 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 3024 bool blocking, 3025 Error **errp) 3026 { 3027 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3028 /* XXX we should make readv/writev actually honour this :-) */ 3029 rioc->blocking = blocking; 3030 return 0; 3031 } 3032 3033 3034 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 3035 struct QIOChannelRDMASource { 3036 GSource parent; 3037 QIOChannelRDMA *rioc; 3038 GIOCondition condition; 3039 }; 3040 3041 static gboolean 3042 qio_channel_rdma_source_prepare(GSource *source, 3043 gint *timeout) 3044 { 3045 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3046 RDMAContext *rdma; 3047 GIOCondition cond = 0; 3048 *timeout = -1; 3049 3050 RCU_READ_LOCK_GUARD(); 3051 if (rsource->condition == G_IO_IN) { 3052 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3053 } else { 3054 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3055 } 3056 3057 if (!rdma) { 3058 error_report("RDMAContext is NULL when prepare Gsource"); 3059 return FALSE; 3060 } 3061 3062 if (rdma->wr_data[0].control_len) { 3063 cond |= G_IO_IN; 3064 } 3065 cond |= G_IO_OUT; 3066 3067 return cond & rsource->condition; 3068 } 3069 3070 static gboolean 3071 qio_channel_rdma_source_check(GSource *source) 3072 { 3073 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3074 RDMAContext *rdma; 3075 GIOCondition cond = 0; 3076 3077 RCU_READ_LOCK_GUARD(); 3078 if (rsource->condition == G_IO_IN) { 3079 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3080 } else { 3081 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3082 } 3083 3084 if (!rdma) { 3085 error_report("RDMAContext is NULL when check Gsource"); 3086 return FALSE; 3087 } 3088 3089 if (rdma->wr_data[0].control_len) { 3090 cond |= G_IO_IN; 3091 } 3092 cond |= G_IO_OUT; 3093 3094 return cond & rsource->condition; 3095 } 3096 3097 static gboolean 3098 qio_channel_rdma_source_dispatch(GSource *source, 3099 GSourceFunc callback, 3100 gpointer user_data) 3101 { 3102 QIOChannelFunc func = (QIOChannelFunc)callback; 3103 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3104 RDMAContext *rdma; 3105 GIOCondition cond = 0; 3106 3107 RCU_READ_LOCK_GUARD(); 3108 if (rsource->condition == G_IO_IN) { 3109 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3110 } else { 3111 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3112 } 3113 3114 if (!rdma) { 3115 error_report("RDMAContext is NULL when dispatch Gsource"); 3116 return FALSE; 3117 } 3118 3119 if (rdma->wr_data[0].control_len) { 3120 cond |= G_IO_IN; 3121 } 3122 cond |= G_IO_OUT; 3123 3124 return (*func)(QIO_CHANNEL(rsource->rioc), 3125 (cond & rsource->condition), 3126 user_data); 3127 } 3128 3129 static void 3130 qio_channel_rdma_source_finalize(GSource *source) 3131 { 3132 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3133 3134 object_unref(OBJECT(ssource->rioc)); 3135 } 3136 3137 GSourceFuncs qio_channel_rdma_source_funcs = { 3138 qio_channel_rdma_source_prepare, 3139 qio_channel_rdma_source_check, 3140 qio_channel_rdma_source_dispatch, 3141 qio_channel_rdma_source_finalize 3142 }; 3143 3144 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 3145 GIOCondition condition) 3146 { 3147 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3148 QIOChannelRDMASource *ssource; 3149 GSource *source; 3150 3151 source = g_source_new(&qio_channel_rdma_source_funcs, 3152 sizeof(QIOChannelRDMASource)); 3153 ssource = (QIOChannelRDMASource *)source; 3154 3155 ssource->rioc = rioc; 3156 object_ref(OBJECT(rioc)); 3157 3158 ssource->condition = condition; 3159 3160 return source; 3161 } 3162 3163 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 3164 AioContext *ctx, 3165 IOHandler *io_read, 3166 IOHandler *io_write, 3167 void *opaque) 3168 { 3169 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3170 if (io_read) { 3171 aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd, 3172 false, io_read, io_write, NULL, NULL, opaque); 3173 aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd, 3174 false, io_read, io_write, NULL, NULL, opaque); 3175 } else { 3176 aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd, 3177 false, io_read, io_write, NULL, NULL, opaque); 3178 aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd, 3179 false, io_read, io_write, NULL, NULL, opaque); 3180 } 3181 } 3182 3183 struct rdma_close_rcu { 3184 struct rcu_head rcu; 3185 RDMAContext *rdmain; 3186 RDMAContext *rdmaout; 3187 }; 3188 3189 /* callback from qio_channel_rdma_close via call_rcu */ 3190 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 3191 { 3192 if (rcu->rdmain) { 3193 qemu_rdma_cleanup(rcu->rdmain); 3194 } 3195 3196 if (rcu->rdmaout) { 3197 qemu_rdma_cleanup(rcu->rdmaout); 3198 } 3199 3200 g_free(rcu->rdmain); 3201 g_free(rcu->rdmaout); 3202 g_free(rcu); 3203 } 3204 3205 static int qio_channel_rdma_close(QIOChannel *ioc, 3206 Error **errp) 3207 { 3208 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3209 RDMAContext *rdmain, *rdmaout; 3210 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 3211 3212 trace_qemu_rdma_close(); 3213 3214 rdmain = rioc->rdmain; 3215 if (rdmain) { 3216 qatomic_rcu_set(&rioc->rdmain, NULL); 3217 } 3218 3219 rdmaout = rioc->rdmaout; 3220 if (rdmaout) { 3221 qatomic_rcu_set(&rioc->rdmaout, NULL); 3222 } 3223 3224 rcu->rdmain = rdmain; 3225 rcu->rdmaout = rdmaout; 3226 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 3227 3228 return 0; 3229 } 3230 3231 static int 3232 qio_channel_rdma_shutdown(QIOChannel *ioc, 3233 QIOChannelShutdown how, 3234 Error **errp) 3235 { 3236 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3237 RDMAContext *rdmain, *rdmaout; 3238 3239 RCU_READ_LOCK_GUARD(); 3240 3241 rdmain = qatomic_rcu_read(&rioc->rdmain); 3242 rdmaout = qatomic_rcu_read(&rioc->rdmain); 3243 3244 switch (how) { 3245 case QIO_CHANNEL_SHUTDOWN_READ: 3246 if (rdmain) { 3247 rdmain->error_state = -1; 3248 } 3249 break; 3250 case QIO_CHANNEL_SHUTDOWN_WRITE: 3251 if (rdmaout) { 3252 rdmaout->error_state = -1; 3253 } 3254 break; 3255 case QIO_CHANNEL_SHUTDOWN_BOTH: 3256 default: 3257 if (rdmain) { 3258 rdmain->error_state = -1; 3259 } 3260 if (rdmaout) { 3261 rdmaout->error_state = -1; 3262 } 3263 break; 3264 } 3265 3266 return 0; 3267 } 3268 3269 /* 3270 * Parameters: 3271 * @offset == 0 : 3272 * This means that 'block_offset' is a full virtual address that does not 3273 * belong to a RAMBlock of the virtual machine and instead 3274 * represents a private malloc'd memory area that the caller wishes to 3275 * transfer. 3276 * 3277 * @offset != 0 : 3278 * Offset is an offset to be added to block_offset and used 3279 * to also lookup the corresponding RAMBlock. 3280 * 3281 * @size > 0 : 3282 * Initiate an transfer this size. 3283 * 3284 * @size == 0 : 3285 * A 'hint' or 'advice' that means that we wish to speculatively 3286 * and asynchronously unregister this memory. In this case, there is no 3287 * guarantee that the unregister will actually happen, for example, 3288 * if the memory is being actively transmitted. Additionally, the memory 3289 * may be re-registered at any future time if a write within the same 3290 * chunk was requested again, even if you attempted to unregister it 3291 * here. 3292 * 3293 * @size < 0 : TODO, not yet supported 3294 * Unregister the memory NOW. This means that the caller does not 3295 * expect there to be any future RDMA transfers and we just want to clean 3296 * things up. This is used in case the upper layer owns the memory and 3297 * cannot wait for qemu_fclose() to occur. 3298 * 3299 * @bytes_sent : User-specificed pointer to indicate how many bytes were 3300 * sent. Usually, this will not be more than a few bytes of 3301 * the protocol because most transfers are sent asynchronously. 3302 */ 3303 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 3304 ram_addr_t block_offset, ram_addr_t offset, 3305 size_t size, uint64_t *bytes_sent) 3306 { 3307 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3308 RDMAContext *rdma; 3309 int ret; 3310 3311 RCU_READ_LOCK_GUARD(); 3312 rdma = qatomic_rcu_read(&rioc->rdmaout); 3313 3314 if (!rdma) { 3315 return -EIO; 3316 } 3317 3318 CHECK_ERROR_STATE(); 3319 3320 if (migration_in_postcopy()) { 3321 return RAM_SAVE_CONTROL_NOT_SUPP; 3322 } 3323 3324 qemu_fflush(f); 3325 3326 if (size > 0) { 3327 /* 3328 * Add this page to the current 'chunk'. If the chunk 3329 * is full, or the page doesn't belong to the current chunk, 3330 * an actual RDMA write will occur and a new chunk will be formed. 3331 */ 3332 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 3333 if (ret < 0) { 3334 error_report("rdma migration: write error! %d", ret); 3335 goto err; 3336 } 3337 3338 /* 3339 * We always return 1 bytes because the RDMA 3340 * protocol is completely asynchronous. We do not yet know 3341 * whether an identified chunk is zero or not because we're 3342 * waiting for other pages to potentially be merged with 3343 * the current chunk. So, we have to call qemu_update_position() 3344 * later on when the actual write occurs. 3345 */ 3346 if (bytes_sent) { 3347 *bytes_sent = 1; 3348 } 3349 } else { 3350 uint64_t index, chunk; 3351 3352 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 3353 if (size < 0) { 3354 ret = qemu_rdma_drain_cq(f, rdma); 3355 if (ret < 0) { 3356 fprintf(stderr, "rdma: failed to synchronously drain" 3357 " completion queue before unregistration.\n"); 3358 goto err; 3359 } 3360 } 3361 */ 3362 3363 ret = qemu_rdma_search_ram_block(rdma, block_offset, 3364 offset, size, &index, &chunk); 3365 3366 if (ret) { 3367 error_report("ram block search failed"); 3368 goto err; 3369 } 3370 3371 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 3372 3373 /* 3374 * TODO: Synchronous, guaranteed unregistration (should not occur during 3375 * fast-path). Otherwise, unregisters will process on the next call to 3376 * qemu_rdma_drain_cq() 3377 if (size < 0) { 3378 qemu_rdma_unregister_waiting(rdma); 3379 } 3380 */ 3381 } 3382 3383 /* 3384 * Drain the Completion Queue if possible, but do not block, 3385 * just poll. 3386 * 3387 * If nothing to poll, the end of the iteration will do this 3388 * again to make sure we don't overflow the request queue. 3389 */ 3390 while (1) { 3391 uint64_t wr_id, wr_id_in; 3392 int ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL); 3393 if (ret < 0) { 3394 error_report("rdma migration: polling error! %d", ret); 3395 goto err; 3396 } 3397 3398 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3399 3400 if (wr_id == RDMA_WRID_NONE) { 3401 break; 3402 } 3403 } 3404 3405 while (1) { 3406 uint64_t wr_id, wr_id_in; 3407 int ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL); 3408 if (ret < 0) { 3409 error_report("rdma migration: polling error! %d", ret); 3410 goto err; 3411 } 3412 3413 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3414 3415 if (wr_id == RDMA_WRID_NONE) { 3416 break; 3417 } 3418 } 3419 3420 return RAM_SAVE_CONTROL_DELAYED; 3421 err: 3422 rdma->error_state = ret; 3423 return ret; 3424 } 3425 3426 static void rdma_accept_incoming_migration(void *opaque); 3427 3428 static void rdma_cm_poll_handler(void *opaque) 3429 { 3430 RDMAContext *rdma = opaque; 3431 int ret; 3432 struct rdma_cm_event *cm_event; 3433 MigrationIncomingState *mis = migration_incoming_get_current(); 3434 3435 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3436 if (ret) { 3437 error_report("get_cm_event failed %d", errno); 3438 return; 3439 } 3440 3441 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3442 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3443 if (!rdma->error_state && 3444 migration_incoming_get_current()->state != 3445 MIGRATION_STATUS_COMPLETED) { 3446 error_report("receive cm event, cm event is %d", cm_event->event); 3447 rdma->error_state = -EPIPE; 3448 if (rdma->return_path) { 3449 rdma->return_path->error_state = -EPIPE; 3450 } 3451 } 3452 rdma_ack_cm_event(cm_event); 3453 3454 if (mis->migration_incoming_co) { 3455 qemu_coroutine_enter(mis->migration_incoming_co); 3456 } 3457 return; 3458 } 3459 rdma_ack_cm_event(cm_event); 3460 } 3461 3462 static int qemu_rdma_accept(RDMAContext *rdma) 3463 { 3464 RDMACapabilities cap; 3465 struct rdma_conn_param conn_param = { 3466 .responder_resources = 2, 3467 .private_data = &cap, 3468 .private_data_len = sizeof(cap), 3469 }; 3470 RDMAContext *rdma_return_path = NULL; 3471 struct rdma_cm_event *cm_event; 3472 struct ibv_context *verbs; 3473 int ret = -EINVAL; 3474 int idx; 3475 3476 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3477 if (ret) { 3478 goto err_rdma_dest_wait; 3479 } 3480 3481 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3482 rdma_ack_cm_event(cm_event); 3483 goto err_rdma_dest_wait; 3484 } 3485 3486 /* 3487 * initialize the RDMAContext for return path for postcopy after first 3488 * connection request reached. 3489 */ 3490 if (migrate_postcopy() && !rdma->is_return_path) { 3491 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL); 3492 if (rdma_return_path == NULL) { 3493 rdma_ack_cm_event(cm_event); 3494 goto err_rdma_dest_wait; 3495 } 3496 3497 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3498 } 3499 3500 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3501 3502 network_to_caps(&cap); 3503 3504 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3505 error_report("Unknown source RDMA version: %d, bailing...", 3506 cap.version); 3507 rdma_ack_cm_event(cm_event); 3508 goto err_rdma_dest_wait; 3509 } 3510 3511 /* 3512 * Respond with only the capabilities this version of QEMU knows about. 3513 */ 3514 cap.flags &= known_capabilities; 3515 3516 /* 3517 * Enable the ones that we do know about. 3518 * Add other checks here as new ones are introduced. 3519 */ 3520 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3521 rdma->pin_all = true; 3522 } 3523 3524 rdma->cm_id = cm_event->id; 3525 verbs = cm_event->id->verbs; 3526 3527 rdma_ack_cm_event(cm_event); 3528 3529 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3530 3531 caps_to_network(&cap); 3532 3533 trace_qemu_rdma_accept_pin_verbsc(verbs); 3534 3535 if (!rdma->verbs) { 3536 rdma->verbs = verbs; 3537 } else if (rdma->verbs != verbs) { 3538 error_report("ibv context not matching %p, %p!", rdma->verbs, 3539 verbs); 3540 goto err_rdma_dest_wait; 3541 } 3542 3543 qemu_rdma_dump_id("dest_init", verbs); 3544 3545 ret = qemu_rdma_alloc_pd_cq(rdma); 3546 if (ret) { 3547 error_report("rdma migration: error allocating pd and cq!"); 3548 goto err_rdma_dest_wait; 3549 } 3550 3551 ret = qemu_rdma_alloc_qp(rdma); 3552 if (ret) { 3553 error_report("rdma migration: error allocating qp!"); 3554 goto err_rdma_dest_wait; 3555 } 3556 3557 ret = qemu_rdma_init_ram_blocks(rdma); 3558 if (ret) { 3559 error_report("rdma migration: error initializing ram blocks!"); 3560 goto err_rdma_dest_wait; 3561 } 3562 3563 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3564 ret = qemu_rdma_reg_control(rdma, idx); 3565 if (ret) { 3566 error_report("rdma: error registering %d control", idx); 3567 goto err_rdma_dest_wait; 3568 } 3569 } 3570 3571 /* Accept the second connection request for return path */ 3572 if (migrate_postcopy() && !rdma->is_return_path) { 3573 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3574 NULL, 3575 (void *)(intptr_t)rdma->return_path); 3576 } else { 3577 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3578 NULL, rdma); 3579 } 3580 3581 ret = rdma_accept(rdma->cm_id, &conn_param); 3582 if (ret) { 3583 error_report("rdma_accept returns %d", ret); 3584 goto err_rdma_dest_wait; 3585 } 3586 3587 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3588 if (ret) { 3589 error_report("rdma_accept get_cm_event failed %d", ret); 3590 goto err_rdma_dest_wait; 3591 } 3592 3593 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3594 error_report("rdma_accept not event established"); 3595 rdma_ack_cm_event(cm_event); 3596 goto err_rdma_dest_wait; 3597 } 3598 3599 rdma_ack_cm_event(cm_event); 3600 rdma->connected = true; 3601 3602 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3603 if (ret) { 3604 error_report("rdma migration: error posting second control recv"); 3605 goto err_rdma_dest_wait; 3606 } 3607 3608 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3609 3610 return 0; 3611 3612 err_rdma_dest_wait: 3613 rdma->error_state = ret; 3614 qemu_rdma_cleanup(rdma); 3615 g_free(rdma_return_path); 3616 return ret; 3617 } 3618 3619 static int dest_ram_sort_func(const void *a, const void *b) 3620 { 3621 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3622 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3623 3624 return (a_index < b_index) ? -1 : (a_index != b_index); 3625 } 3626 3627 /* 3628 * During each iteration of the migration, we listen for instructions 3629 * by the source VM to perform dynamic page registrations before they 3630 * can perform RDMA operations. 3631 * 3632 * We respond with the 'rkey'. 3633 * 3634 * Keep doing this until the source tells us to stop. 3635 */ 3636 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3637 { 3638 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3639 .type = RDMA_CONTROL_REGISTER_RESULT, 3640 .repeat = 0, 3641 }; 3642 RDMAControlHeader unreg_resp = { .len = 0, 3643 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3644 .repeat = 0, 3645 }; 3646 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3647 .repeat = 1 }; 3648 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3649 RDMAContext *rdma; 3650 RDMALocalBlocks *local; 3651 RDMAControlHeader head; 3652 RDMARegister *reg, *registers; 3653 RDMACompress *comp; 3654 RDMARegisterResult *reg_result; 3655 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3656 RDMALocalBlock *block; 3657 void *host_addr; 3658 int ret = 0; 3659 int idx = 0; 3660 int count = 0; 3661 int i = 0; 3662 3663 RCU_READ_LOCK_GUARD(); 3664 rdma = qatomic_rcu_read(&rioc->rdmain); 3665 3666 if (!rdma) { 3667 return -EIO; 3668 } 3669 3670 CHECK_ERROR_STATE(); 3671 3672 local = &rdma->local_ram_blocks; 3673 do { 3674 trace_qemu_rdma_registration_handle_wait(); 3675 3676 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3677 3678 if (ret < 0) { 3679 break; 3680 } 3681 3682 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3683 error_report("rdma: Too many requests in this message (%d)." 3684 "Bailing.", head.repeat); 3685 ret = -EIO; 3686 break; 3687 } 3688 3689 switch (head.type) { 3690 case RDMA_CONTROL_COMPRESS: 3691 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3692 network_to_compress(comp); 3693 3694 trace_qemu_rdma_registration_handle_compress(comp->length, 3695 comp->block_idx, 3696 comp->offset); 3697 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3698 error_report("rdma: 'compress' bad block index %u (vs %d)", 3699 (unsigned int)comp->block_idx, 3700 rdma->local_ram_blocks.nb_blocks); 3701 ret = -EIO; 3702 goto out; 3703 } 3704 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3705 3706 host_addr = block->local_host_addr + 3707 (comp->offset - block->offset); 3708 3709 ram_handle_compressed(host_addr, comp->value, comp->length); 3710 break; 3711 3712 case RDMA_CONTROL_REGISTER_FINISHED: 3713 trace_qemu_rdma_registration_handle_finished(); 3714 goto out; 3715 3716 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3717 trace_qemu_rdma_registration_handle_ram_blocks(); 3718 3719 /* Sort our local RAM Block list so it's the same as the source, 3720 * we can do this since we've filled in a src_index in the list 3721 * as we received the RAMBlock list earlier. 3722 */ 3723 qsort(rdma->local_ram_blocks.block, 3724 rdma->local_ram_blocks.nb_blocks, 3725 sizeof(RDMALocalBlock), dest_ram_sort_func); 3726 for (i = 0; i < local->nb_blocks; i++) { 3727 local->block[i].index = i; 3728 } 3729 3730 if (rdma->pin_all) { 3731 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3732 if (ret) { 3733 error_report("rdma migration: error dest " 3734 "registering ram blocks"); 3735 goto out; 3736 } 3737 } 3738 3739 /* 3740 * Dest uses this to prepare to transmit the RAMBlock descriptions 3741 * to the source VM after connection setup. 3742 * Both sides use the "remote" structure to communicate and update 3743 * their "local" descriptions with what was sent. 3744 */ 3745 for (i = 0; i < local->nb_blocks; i++) { 3746 rdma->dest_blocks[i].remote_host_addr = 3747 (uintptr_t)(local->block[i].local_host_addr); 3748 3749 if (rdma->pin_all) { 3750 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3751 } 3752 3753 rdma->dest_blocks[i].offset = local->block[i].offset; 3754 rdma->dest_blocks[i].length = local->block[i].length; 3755 3756 dest_block_to_network(&rdma->dest_blocks[i]); 3757 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3758 local->block[i].block_name, 3759 local->block[i].offset, 3760 local->block[i].length, 3761 local->block[i].local_host_addr, 3762 local->block[i].src_index); 3763 } 3764 3765 blocks.len = rdma->local_ram_blocks.nb_blocks 3766 * sizeof(RDMADestBlock); 3767 3768 3769 ret = qemu_rdma_post_send_control(rdma, 3770 (uint8_t *) rdma->dest_blocks, &blocks); 3771 3772 if (ret < 0) { 3773 error_report("rdma migration: error sending remote info"); 3774 goto out; 3775 } 3776 3777 break; 3778 case RDMA_CONTROL_REGISTER_REQUEST: 3779 trace_qemu_rdma_registration_handle_register(head.repeat); 3780 3781 reg_resp.repeat = head.repeat; 3782 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3783 3784 for (count = 0; count < head.repeat; count++) { 3785 uint64_t chunk; 3786 uint8_t *chunk_start, *chunk_end; 3787 3788 reg = ®isters[count]; 3789 network_to_register(reg); 3790 3791 reg_result = &results[count]; 3792 3793 trace_qemu_rdma_registration_handle_register_loop(count, 3794 reg->current_index, reg->key.current_addr, reg->chunks); 3795 3796 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3797 error_report("rdma: 'register' bad block index %u (vs %d)", 3798 (unsigned int)reg->current_index, 3799 rdma->local_ram_blocks.nb_blocks); 3800 ret = -ENOENT; 3801 goto out; 3802 } 3803 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3804 if (block->is_ram_block) { 3805 if (block->offset > reg->key.current_addr) { 3806 error_report("rdma: bad register address for block %s" 3807 " offset: %" PRIx64 " current_addr: %" PRIx64, 3808 block->block_name, block->offset, 3809 reg->key.current_addr); 3810 ret = -ERANGE; 3811 goto out; 3812 } 3813 host_addr = (block->local_host_addr + 3814 (reg->key.current_addr - block->offset)); 3815 chunk = ram_chunk_index(block->local_host_addr, 3816 (uint8_t *) host_addr); 3817 } else { 3818 chunk = reg->key.chunk; 3819 host_addr = block->local_host_addr + 3820 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3821 /* Check for particularly bad chunk value */ 3822 if (host_addr < (void *)block->local_host_addr) { 3823 error_report("rdma: bad chunk for block %s" 3824 " chunk: %" PRIx64, 3825 block->block_name, reg->key.chunk); 3826 ret = -ERANGE; 3827 goto out; 3828 } 3829 } 3830 chunk_start = ram_chunk_start(block, chunk); 3831 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3832 /* avoid "-Waddress-of-packed-member" warning */ 3833 uint32_t tmp_rkey = 0; 3834 if (qemu_rdma_register_and_get_keys(rdma, block, 3835 (uintptr_t)host_addr, NULL, &tmp_rkey, 3836 chunk, chunk_start, chunk_end)) { 3837 error_report("cannot get rkey"); 3838 ret = -EINVAL; 3839 goto out; 3840 } 3841 reg_result->rkey = tmp_rkey; 3842 3843 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3844 3845 trace_qemu_rdma_registration_handle_register_rkey( 3846 reg_result->rkey); 3847 3848 result_to_network(reg_result); 3849 } 3850 3851 ret = qemu_rdma_post_send_control(rdma, 3852 (uint8_t *) results, ®_resp); 3853 3854 if (ret < 0) { 3855 error_report("Failed to send control buffer"); 3856 goto out; 3857 } 3858 break; 3859 case RDMA_CONTROL_UNREGISTER_REQUEST: 3860 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3861 unreg_resp.repeat = head.repeat; 3862 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3863 3864 for (count = 0; count < head.repeat; count++) { 3865 reg = ®isters[count]; 3866 network_to_register(reg); 3867 3868 trace_qemu_rdma_registration_handle_unregister_loop(count, 3869 reg->current_index, reg->key.chunk); 3870 3871 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3872 3873 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3874 block->pmr[reg->key.chunk] = NULL; 3875 3876 if (ret != 0) { 3877 perror("rdma unregistration chunk failed"); 3878 ret = -ret; 3879 goto out; 3880 } 3881 3882 rdma->total_registrations--; 3883 3884 trace_qemu_rdma_registration_handle_unregister_success( 3885 reg->key.chunk); 3886 } 3887 3888 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3889 3890 if (ret < 0) { 3891 error_report("Failed to send control buffer"); 3892 goto out; 3893 } 3894 break; 3895 case RDMA_CONTROL_REGISTER_RESULT: 3896 error_report("Invalid RESULT message at dest."); 3897 ret = -EIO; 3898 goto out; 3899 default: 3900 error_report("Unknown control message %s", control_desc(head.type)); 3901 ret = -EIO; 3902 goto out; 3903 } 3904 } while (1); 3905 out: 3906 if (ret < 0) { 3907 rdma->error_state = ret; 3908 } 3909 return ret; 3910 } 3911 3912 /* Destination: 3913 * Called via a ram_control_load_hook during the initial RAM load section which 3914 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3915 * on the source. 3916 * We've already built our local RAMBlock list, but not yet sent the list to 3917 * the source. 3918 */ 3919 static int 3920 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) 3921 { 3922 RDMAContext *rdma; 3923 int curr; 3924 int found = -1; 3925 3926 RCU_READ_LOCK_GUARD(); 3927 rdma = qatomic_rcu_read(&rioc->rdmain); 3928 3929 if (!rdma) { 3930 return -EIO; 3931 } 3932 3933 /* Find the matching RAMBlock in our local list */ 3934 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3935 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3936 found = curr; 3937 break; 3938 } 3939 } 3940 3941 if (found == -1) { 3942 error_report("RAMBlock '%s' not found on destination", name); 3943 return -ENOENT; 3944 } 3945 3946 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3947 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3948 rdma->next_src_index++; 3949 3950 return 0; 3951 } 3952 3953 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) 3954 { 3955 switch (flags) { 3956 case RAM_CONTROL_BLOCK_REG: 3957 return rdma_block_notification_handle(opaque, data); 3958 3959 case RAM_CONTROL_HOOK: 3960 return qemu_rdma_registration_handle(f, opaque); 3961 3962 default: 3963 /* Shouldn't be called with any other values */ 3964 abort(); 3965 } 3966 } 3967 3968 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3969 uint64_t flags, void *data) 3970 { 3971 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3972 RDMAContext *rdma; 3973 3974 RCU_READ_LOCK_GUARD(); 3975 rdma = qatomic_rcu_read(&rioc->rdmaout); 3976 if (!rdma) { 3977 return -EIO; 3978 } 3979 3980 CHECK_ERROR_STATE(); 3981 3982 if (migration_in_postcopy()) { 3983 return 0; 3984 } 3985 3986 trace_qemu_rdma_registration_start(flags); 3987 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3988 qemu_fflush(f); 3989 3990 return 0; 3991 } 3992 3993 /* 3994 * Inform dest that dynamic registrations are done for now. 3995 * First, flush writes, if any. 3996 */ 3997 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3998 uint64_t flags, void *data) 3999 { 4000 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 4001 RDMAContext *rdma; 4002 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 4003 int ret = 0; 4004 4005 RCU_READ_LOCK_GUARD(); 4006 rdma = qatomic_rcu_read(&rioc->rdmaout); 4007 if (!rdma) { 4008 return -EIO; 4009 } 4010 4011 CHECK_ERROR_STATE(); 4012 4013 if (migration_in_postcopy()) { 4014 return 0; 4015 } 4016 4017 qemu_fflush(f); 4018 ret = qemu_rdma_drain_cq(f, rdma); 4019 4020 if (ret < 0) { 4021 goto err; 4022 } 4023 4024 if (flags == RAM_CONTROL_SETUP) { 4025 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 4026 RDMALocalBlocks *local = &rdma->local_ram_blocks; 4027 int reg_result_idx, i, nb_dest_blocks; 4028 4029 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 4030 trace_qemu_rdma_registration_stop_ram(); 4031 4032 /* 4033 * Make sure that we parallelize the pinning on both sides. 4034 * For very large guests, doing this serially takes a really 4035 * long time, so we have to 'interleave' the pinning locally 4036 * with the control messages by performing the pinning on this 4037 * side before we receive the control response from the other 4038 * side that the pinning has completed. 4039 */ 4040 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 4041 ®_result_idx, rdma->pin_all ? 4042 qemu_rdma_reg_whole_ram_blocks : NULL); 4043 if (ret < 0) { 4044 fprintf(stderr, "receiving remote info!"); 4045 return ret; 4046 } 4047 4048 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 4049 4050 /* 4051 * The protocol uses two different sets of rkeys (mutually exclusive): 4052 * 1. One key to represent the virtual address of the entire ram block. 4053 * (dynamic chunk registration disabled - pin everything with one rkey.) 4054 * 2. One to represent individual chunks within a ram block. 4055 * (dynamic chunk registration enabled - pin individual chunks.) 4056 * 4057 * Once the capability is successfully negotiated, the destination transmits 4058 * the keys to use (or sends them later) including the virtual addresses 4059 * and then propagates the remote ram block descriptions to his local copy. 4060 */ 4061 4062 if (local->nb_blocks != nb_dest_blocks) { 4063 fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) " 4064 "Your QEMU command line parameters are probably " 4065 "not identical on both the source and destination.", 4066 local->nb_blocks, nb_dest_blocks); 4067 rdma->error_state = -EINVAL; 4068 return -EINVAL; 4069 } 4070 4071 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 4072 memcpy(rdma->dest_blocks, 4073 rdma->wr_data[reg_result_idx].control_curr, resp.len); 4074 for (i = 0; i < nb_dest_blocks; i++) { 4075 network_to_dest_block(&rdma->dest_blocks[i]); 4076 4077 /* We require that the blocks are in the same order */ 4078 if (rdma->dest_blocks[i].length != local->block[i].length) { 4079 fprintf(stderr, "Block %s/%d has a different length %" PRIu64 4080 "vs %" PRIu64, local->block[i].block_name, i, 4081 local->block[i].length, 4082 rdma->dest_blocks[i].length); 4083 rdma->error_state = -EINVAL; 4084 return -EINVAL; 4085 } 4086 local->block[i].remote_host_addr = 4087 rdma->dest_blocks[i].remote_host_addr; 4088 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 4089 } 4090 } 4091 4092 trace_qemu_rdma_registration_stop(flags); 4093 4094 head.type = RDMA_CONTROL_REGISTER_FINISHED; 4095 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 4096 4097 if (ret < 0) { 4098 goto err; 4099 } 4100 4101 return 0; 4102 err: 4103 rdma->error_state = ret; 4104 return ret; 4105 } 4106 4107 static const QEMUFileHooks rdma_read_hooks = { 4108 .hook_ram_load = rdma_load_hook, 4109 }; 4110 4111 static const QEMUFileHooks rdma_write_hooks = { 4112 .before_ram_iterate = qemu_rdma_registration_start, 4113 .after_ram_iterate = qemu_rdma_registration_stop, 4114 .save_page = qemu_rdma_save_page, 4115 }; 4116 4117 4118 static void qio_channel_rdma_finalize(Object *obj) 4119 { 4120 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 4121 if (rioc->rdmain) { 4122 qemu_rdma_cleanup(rioc->rdmain); 4123 g_free(rioc->rdmain); 4124 rioc->rdmain = NULL; 4125 } 4126 if (rioc->rdmaout) { 4127 qemu_rdma_cleanup(rioc->rdmaout); 4128 g_free(rioc->rdmaout); 4129 rioc->rdmaout = NULL; 4130 } 4131 } 4132 4133 static void qio_channel_rdma_class_init(ObjectClass *klass, 4134 void *class_data G_GNUC_UNUSED) 4135 { 4136 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 4137 4138 ioc_klass->io_writev = qio_channel_rdma_writev; 4139 ioc_klass->io_readv = qio_channel_rdma_readv; 4140 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 4141 ioc_klass->io_close = qio_channel_rdma_close; 4142 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 4143 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 4144 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 4145 } 4146 4147 static const TypeInfo qio_channel_rdma_info = { 4148 .parent = TYPE_QIO_CHANNEL, 4149 .name = TYPE_QIO_CHANNEL_RDMA, 4150 .instance_size = sizeof(QIOChannelRDMA), 4151 .instance_finalize = qio_channel_rdma_finalize, 4152 .class_init = qio_channel_rdma_class_init, 4153 }; 4154 4155 static void qio_channel_rdma_register_types(void) 4156 { 4157 type_register_static(&qio_channel_rdma_info); 4158 } 4159 4160 type_init(qio_channel_rdma_register_types); 4161 4162 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 4163 { 4164 QIOChannelRDMA *rioc; 4165 4166 if (qemu_file_mode_is_not_valid(mode)) { 4167 return NULL; 4168 } 4169 4170 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4171 4172 if (mode[0] == 'w') { 4173 rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); 4174 rioc->rdmaout = rdma; 4175 rioc->rdmain = rdma->return_path; 4176 qemu_file_set_hooks(rioc->file, &rdma_write_hooks); 4177 } else { 4178 rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); 4179 rioc->rdmain = rdma; 4180 rioc->rdmaout = rdma->return_path; 4181 qemu_file_set_hooks(rioc->file, &rdma_read_hooks); 4182 } 4183 4184 return rioc->file; 4185 } 4186 4187 static void rdma_accept_incoming_migration(void *opaque) 4188 { 4189 RDMAContext *rdma = opaque; 4190 int ret; 4191 QEMUFile *f; 4192 Error *local_err = NULL; 4193 4194 trace_qemu_rdma_accept_incoming_migration(); 4195 ret = qemu_rdma_accept(rdma); 4196 4197 if (ret) { 4198 fprintf(stderr, "RDMA ERROR: Migration initialization failed\n"); 4199 return; 4200 } 4201 4202 trace_qemu_rdma_accept_incoming_migration_accepted(); 4203 4204 if (rdma->is_return_path) { 4205 return; 4206 } 4207 4208 f = qemu_fopen_rdma(rdma, "rb"); 4209 if (f == NULL) { 4210 fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n"); 4211 qemu_rdma_cleanup(rdma); 4212 return; 4213 } 4214 4215 rdma->migration_started_on_destination = 1; 4216 migration_fd_process_incoming(f, &local_err); 4217 if (local_err) { 4218 error_reportf_err(local_err, "RDMA ERROR:"); 4219 } 4220 } 4221 4222 void rdma_start_incoming_migration(const char *host_port, Error **errp) 4223 { 4224 int ret; 4225 RDMAContext *rdma, *rdma_return_path = NULL; 4226 Error *local_err = NULL; 4227 4228 trace_rdma_start_incoming_migration(); 4229 4230 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4231 if (ram_block_discard_is_required()) { 4232 error_setg(errp, "RDMA: cannot disable RAM discard"); 4233 return; 4234 } 4235 4236 rdma = qemu_rdma_data_init(host_port, &local_err); 4237 if (rdma == NULL) { 4238 goto err; 4239 } 4240 4241 ret = qemu_rdma_dest_init(rdma, &local_err); 4242 4243 if (ret) { 4244 goto err; 4245 } 4246 4247 trace_rdma_start_incoming_migration_after_dest_init(); 4248 4249 ret = rdma_listen(rdma->listen_id, 5); 4250 4251 if (ret) { 4252 ERROR(errp, "listening on socket!"); 4253 goto cleanup_rdma; 4254 } 4255 4256 trace_rdma_start_incoming_migration_after_rdma_listen(); 4257 4258 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 4259 NULL, (void *)(intptr_t)rdma); 4260 return; 4261 4262 cleanup_rdma: 4263 qemu_rdma_cleanup(rdma); 4264 err: 4265 error_propagate(errp, local_err); 4266 if (rdma) { 4267 g_free(rdma->host); 4268 g_free(rdma->host_port); 4269 } 4270 g_free(rdma); 4271 g_free(rdma_return_path); 4272 } 4273 4274 void rdma_start_outgoing_migration(void *opaque, 4275 const char *host_port, Error **errp) 4276 { 4277 MigrationState *s = opaque; 4278 RDMAContext *rdma_return_path = NULL; 4279 RDMAContext *rdma; 4280 int ret = 0; 4281 4282 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 4283 if (ram_block_discard_is_required()) { 4284 error_setg(errp, "RDMA: cannot disable RAM discard"); 4285 return; 4286 } 4287 4288 rdma = qemu_rdma_data_init(host_port, errp); 4289 if (rdma == NULL) { 4290 goto err; 4291 } 4292 4293 ret = qemu_rdma_source_init(rdma, 4294 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); 4295 4296 if (ret) { 4297 goto err; 4298 } 4299 4300 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 4301 ret = qemu_rdma_connect(rdma, errp, false); 4302 4303 if (ret) { 4304 goto err; 4305 } 4306 4307 /* RDMA postcopy need a separate queue pair for return path */ 4308 if (migrate_postcopy()) { 4309 rdma_return_path = qemu_rdma_data_init(host_port, errp); 4310 4311 if (rdma_return_path == NULL) { 4312 goto return_path_err; 4313 } 4314 4315 ret = qemu_rdma_source_init(rdma_return_path, 4316 s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); 4317 4318 if (ret) { 4319 goto return_path_err; 4320 } 4321 4322 ret = qemu_rdma_connect(rdma_return_path, errp, true); 4323 4324 if (ret) { 4325 goto return_path_err; 4326 } 4327 4328 rdma->return_path = rdma_return_path; 4329 rdma_return_path->return_path = rdma; 4330 rdma_return_path->is_return_path = true; 4331 } 4332 4333 trace_rdma_start_outgoing_migration_after_rdma_connect(); 4334 4335 s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); 4336 migrate_fd_connect(s, NULL); 4337 return; 4338 return_path_err: 4339 qemu_rdma_cleanup(rdma); 4340 err: 4341 g_free(rdma); 4342 g_free(rdma_return_path); 4343 } 4344