/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 * Copyright Red Hat, Inc. 2015-2016
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *  Daniel P. Berrange <berrange@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "rdma.h"
#include "migration.h"
#include "qemu-file.h"
#include "ram.h"
#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/rcu.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "qemu/coroutine.h"
#include "exec/memory.h"
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <rdma/rdma_cma.h>
#include "trace.h"
#include "qom/object.h"
#include <poll.h>

/*
 * Print an error on both the Monitor and the Log file.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
 */
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)

/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
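/*
 * Worked example of the wr_id layout above (illustrative only, not part of
 * the protocol; the encode side mirrors qemu_rdma_make_wrid() further down
 * in this file):
 *
 *   uint64_t wr_id = (5UL << RDMA_WRID_CHUNK_SHIFT) |    chunk 5
 *                    (3UL << RDMA_WRID_BLOCK_SHIFT) |    ram block 3
 *                    1UL;                                RDMA_WRID_RDMA_WRITE
 *
 *   wr_id == 0x140030001
 *
 *   (wr_id & RDMA_WRID_TYPE_MASK)                            == 1
 *   (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT  == 3
 *   (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT  == 5
 */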
/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2. IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands)
 *
 * We could use more WRs, but we have enough for now.
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};


/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
 */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}

/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
211 */ 212 typedef struct RDMALocalBlock { 213 char *block_name; 214 uint8_t *local_host_addr; /* local virtual address */ 215 uint64_t remote_host_addr; /* remote virtual address */ 216 uint64_t offset; 217 uint64_t length; 218 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 219 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 220 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 221 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 222 int index; /* which block are we */ 223 unsigned int src_index; /* (Only used on dest) */ 224 bool is_ram_block; 225 int nb_chunks; 226 unsigned long *transit_bitmap; 227 unsigned long *unregister_bitmap; 228 } RDMALocalBlock; 229 230 /* 231 * Also represents a RAMblock, but only on the dest. 232 * This gets transmitted by the dest during connection-time 233 * to the source VM and then is used to populate the 234 * corresponding RDMALocalBlock with 235 * the information needed to perform the actual RDMA. 236 */ 237 typedef struct QEMU_PACKED RDMADestBlock { 238 uint64_t remote_host_addr; 239 uint64_t offset; 240 uint64_t length; 241 uint32_t remote_rkey; 242 uint32_t padding; 243 } RDMADestBlock; 244 245 static const char *control_desc(unsigned int rdma_control) 246 { 247 static const char *strs[] = { 248 [RDMA_CONTROL_NONE] = "NONE", 249 [RDMA_CONTROL_ERROR] = "ERROR", 250 [RDMA_CONTROL_READY] = "READY", 251 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 252 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 253 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 254 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 255 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 256 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 257 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 258 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 259 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 260 }; 261 262 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 263 return "??BAD CONTROL VALUE??"; 264 } 265 266 return strs[rdma_control]; 267 } 268 269 static uint64_t htonll(uint64_t v) 270 { 271 union { uint32_t lv[2]; uint64_t llv; } u; 272 u.lv[0] = htonl(v >> 32); 273 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 274 return u.llv; 275 } 276 277 static uint64_t ntohll(uint64_t v) 278 { 279 union { uint32_t lv[2]; uint64_t llv; } u; 280 u.llv = v; 281 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 282 } 283 284 static void dest_block_to_network(RDMADestBlock *db) 285 { 286 db->remote_host_addr = htonll(db->remote_host_addr); 287 db->offset = htonll(db->offset); 288 db->length = htonll(db->length); 289 db->remote_rkey = htonl(db->remote_rkey); 290 } 291 292 static void network_to_dest_block(RDMADestBlock *db) 293 { 294 db->remote_host_addr = ntohll(db->remote_host_addr); 295 db->offset = ntohll(db->offset); 296 db->length = ntohll(db->length); 297 db->remote_rkey = ntohl(db->remote_rkey); 298 } 299 300 /* 301 * Virtual address of the above structures used for transmitting 302 * the RAMBlock descriptions at connection-time. 303 * This structure is *not* transmitted. 304 */ 305 typedef struct RDMALocalBlocks { 306 int nb_blocks; 307 bool init; /* main memory init complete */ 308 RDMALocalBlock *block; 309 } RDMALocalBlocks; 310 311 /* 312 * Main data structure for RDMA state. 
313 * While there is only one copy of this structure being allocated right now, 314 * this is the place where one would start if you wanted to consider 315 * having more than one RDMA connection open at the same time. 316 */ 317 typedef struct RDMAContext { 318 char *host; 319 int port; 320 char *host_port; 321 322 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 323 324 /* 325 * This is used by *_exchange_send() to figure out whether or not 326 * the initial "READY" message has already been received or not. 327 * This is because other functions may potentially poll() and detect 328 * the READY message before send() does, in which case we need to 329 * know if it completed. 330 */ 331 int control_ready_expected; 332 333 /* number of outstanding writes */ 334 int nb_sent; 335 336 /* store info about current buffer so that we can 337 merge it with future sends */ 338 uint64_t current_addr; 339 uint64_t current_length; 340 /* index of ram block the current buffer belongs to */ 341 int current_index; 342 /* index of the chunk in the current ram block */ 343 int current_chunk; 344 345 bool pin_all; 346 347 /* 348 * infiniband-specific variables for opening the device 349 * and maintaining connection state and so forth. 350 * 351 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 352 * cm_id->verbs, cm_id->channel, and cm_id->qp. 353 */ 354 struct rdma_cm_id *cm_id; /* connection manager ID */ 355 struct rdma_cm_id *listen_id; 356 bool connected; 357 358 struct ibv_context *verbs; 359 struct rdma_event_channel *channel; 360 struct ibv_qp *qp; /* queue pair */ 361 struct ibv_comp_channel *comp_channel; /* completion channel */ 362 struct ibv_pd *pd; /* protection domain */ 363 struct ibv_cq *cq; /* completion queue */ 364 365 /* 366 * If a previous write failed (perhaps because of a failed 367 * memory registration, then do not attempt any future work 368 * and remember the error state. 369 */ 370 int error_state; 371 int error_reported; 372 int received_error; 373 374 /* 375 * Description of ram blocks used throughout the code. 376 */ 377 RDMALocalBlocks local_ram_blocks; 378 RDMADestBlock *dest_blocks; 379 380 /* Index of the next RAMBlock received during block registration */ 381 unsigned int next_src_index; 382 383 /* 384 * Migration on *destination* started. 385 * Then use coroutine yield function. 386 * Source runs in a thread, so we don't care. 387 */ 388 int migration_started_on_destination; 389 390 int total_registrations; 391 int total_writes; 392 393 int unregister_current, unregister_next; 394 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 395 396 GHashTable *blockmap; 397 398 /* the RDMAContext for return path */ 399 struct RDMAContext *return_path; 400 bool is_return_path; 401 } RDMAContext; 402 403 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 404 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 405 406 407 408 struct QIOChannelRDMA { 409 QIOChannel parent; 410 RDMAContext *rdmain; 411 RDMAContext *rdmaout; 412 QEMUFile *file; 413 bool blocking; /* XXX we don't actually honour this yet */ 414 }; 415 416 /* 417 * Main structure for IB Send/Recv control messages. 418 * This gets prepended at the beginning of every Send/Recv. 
419 */ 420 typedef struct QEMU_PACKED { 421 uint32_t len; /* Total length of data portion */ 422 uint32_t type; /* which control command to perform */ 423 uint32_t repeat; /* number of commands in data portion of same type */ 424 uint32_t padding; 425 } RDMAControlHeader; 426 427 static void control_to_network(RDMAControlHeader *control) 428 { 429 control->type = htonl(control->type); 430 control->len = htonl(control->len); 431 control->repeat = htonl(control->repeat); 432 } 433 434 static void network_to_control(RDMAControlHeader *control) 435 { 436 control->type = ntohl(control->type); 437 control->len = ntohl(control->len); 438 control->repeat = ntohl(control->repeat); 439 } 440 441 /* 442 * Register a single Chunk. 443 * Information sent by the source VM to inform the dest 444 * to register an single chunk of memory before we can perform 445 * the actual RDMA operation. 446 */ 447 typedef struct QEMU_PACKED { 448 union QEMU_PACKED { 449 uint64_t current_addr; /* offset into the ram_addr_t space */ 450 uint64_t chunk; /* chunk to lookup if unregistering */ 451 } key; 452 uint32_t current_index; /* which ramblock the chunk belongs to */ 453 uint32_t padding; 454 uint64_t chunks; /* how many sequential chunks to register */ 455 } RDMARegister; 456 457 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 458 { 459 RDMALocalBlock *local_block; 460 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 461 462 if (local_block->is_ram_block) { 463 /* 464 * current_addr as passed in is an address in the local ram_addr_t 465 * space, we need to translate this for the destination 466 */ 467 reg->key.current_addr -= local_block->offset; 468 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 469 } 470 reg->key.current_addr = htonll(reg->key.current_addr); 471 reg->current_index = htonl(reg->current_index); 472 reg->chunks = htonll(reg->chunks); 473 } 474 475 static void network_to_register(RDMARegister *reg) 476 { 477 reg->key.current_addr = ntohll(reg->key.current_addr); 478 reg->current_index = ntohl(reg->current_index); 479 reg->chunks = ntohll(reg->chunks); 480 } 481 482 typedef struct QEMU_PACKED { 483 uint32_t value; /* if zero, we will madvise() */ 484 uint32_t block_idx; /* which ram block index */ 485 uint64_t offset; /* Address in remote ram_addr_t space */ 486 uint64_t length; /* length of the chunk */ 487 } RDMACompress; 488 489 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 490 { 491 comp->value = htonl(comp->value); 492 /* 493 * comp->offset as passed in is an address in the local ram_addr_t 494 * space, we need to translate this for the destination 495 */ 496 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 497 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 498 comp->block_idx = htonl(comp->block_idx); 499 comp->offset = htonll(comp->offset); 500 comp->length = htonll(comp->length); 501 } 502 503 static void network_to_compress(RDMACompress *comp) 504 { 505 comp->value = ntohl(comp->value); 506 comp->block_idx = ntohl(comp->block_idx); 507 comp->offset = ntohll(comp->offset); 508 comp->length = ntohll(comp->length); 509 } 510 511 /* 512 * The result of the dest's memory registration produces an "rkey" 513 * which the source VM must reference in order to perform 514 * the RDMA operation. 
515 */ 516 typedef struct QEMU_PACKED { 517 uint32_t rkey; 518 uint32_t padding; 519 uint64_t host_addr; 520 } RDMARegisterResult; 521 522 static void result_to_network(RDMARegisterResult *result) 523 { 524 result->rkey = htonl(result->rkey); 525 result->host_addr = htonll(result->host_addr); 526 }; 527 528 static void network_to_result(RDMARegisterResult *result) 529 { 530 result->rkey = ntohl(result->rkey); 531 result->host_addr = ntohll(result->host_addr); 532 }; 533 534 const char *print_wrid(int wrid); 535 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 536 uint8_t *data, RDMAControlHeader *resp, 537 int *resp_idx, 538 int (*callback)(RDMAContext *rdma)); 539 540 static inline uint64_t ram_chunk_index(const uint8_t *start, 541 const uint8_t *host) 542 { 543 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 544 } 545 546 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 547 uint64_t i) 548 { 549 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 550 (i << RDMA_REG_CHUNK_SHIFT)); 551 } 552 553 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 554 uint64_t i) 555 { 556 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 557 (1UL << RDMA_REG_CHUNK_SHIFT); 558 559 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 560 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 561 } 562 563 return result; 564 } 565 566 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 567 void *host_addr, 568 ram_addr_t block_offset, uint64_t length) 569 { 570 RDMALocalBlocks *local = &rdma->local_ram_blocks; 571 RDMALocalBlock *block; 572 RDMALocalBlock *old = local->block; 573 574 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 575 576 if (local->nb_blocks) { 577 int x; 578 579 if (rdma->blockmap) { 580 for (x = 0; x < local->nb_blocks; x++) { 581 g_hash_table_remove(rdma->blockmap, 582 (void *)(uintptr_t)old[x].offset); 583 g_hash_table_insert(rdma->blockmap, 584 (void *)(uintptr_t)old[x].offset, 585 &local->block[x]); 586 } 587 } 588 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 589 g_free(old); 590 } 591 592 block = &local->block[local->nb_blocks]; 593 594 block->block_name = g_strdup(block_name); 595 block->local_host_addr = host_addr; 596 block->offset = block_offset; 597 block->length = length; 598 block->index = local->nb_blocks; 599 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 600 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 601 block->transit_bitmap = bitmap_new(block->nb_chunks); 602 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 603 block->unregister_bitmap = bitmap_new(block->nb_chunks); 604 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 605 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 606 607 block->is_ram_block = local->init ? 
false : true; 608 609 if (rdma->blockmap) { 610 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 611 } 612 613 trace_rdma_add_block(block_name, local->nb_blocks, 614 (uintptr_t) block->local_host_addr, 615 block->offset, block->length, 616 (uintptr_t) (block->local_host_addr + block->length), 617 BITS_TO_LONGS(block->nb_chunks) * 618 sizeof(unsigned long) * 8, 619 block->nb_chunks); 620 621 local->nb_blocks++; 622 623 return 0; 624 } 625 626 /* 627 * Memory regions need to be registered with the device and queue pairs setup 628 * in advanced before the migration starts. This tells us where the RAM blocks 629 * are so that we can register them individually. 630 */ 631 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 632 { 633 const char *block_name = qemu_ram_get_idstr(rb); 634 void *host_addr = qemu_ram_get_host_addr(rb); 635 ram_addr_t block_offset = qemu_ram_get_offset(rb); 636 ram_addr_t length = qemu_ram_get_used_length(rb); 637 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 638 } 639 640 /* 641 * Identify the RAMBlocks and their quantity. They will be references to 642 * identify chunk boundaries inside each RAMBlock and also be referenced 643 * during dynamic page registration. 644 */ 645 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 646 { 647 RDMALocalBlocks *local = &rdma->local_ram_blocks; 648 int ret; 649 650 assert(rdma->blockmap == NULL); 651 memset(local, 0, sizeof *local); 652 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 653 if (ret) { 654 return ret; 655 } 656 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 657 rdma->dest_blocks = g_new0(RDMADestBlock, 658 rdma->local_ram_blocks.nb_blocks); 659 local->init = true; 660 return 0; 661 } 662 663 /* 664 * Note: If used outside of cleanup, the caller must ensure that the destination 665 * block structures are also updated 666 */ 667 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 668 { 669 RDMALocalBlocks *local = &rdma->local_ram_blocks; 670 RDMALocalBlock *old = local->block; 671 int x; 672 673 if (rdma->blockmap) { 674 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 675 } 676 if (block->pmr) { 677 int j; 678 679 for (j = 0; j < block->nb_chunks; j++) { 680 if (!block->pmr[j]) { 681 continue; 682 } 683 ibv_dereg_mr(block->pmr[j]); 684 rdma->total_registrations--; 685 } 686 g_free(block->pmr); 687 block->pmr = NULL; 688 } 689 690 if (block->mr) { 691 ibv_dereg_mr(block->mr); 692 rdma->total_registrations--; 693 block->mr = NULL; 694 } 695 696 g_free(block->transit_bitmap); 697 block->transit_bitmap = NULL; 698 699 g_free(block->unregister_bitmap); 700 block->unregister_bitmap = NULL; 701 702 g_free(block->remote_keys); 703 block->remote_keys = NULL; 704 705 g_free(block->block_name); 706 block->block_name = NULL; 707 708 if (rdma->blockmap) { 709 for (x = 0; x < local->nb_blocks; x++) { 710 g_hash_table_remove(rdma->blockmap, 711 (void *)(uintptr_t)old[x].offset); 712 } 713 } 714 715 if (local->nb_blocks > 1) { 716 717 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 718 719 if (block->index) { 720 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 721 } 722 723 if (block->index < (local->nb_blocks - 1)) { 724 memcpy(local->block + block->index, old + (block->index + 1), 725 sizeof(RDMALocalBlock) * 726 (local->nb_blocks - (block->index + 1))); 727 for (x = block->index; x < local->nb_blocks - 1; x++) { 728 local->block[x].index--; 729 } 730 } 731 } else 
{ 732 assert(block == local->block); 733 local->block = NULL; 734 } 735 736 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 737 block->offset, block->length, 738 (uintptr_t)(block->local_host_addr + block->length), 739 BITS_TO_LONGS(block->nb_chunks) * 740 sizeof(unsigned long) * 8, block->nb_chunks); 741 742 g_free(old); 743 744 local->nb_blocks--; 745 746 if (local->nb_blocks && rdma->blockmap) { 747 for (x = 0; x < local->nb_blocks; x++) { 748 g_hash_table_insert(rdma->blockmap, 749 (void *)(uintptr_t)local->block[x].offset, 750 &local->block[x]); 751 } 752 } 753 754 return 0; 755 } 756 757 /* 758 * Put in the log file which RDMA device was opened and the details 759 * associated with that device. 760 */ 761 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 762 { 763 struct ibv_port_attr port; 764 765 if (ibv_query_port(verbs, 1, &port)) { 766 error_report("Failed to query port information"); 767 return; 768 } 769 770 printf("%s RDMA Device opened: kernel name %s " 771 "uverbs device name %s, " 772 "infiniband_verbs class device path %s, " 773 "infiniband class device path %s, " 774 "transport: (%d) %s\n", 775 who, 776 verbs->device->name, 777 verbs->device->dev_name, 778 verbs->device->dev_path, 779 verbs->device->ibdev_path, 780 port.link_layer, 781 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 782 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 783 ? "Ethernet" : "Unknown")); 784 } 785 786 /* 787 * Put in the log file the RDMA gid addressing information, 788 * useful for folks who have trouble understanding the 789 * RDMA device hierarchy in the kernel. 790 */ 791 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 792 { 793 char sgid[33]; 794 char dgid[33]; 795 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 796 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 797 trace_qemu_rdma_dump_gid(who, sgid, dgid); 798 } 799 800 /* 801 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 802 * We will try the next addrinfo struct, and fail if there are 803 * no other valid addresses to bind against. 804 * 805 * If user is listening on '[::]', then we will not have a opened a device 806 * yet and have no way of verifying if the device is RoCE or not. 807 * 808 * In this case, the source VM will throw an error for ALL types of 809 * connections (both IPv4 and IPv6) if the destination machine does not have 810 * a regular infiniband network available for use. 811 * 812 * The only way to guarantee that an error is thrown for broken kernels is 813 * for the management software to choose a *specific* interface at bind time 814 * and validate what time of hardware it is. 815 * 816 * Unfortunately, this puts the user in a fix: 817 * 818 * If the source VM connects with an IPv4 address without knowing that the 819 * destination has bound to '[::]' the migration will unconditionally fail 820 * unless the management software is explicitly listening on the IPv4 821 * address while using a RoCE-based device. 822 * 823 * If the source VM connects with an IPv6 address, then we're OK because we can 824 * throw an error on the source (and similarly on the destination). 825 * 826 * But in mixed environments, this will be broken for a while until it is fixed 827 * inside linux. 828 * 829 * We do provide a *tiny* bit of help in this function: We can list all of the 830 * devices in the system and check to see if all the devices are RoCE or 831 * Infiniband. 
832 * 833 * If we detect that we have a *pure* RoCE environment, then we can safely 834 * thrown an error even if the management software has specified '[::]' as the 835 * bind address. 836 * 837 * However, if there is are multiple hetergeneous devices, then we cannot make 838 * this assumption and the user just has to be sure they know what they are 839 * doing. 840 * 841 * Patches are being reviewed on linux-rdma. 842 */ 843 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 844 { 845 /* This bug only exists in linux, to our knowledge. */ 846 #ifdef CONFIG_LINUX 847 struct ibv_port_attr port_attr; 848 849 /* 850 * Verbs are only NULL if management has bound to '[::]'. 851 * 852 * Let's iterate through all the devices and see if there any pure IB 853 * devices (non-ethernet). 854 * 855 * If not, then we can safely proceed with the migration. 856 * Otherwise, there are no guarantees until the bug is fixed in linux. 857 */ 858 if (!verbs) { 859 int num_devices, x; 860 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 861 bool roce_found = false; 862 bool ib_found = false; 863 864 for (x = 0; x < num_devices; x++) { 865 verbs = ibv_open_device(dev_list[x]); 866 if (!verbs) { 867 if (errno == EPERM) { 868 continue; 869 } else { 870 return -EINVAL; 871 } 872 } 873 874 if (ibv_query_port(verbs, 1, &port_attr)) { 875 ibv_close_device(verbs); 876 ERROR(errp, "Could not query initial IB port"); 877 return -EINVAL; 878 } 879 880 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 881 ib_found = true; 882 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 883 roce_found = true; 884 } 885 886 ibv_close_device(verbs); 887 888 } 889 890 if (roce_found) { 891 if (ib_found) { 892 fprintf(stderr, "WARN: migrations may fail:" 893 " IPv6 over RoCE / iWARP in linux" 894 " is broken. But since you appear to have a" 895 " mixed RoCE / IB environment, be sure to only" 896 " migrate over the IB fabric until the kernel " 897 " fixes the bug.\n"); 898 } else { 899 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 900 " and your management software has specified '[::]'" 901 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 902 return -ENONET; 903 } 904 } 905 906 return 0; 907 } 908 909 /* 910 * If we have a verbs context, that means that some other than '[::]' was 911 * used by the management software for binding. In which case we can 912 * actually warn the user about a potentially broken kernel. 913 */ 914 915 /* IB ports start with 1, not 0 */ 916 if (ibv_query_port(verbs, 1, &port_attr)) { 917 ERROR(errp, "Could not query initial IB port"); 918 return -EINVAL; 919 } 920 921 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 922 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 923 "(but patches on linux-rdma in progress)"); 924 return -ENONET; 925 } 926 927 #endif 928 929 return 0; 930 } 931 932 /* 933 * Figure out which RDMA device corresponds to the requested IP hostname 934 * Also create the initial connection manager identifiers for opening 935 * the connection. 
936 */ 937 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 938 { 939 int ret; 940 struct rdma_addrinfo *res; 941 char port_str[16]; 942 struct rdma_cm_event *cm_event; 943 char ip[40] = "unknown"; 944 struct rdma_addrinfo *e; 945 946 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 947 ERROR(errp, "RDMA hostname has not been set"); 948 return -EINVAL; 949 } 950 951 /* create CM channel */ 952 rdma->channel = rdma_create_event_channel(); 953 if (!rdma->channel) { 954 ERROR(errp, "could not create CM channel"); 955 return -EINVAL; 956 } 957 958 /* create CM id */ 959 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 960 if (ret) { 961 ERROR(errp, "could not create channel id"); 962 goto err_resolve_create_id; 963 } 964 965 snprintf(port_str, 16, "%d", rdma->port); 966 port_str[15] = '\0'; 967 968 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 969 if (ret < 0) { 970 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 971 goto err_resolve_get_addr; 972 } 973 974 for (e = res; e != NULL; e = e->ai_next) { 975 inet_ntop(e->ai_family, 976 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 977 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 978 979 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 980 RDMA_RESOLVE_TIMEOUT_MS); 981 if (!ret) { 982 if (e->ai_family == AF_INET6) { 983 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 984 if (ret) { 985 continue; 986 } 987 } 988 goto route; 989 } 990 } 991 992 rdma_freeaddrinfo(res); 993 ERROR(errp, "could not resolve address %s", rdma->host); 994 goto err_resolve_get_addr; 995 996 route: 997 rdma_freeaddrinfo(res); 998 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 999 1000 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1001 if (ret) { 1002 ERROR(errp, "could not perform event_addr_resolved"); 1003 goto err_resolve_get_addr; 1004 } 1005 1006 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 1007 ERROR(errp, "result not equal to event_addr_resolved %s", 1008 rdma_event_str(cm_event->event)); 1009 error_report("rdma_resolve_addr"); 1010 rdma_ack_cm_event(cm_event); 1011 ret = -EINVAL; 1012 goto err_resolve_get_addr; 1013 } 1014 rdma_ack_cm_event(cm_event); 1015 1016 /* resolve route */ 1017 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1018 if (ret) { 1019 ERROR(errp, "could not resolve rdma route"); 1020 goto err_resolve_get_addr; 1021 } 1022 1023 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1024 if (ret) { 1025 ERROR(errp, "could not perform event_route_resolved"); 1026 goto err_resolve_get_addr; 1027 } 1028 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1029 ERROR(errp, "result not equal to event_route_resolved: %s", 1030 rdma_event_str(cm_event->event)); 1031 rdma_ack_cm_event(cm_event); 1032 ret = -EINVAL; 1033 goto err_resolve_get_addr; 1034 } 1035 rdma_ack_cm_event(cm_event); 1036 rdma->verbs = rdma->cm_id->verbs; 1037 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1038 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1039 return 0; 1040 1041 err_resolve_get_addr: 1042 rdma_destroy_id(rdma->cm_id); 1043 rdma->cm_id = NULL; 1044 err_resolve_create_id: 1045 rdma_destroy_event_channel(rdma->channel); 1046 rdma->channel = NULL; 1047 return ret; 1048 } 1049 1050 /* 1051 * Create protection domain and completion queues 1052 */ 1053 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1054 { 1055 /* allocate pd */ 1056 rdma->pd = ibv_alloc_pd(rdma->verbs); 1057 if 
(!rdma->pd) {
        error_report("failed to allocate protection domain");
        return -1;
    }

    /* create completion channel */
    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
    if (!rdma->comp_channel) {
        error_report("failed to allocate completion channel");
        goto err_alloc_pd_cq;
    }

    /*
     * Completion queue can be filled by both read and write work requests,
     * so must reflect the sum of both possible queue sizes.
     */
    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
                             NULL, rdma->comp_channel, 0);
    if (!rdma->cq) {
        error_report("failed to allocate completion queue");
        goto err_alloc_pd_cq;
    }

    return 0;

err_alloc_pd_cq:
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
    }
    if (rdma->comp_channel) {
        ibv_destroy_comp_channel(rdma->comp_channel);
    }
    rdma->pd = NULL;
    rdma->comp_channel = NULL;
    return -1;

}

/*
 * Create queue pairs.
 */
static int qemu_rdma_alloc_qp(RDMAContext *rdma)
{
    struct ibv_qp_init_attr attr = { 0 };
    int ret;

    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
    attr.cap.max_recv_wr = 3;
    attr.cap.max_send_sge = 1;
    attr.cap.max_recv_sge = 1;
    attr.send_cq = rdma->cq;
    attr.recv_cq = rdma->cq;
    attr.qp_type = IBV_QPT_RC;

    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
    if (ret) {
        return -1;
    }

    rdma->qp = rdma->cm_id->qp;
    return 0;
}

static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
{
    int i;
    RDMALocalBlocks *local = &rdma->local_ram_blocks;

    for (i = 0; i < local->nb_blocks; i++) {
        local->block[i].mr =
            ibv_reg_mr(rdma->pd,
                    local->block[i].local_host_addr,
                    local->block[i].length,
                    IBV_ACCESS_LOCAL_WRITE |
                    IBV_ACCESS_REMOTE_WRITE
                    );
        if (!local->block[i].mr) {
            perror("Failed to register local dest ram block!\n");
            break;
        }
        rdma->total_registrations++;
    }

    if (i >= local->nb_blocks) {
        return 0;
    }

    for (i--; i >= 0; i--) {
        ibv_dereg_mr(local->block[i].mr);
        rdma->total_registrations--;
    }

    return -1;

}

/*
 * Find the ram block that corresponds to the page requested to be
 * transmitted by QEMU.
 *
 * Once the block is found, also identify which 'chunk' within that
 * block the page belongs to.
 *
 * This search cannot fail or the migration will fail.
 */
static int qemu_rdma_search_ram_block(RDMAContext *rdma,
                                      uintptr_t block_offset,
                                      uint64_t offset,
                                      uint64_t length,
                                      uint64_t *block_index,
                                      uint64_t *chunk_index)
{
    uint64_t current_addr = block_offset + offset;
    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
                                                (void *) block_offset);
    assert(block);
    assert(current_addr >= block->offset);
    assert((current_addr + length) <= (block->offset + block->length));

    *block_index = block->index;
    *chunk_index = ram_chunk_index(block->local_host_addr,
                block->local_host_addr + (current_addr - block->offset));

    return 0;
}
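/*
 * Illustrative sketch of the chunk arithmetic used above and by
 * qemu_rdma_register_and_get_keys() below (assumed example values, not taken
 * from a real migration): with RDMA_REG_CHUNK_SHIFT == 20 each chunk covers
 * 1 MB, so for a RAMBlock of length 0x40000000 (1 GB):
 *
 *   nb_chunks = ram_chunk_index(host, host + length) + 1
 *             = (0x40000000 >> 20) + 1 = 1025
 *
 * and a page at byte offset 0x501000 (5 MB + 4 KB) into the block falls in:
 *
 *   chunk = ram_chunk_index(host, host + 0x501000) = 0x501000 >> 20 = 5
 *
 * ram_chunk_end() clamps the final (possibly empty) tail chunk to the end of
 * the block.
 */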
/*
 * Register a chunk with IB. If the chunk was already registered
 * previously, then skip.
 *
 * Also return the keys associated with the registration needed
 * to perform the actual RDMA operation.
 */
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
                                           RDMALocalBlock *block, uintptr_t host_addr,
                                           uint32_t *lkey, uint32_t *rkey, int chunk,
                                           uint8_t *chunk_start, uint8_t *chunk_end)
{
    if (block->mr) {
        if (lkey) {
            *lkey = block->mr->lkey;
        }
        if (rkey) {
            *rkey = block->mr->rkey;
        }
        return 0;
    }

    /* allocate memory to store chunk MRs */
    if (!block->pmr) {
        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
    }

    /*
     * If 'rkey', then we're the destination, so grant access to the source.
     *
     * If 'lkey', then we're the source VM, so grant access only to ourselves.
     */
    if (!block->pmr[chunk]) {
        uint64_t len = chunk_end - chunk_start;

        trace_qemu_rdma_register_and_get_keys(len, chunk_start);

        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
                chunk_start, len,
                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
                        IBV_ACCESS_REMOTE_WRITE) : 0));

        if (!block->pmr[chunk]) {
            perror("Failed to register chunk!");
            fprintf(stderr, "Chunk details: block: %d chunk index %d"
                            " start %" PRIuPTR " end %" PRIuPTR
                            " host %" PRIuPTR
                            " local %" PRIuPTR " registrations: %d\n",
                            block->index, chunk, (uintptr_t)chunk_start,
                            (uintptr_t)chunk_end, host_addr,
                            (uintptr_t)block->local_host_addr,
                            rdma->total_registrations);
            return -1;
        }
        rdma->total_registrations++;
    }

    if (lkey) {
        *lkey = block->pmr[chunk]->lkey;
    }
    if (rkey) {
        *rkey = block->pmr[chunk]->rkey;
    }
    return 0;
}

/*
 * Register (at connection time) the memory used for control
 * channel messages.
 */
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    if (rdma->wr_data[idx].control_mr) {
        rdma->total_registrations++;
        return 0;
    }
    error_report("qemu_rdma_reg_control failed");
    return -1;
}

const char *print_wrid(int wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return wrid_desc[RDMA_WRID_RECV_CONTROL];
    }
    return wrid_desc[wrid];
}

/*
 * RDMA requires memory registration (mlock/pinning), but this is not good for
 * overcommitment.
 *
 * In preparation for the future where LRU information or workload-specific
 * writable working set memory access behavior is available to QEMU,
 * it would be nice to have in place the ability to UN-register/UN-pin
 * particular memory regions from the RDMA hardware when it is determined that
 * those regions of memory will likely not be accessed again in the near future.
 *
 * While we do not yet have such information right now, the following
 * compile-time option allows us to perform a non-optimized version of this
 * behavior.
 *
 * By uncommenting this option, you will cause *all* RDMA transfers to be
 * unregistered immediately after the transfer completes on both sides of the
 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
 *
 * This will have a terrible impact on migration performance, so until future
 * workload information or LRU information is available, do not attempt to use
 * this feature except for basic testing.
 */
/* #define RDMA_UNREGISTRATION_EXAMPLE */
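/*
 * A minimal sketch of what a future LRU-driven hook could look like (purely
 * illustrative; "lru_chunk_is_cold" is a hypothetical predicate, not an
 * existing helper). Inside qemu_rdma_poll(), where the
 * RDMA_UNREGISTRATION_EXAMPLE hook currently sits, one might instead do:
 *
 *   if (!rdma->pin_all && lru_chunk_is_cold(block, chunk)) {
 *       qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
 *   }
 *
 * The chunk would then be unpinned lazily by qemu_rdma_unregister_waiting()
 * once it is no longer in transit.
 */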
/*
 * Perform a non-optimized memory unregistration after every transfer
 * for demonstration purposes, only if pin-all is not requested.
 *
 * Potential optimizations:
 * 1. Start a new thread to run this function continuously
 *      - for bit clearing
 *      - and for receipt of unregister messages
 * 2. Use an LRU.
 * 3. Use workload hints.
 */
static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
{
    while (rdma->unregistrations[rdma->unregister_current]) {
        int ret;
        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
        uint64_t chunk =
            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
        uint64_t index =
            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
        RDMALocalBlock *block =
            &(rdma->local_ram_blocks.block[index]);
        RDMARegister reg = { .current_index = index };
        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
                                 };
        RDMAControlHeader head = { .len = sizeof(RDMARegister),
                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
                                   .repeat = 1,
                                 };

        trace_qemu_rdma_unregister_waiting_proc(chunk,
                                                rdma->unregister_current);

        rdma->unregistrations[rdma->unregister_current] = 0;
        rdma->unregister_current++;

        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
            rdma->unregister_current = 0;
        }


        /*
         * Unregistration is speculative (because migration is single-threaded
         * and we cannot break the protocol's infiniband message ordering).
         * Thus, if the memory is currently being used for transmission,
         * then abort the attempt to unregister and try again
         * later the next time a completion is received for this memory.
         */
        clear_bit(chunk, block->unregister_bitmap);

        if (test_bit(chunk, block->transit_bitmap)) {
            trace_qemu_rdma_unregister_waiting_inflight(chunk);
            continue;
        }

        trace_qemu_rdma_unregister_waiting_send(chunk);

        ret = ibv_dereg_mr(block->pmr[chunk]);
        block->pmr[chunk] = NULL;
        block->remote_keys[chunk] = 0;

        if (ret != 0) {
            perror("unregistration chunk failed");
            return -ret;
        }
        rdma->total_registrations--;

        reg.key.chunk = chunk;
        register_to_network(rdma, &reg);
        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                &resp, NULL, NULL);
        if (ret < 0) {
            return ret;
        }

        trace_qemu_rdma_unregister_waiting_complete(chunk);
    }

    return 0;
}

static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
                                    uint64_t chunk)
{
    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;

    result |= (index << RDMA_WRID_BLOCK_SHIFT);
    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);

    return result;
}

/*
 * Set bit for unregistration in the next iteration.
 * We cannot transmit right here, but will unpin later.
1393 */ 1394 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1395 uint64_t chunk, uint64_t wr_id) 1396 { 1397 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1398 error_report("rdma migration: queue is full"); 1399 } else { 1400 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1401 1402 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1403 trace_qemu_rdma_signal_unregister_append(chunk, 1404 rdma->unregister_next); 1405 1406 rdma->unregistrations[rdma->unregister_next++] = 1407 qemu_rdma_make_wrid(wr_id, index, chunk); 1408 1409 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1410 rdma->unregister_next = 0; 1411 } 1412 } else { 1413 trace_qemu_rdma_signal_unregister_already(chunk); 1414 } 1415 } 1416 } 1417 1418 /* 1419 * Consult the connection manager to see a work request 1420 * (of any kind) has completed. 1421 * Return the work request ID that completed. 1422 */ 1423 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1424 uint32_t *byte_len) 1425 { 1426 int ret; 1427 struct ibv_wc wc; 1428 uint64_t wr_id; 1429 1430 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1431 1432 if (!ret) { 1433 *wr_id_out = RDMA_WRID_NONE; 1434 return 0; 1435 } 1436 1437 if (ret < 0) { 1438 error_report("ibv_poll_cq return %d", ret); 1439 return ret; 1440 } 1441 1442 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1443 1444 if (wc.status != IBV_WC_SUCCESS) { 1445 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1446 wc.status, ibv_wc_status_str(wc.status)); 1447 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1448 1449 return -1; 1450 } 1451 1452 if (rdma->control_ready_expected && 1453 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1454 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1455 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1456 rdma->control_ready_expected = 0; 1457 } 1458 1459 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1460 uint64_t chunk = 1461 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1462 uint64_t index = 1463 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1464 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1465 1466 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1467 index, chunk, block->local_host_addr, 1468 (void *)(uintptr_t)block->remote_host_addr); 1469 1470 clear_bit(chunk, block->transit_bitmap); 1471 1472 if (rdma->nb_sent > 0) { 1473 rdma->nb_sent--; 1474 } 1475 1476 if (!rdma->pin_all) { 1477 /* 1478 * FYI: If one wanted to signal a specific chunk to be unregistered 1479 * using LRU or workload-specific information, this is the function 1480 * you would call to do so. That chunk would then get asynchronously 1481 * unregistered later. 1482 */ 1483 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1484 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1485 #endif 1486 } 1487 } else { 1488 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1489 } 1490 1491 *wr_id_out = wc.wr_id; 1492 if (byte_len) { 1493 *byte_len = wc.byte_len; 1494 } 1495 1496 return 0; 1497 } 1498 1499 /* Wait for activity on the completion channel. 1500 * Returns 0 on success, none-0 on error. 1501 */ 1502 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma) 1503 { 1504 struct rdma_cm_event *cm_event; 1505 int ret = -1; 1506 1507 /* 1508 * Coroutine doesn't start until migration_fd_process_incoming() 1509 * so don't yield unless we know we're running inside of a coroutine. 
1510 */ 1511 if (rdma->migration_started_on_destination && 1512 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1513 yield_until_fd_readable(rdma->comp_channel->fd); 1514 } else { 1515 /* This is the source side, we're in a separate thread 1516 * or destination prior to migration_fd_process_incoming() 1517 * after postcopy, the destination also in a separate thread. 1518 * we can't yield; so we have to poll the fd. 1519 * But we need to be able to handle 'cancel' or an error 1520 * without hanging forever. 1521 */ 1522 while (!rdma->error_state && !rdma->received_error) { 1523 GPollFD pfds[2]; 1524 pfds[0].fd = rdma->comp_channel->fd; 1525 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1526 pfds[0].revents = 0; 1527 1528 pfds[1].fd = rdma->channel->fd; 1529 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1530 pfds[1].revents = 0; 1531 1532 /* 0.1s timeout, should be fine for a 'cancel' */ 1533 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1534 case 2: 1535 case 1: /* fd active */ 1536 if (pfds[0].revents) { 1537 return 0; 1538 } 1539 1540 if (pfds[1].revents) { 1541 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1542 if (ret) { 1543 error_report("failed to get cm event while wait " 1544 "completion channel"); 1545 return -EPIPE; 1546 } 1547 1548 error_report("receive cm event while wait comp channel," 1549 "cm event is %d", cm_event->event); 1550 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1551 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1552 rdma_ack_cm_event(cm_event); 1553 return -EPIPE; 1554 } 1555 rdma_ack_cm_event(cm_event); 1556 } 1557 break; 1558 1559 case 0: /* Timeout, go around again */ 1560 break; 1561 1562 default: /* Error of some type - 1563 * I don't trust errno from qemu_poll_ns 1564 */ 1565 error_report("%s: poll failed", __func__); 1566 return -EPIPE; 1567 } 1568 1569 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1570 /* Bail out and let the cancellation happen */ 1571 return -EPIPE; 1572 } 1573 } 1574 } 1575 1576 if (rdma->received_error) { 1577 return -EPIPE; 1578 } 1579 return rdma->error_state; 1580 } 1581 1582 /* 1583 * Block until the next work request has completed. 1584 * 1585 * First poll to see if a work request has already completed, 1586 * otherwise block. 1587 * 1588 * If we encounter completed work requests for IDs other than 1589 * the one we're interested in, then that's generally an error. 1590 * 1591 * The only exception is actual RDMA Write completions. These 1592 * completions only need to be recorded, but do not actually 1593 * need further processing. 
1594 */ 1595 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1596 uint32_t *byte_len) 1597 { 1598 int num_cq_events = 0, ret = 0; 1599 struct ibv_cq *cq; 1600 void *cq_ctx; 1601 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1602 1603 if (ibv_req_notify_cq(rdma->cq, 0)) { 1604 return -1; 1605 } 1606 /* poll cq first */ 1607 while (wr_id != wrid_requested) { 1608 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1609 if (ret < 0) { 1610 return ret; 1611 } 1612 1613 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1614 1615 if (wr_id == RDMA_WRID_NONE) { 1616 break; 1617 } 1618 if (wr_id != wrid_requested) { 1619 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1620 wrid_requested, print_wrid(wr_id), wr_id); 1621 } 1622 } 1623 1624 if (wr_id == wrid_requested) { 1625 return 0; 1626 } 1627 1628 while (1) { 1629 ret = qemu_rdma_wait_comp_channel(rdma); 1630 if (ret) { 1631 goto err_block_for_wrid; 1632 } 1633 1634 ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx); 1635 if (ret) { 1636 perror("ibv_get_cq_event"); 1637 goto err_block_for_wrid; 1638 } 1639 1640 num_cq_events++; 1641 1642 ret = -ibv_req_notify_cq(cq, 0); 1643 if (ret) { 1644 goto err_block_for_wrid; 1645 } 1646 1647 while (wr_id != wrid_requested) { 1648 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1649 if (ret < 0) { 1650 goto err_block_for_wrid; 1651 } 1652 1653 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1654 1655 if (wr_id == RDMA_WRID_NONE) { 1656 break; 1657 } 1658 if (wr_id != wrid_requested) { 1659 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1660 wrid_requested, print_wrid(wr_id), wr_id); 1661 } 1662 } 1663 1664 if (wr_id == wrid_requested) { 1665 goto success_block_for_wrid; 1666 } 1667 } 1668 1669 success_block_for_wrid: 1670 if (num_cq_events) { 1671 ibv_ack_cq_events(cq, num_cq_events); 1672 } 1673 return 0; 1674 1675 err_block_for_wrid: 1676 if (num_cq_events) { 1677 ibv_ack_cq_events(cq, num_cq_events); 1678 } 1679 1680 rdma->error_state = ret; 1681 return ret; 1682 } 1683 1684 /* 1685 * Post a SEND message work request for the control channel 1686 * containing some data and block until the post completes. 1687 */ 1688 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1689 RDMAControlHeader *head) 1690 { 1691 int ret = 0; 1692 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1693 struct ibv_send_wr *bad_wr; 1694 struct ibv_sge sge = { 1695 .addr = (uintptr_t)(wr->control), 1696 .length = head->len + sizeof(RDMAControlHeader), 1697 .lkey = wr->control_mr->lkey, 1698 }; 1699 struct ibv_send_wr send_wr = { 1700 .wr_id = RDMA_WRID_SEND_CONTROL, 1701 .opcode = IBV_WR_SEND, 1702 .send_flags = IBV_SEND_SIGNALED, 1703 .sg_list = &sge, 1704 .num_sge = 1, 1705 }; 1706 1707 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1708 1709 /* 1710 * We don't actually need to do a memcpy() in here if we used 1711 * the "sge" properly, but since we're only sending control messages 1712 * (not RAM in a performance-critical path), then its OK for now. 1713 * 1714 * The copy makes the RDMAControlHeader simpler to manipulate 1715 * for the time being. 
 */
    assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
    memcpy(wr->control, head, sizeof(RDMAControlHeader));
    control_to_network((void *) wr->control);

    if (buf) {
        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
    }


    ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);

    if (ret > 0) {
        error_report("Failed to use post IB SEND for control");
        return -ret;
    }

    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
    if (ret < 0) {
        error_report("rdma migration: send polling control error");
    }

    return ret;
}

/*
 * Post a RECV work request in anticipation of some future receipt
 * of data on the control channel.
 */
static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
{
    struct ibv_recv_wr *bad_wr;
    struct ibv_sge sge = {
                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
                            .length = RDMA_CONTROL_MAX_BUFFER,
                            .lkey = rdma->wr_data[idx].control_mr->lkey,
                         };

    struct ibv_recv_wr recv_wr = {
                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
                                    .sg_list = &sge,
                                    .num_sge = 1,
                                 };


    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
        return -1;
    }

    return 0;
}

/*
 * Block and wait for a RECV control channel message to arrive.
 */
static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
                RDMAControlHeader *head, int expecting, int idx)
{
    uint32_t byte_len;
    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
                                       &byte_len);

    if (ret < 0) {
        error_report("rdma migration: recv polling control error!");
        return ret;
    }

    network_to_control((void *) rdma->wr_data[idx].control);
    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));

    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));

    if (expecting == RDMA_CONTROL_NONE) {
        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
                                             head->type);
    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
        error_report("Was expecting a %s (%d) control message"
                ", but got: %s (%d), length: %d",
                control_desc(expecting), expecting,
                control_desc(head->type), head->type, head->len);
        if (head->type == RDMA_CONTROL_ERROR) {
            rdma->received_error = true;
        }
        return -EIO;
    }
    if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
        error_report("too long length: %d", head->len);
        return -EINVAL;
    }
    if (sizeof(*head) + head->len != byte_len) {
        error_report("Malformed length: %d byte_len %d", head->len, byte_len);
        return -EINVAL;
    }

    return 0;
}

/*
 * When a RECV work request has completed, the work request's
 * buffer is pointed at the header.
 *
 * This will advance the pointer to the data portion
 * of the control message of the work request's buffer that
 * was populated after the work request finished.
 */
static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
                                  RDMAControlHeader *head)
{
    rdma->wr_data[idx].control_len = head->len;
    rdma->wr_data[idx].control_curr =
        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
}

/*
 * This is an 'atomic' high-level operation to deliver a single, unified
 * control-channel message.
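 *
 * (Illustrative summary only; the authoritative sequence is the code below.)
 * The control-channel ping-pong between source and destination roughly
 * looks like:
 *
 *      source                                 destination
 *      ------                                 -----------
 *                       <--- READY            (dest already has RECVs posted)
 *   wait for READY (if one is outstanding)
 *   post RECV for the optional response
 *   post RECV to replace the consumed READY
 *   SEND <command>      --->
 *                       <--- SEND <response>  (optional, e.g. REGISTER RESULT)
 *
 * Every SEND is matched by a RECV that the peer posted in advance, which is
 * why the RECV work requests are always replenished before transmitting.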
1832 * 1833 * Additionally, if the user is expecting some kind of reply to this message, 1834 * they can request a 'resp' response message be filled in by posting an 1835 * additional work request on behalf of the user and waiting for an additional 1836 * completion. 1837 * 1838 * The extra (optional) response is used during registration to us from having 1839 * to perform an *additional* exchange of message just to provide a response by 1840 * instead piggy-backing on the acknowledgement. 1841 */ 1842 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1843 uint8_t *data, RDMAControlHeader *resp, 1844 int *resp_idx, 1845 int (*callback)(RDMAContext *rdma)) 1846 { 1847 int ret = 0; 1848 1849 /* 1850 * Wait until the dest is ready before attempting to deliver the message 1851 * by waiting for a READY message. 1852 */ 1853 if (rdma->control_ready_expected) { 1854 RDMAControlHeader resp; 1855 ret = qemu_rdma_exchange_get_response(rdma, 1856 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1857 if (ret < 0) { 1858 return ret; 1859 } 1860 } 1861 1862 /* 1863 * If the user is expecting a response, post a WR in anticipation of it. 1864 */ 1865 if (resp) { 1866 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1867 if (ret) { 1868 error_report("rdma migration: error posting" 1869 " extra control recv for anticipated result!"); 1870 return ret; 1871 } 1872 } 1873 1874 /* 1875 * Post a WR to replace the one we just consumed for the READY message. 1876 */ 1877 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1878 if (ret) { 1879 error_report("rdma migration: error posting first control recv!"); 1880 return ret; 1881 } 1882 1883 /* 1884 * Deliver the control message that was requested. 1885 */ 1886 ret = qemu_rdma_post_send_control(rdma, data, head); 1887 1888 if (ret < 0) { 1889 error_report("Failed to send control buffer!"); 1890 return ret; 1891 } 1892 1893 /* 1894 * If we're expecting a response, block and wait for it. 1895 */ 1896 if (resp) { 1897 if (callback) { 1898 trace_qemu_rdma_exchange_send_issue_callback(); 1899 ret = callback(rdma); 1900 if (ret < 0) { 1901 return ret; 1902 } 1903 } 1904 1905 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1906 ret = qemu_rdma_exchange_get_response(rdma, resp, 1907 resp->type, RDMA_WRID_DATA); 1908 1909 if (ret < 0) { 1910 return ret; 1911 } 1912 1913 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1914 if (resp_idx) { 1915 *resp_idx = RDMA_WRID_DATA; 1916 } 1917 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1918 } 1919 1920 rdma->control_ready_expected = 1; 1921 1922 return 0; 1923 } 1924 1925 /* 1926 * This is an 'atomic' high-level operation to receive a single, unified 1927 * control-channel message. 1928 */ 1929 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1930 int expecting) 1931 { 1932 RDMAControlHeader ready = { 1933 .len = 0, 1934 .type = RDMA_CONTROL_READY, 1935 .repeat = 1, 1936 }; 1937 int ret; 1938 1939 /* 1940 * Inform the source that we're ready to receive a message. 1941 */ 1942 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1943 1944 if (ret < 0) { 1945 error_report("Failed to send control buffer!"); 1946 return ret; 1947 } 1948 1949 /* 1950 * Block and wait for the message. 
1951 */ 1952 ret = qemu_rdma_exchange_get_response(rdma, head, 1953 expecting, RDMA_WRID_READY); 1954 1955 if (ret < 0) { 1956 return ret; 1957 } 1958 1959 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1960 1961 /* 1962 * Post a new RECV work request to replace the one we just consumed. 1963 */ 1964 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1965 if (ret) { 1966 error_report("rdma migration: error posting second control recv!"); 1967 return ret; 1968 } 1969 1970 return 0; 1971 } 1972 1973 /* 1974 * Write an actual chunk of memory using RDMA. 1975 * 1976 * If we're using dynamic registration on the dest-side, we have to 1977 * send a registration command first. 1978 */ 1979 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 1980 int current_index, uint64_t current_addr, 1981 uint64_t length) 1982 { 1983 struct ibv_sge sge; 1984 struct ibv_send_wr send_wr = { 0 }; 1985 struct ibv_send_wr *bad_wr; 1986 int reg_result_idx, ret, count = 0; 1987 uint64_t chunk, chunks; 1988 uint8_t *chunk_start, *chunk_end; 1989 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1990 RDMARegister reg; 1991 RDMARegisterResult *reg_result; 1992 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1993 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1994 .type = RDMA_CONTROL_REGISTER_REQUEST, 1995 .repeat = 1, 1996 }; 1997 1998 retry: 1999 sge.addr = (uintptr_t)(block->local_host_addr + 2000 (current_addr - block->offset)); 2001 sge.length = length; 2002 2003 chunk = ram_chunk_index(block->local_host_addr, 2004 (uint8_t *)(uintptr_t)sge.addr); 2005 chunk_start = ram_chunk_start(block, chunk); 2006 2007 if (block->is_ram_block) { 2008 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2009 2010 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2011 chunks--; 2012 } 2013 } else { 2014 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2015 2016 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2017 chunks--; 2018 } 2019 } 2020 2021 trace_qemu_rdma_write_one_top(chunks + 1, 2022 (chunks + 1) * 2023 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2024 2025 chunk_end = ram_chunk_end(block, chunk + chunks); 2026 2027 if (!rdma->pin_all) { 2028 #ifdef RDMA_UNREGISTRATION_EXAMPLE 2029 qemu_rdma_unregister_waiting(rdma); 2030 #endif 2031 } 2032 2033 while (test_bit(chunk, block->transit_bitmap)) { 2034 (void)count; 2035 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2036 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2037 2038 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2039 2040 if (ret < 0) { 2041 error_report("Failed to wait for previous write to complete " 2042 "block %d chunk %" PRIu64 2043 " current %" PRIu64 " len %" PRIu64 " %d", 2044 current_index, chunk, sge.addr, length, rdma->nb_sent); 2045 return ret; 2046 } 2047 } 2048 2049 if (!rdma->pin_all || !block->is_ram_block) { 2050 if (!block->remote_keys[chunk]) { 2051 /* 2052 * This chunk has not yet been registered, so first check to see 2053 * if the entire chunk is zero. If so, tell the other side to 2054 * memset() + madvise() the entire chunk without RDMA.
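 * (The "tell the other side" part is what the RDMA_CONTROL_COMPRESS
 * message built just below carries: a block index, an offset, a length
 * and the repeated byte value, so the destination can fill the chunk
 * locally instead of receiving it over the wire.)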
2055 */ 2056 2057 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 2058 RDMACompress comp = { 2059 .offset = current_addr, 2060 .value = 0, 2061 .block_idx = current_index, 2062 .length = length, 2063 }; 2064 2065 head.len = sizeof(comp); 2066 head.type = RDMA_CONTROL_COMPRESS; 2067 2068 trace_qemu_rdma_write_one_zero(chunk, sge.length, 2069 current_index, current_addr); 2070 2071 compress_to_network(rdma, &comp); 2072 ret = qemu_rdma_exchange_send(rdma, &head, 2073 (uint8_t *) &comp, NULL, NULL, NULL); 2074 2075 if (ret < 0) { 2076 return -EIO; 2077 } 2078 2079 acct_update_position(f, sge.length, true); 2080 2081 return 1; 2082 } 2083 2084 /* 2085 * Otherwise, tell other side to register. 2086 */ 2087 reg.current_index = current_index; 2088 if (block->is_ram_block) { 2089 reg.key.current_addr = current_addr; 2090 } else { 2091 reg.key.chunk = chunk; 2092 } 2093 reg.chunks = chunks; 2094 2095 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 2096 current_addr); 2097 2098 register_to_network(rdma, ®); 2099 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 2100 &resp, ®_result_idx, NULL); 2101 if (ret < 0) { 2102 return ret; 2103 } 2104 2105 /* try to overlap this single registration with the one we sent. */ 2106 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2107 &sge.lkey, NULL, chunk, 2108 chunk_start, chunk_end)) { 2109 error_report("cannot get lkey"); 2110 return -EINVAL; 2111 } 2112 2113 reg_result = (RDMARegisterResult *) 2114 rdma->wr_data[reg_result_idx].control_curr; 2115 2116 network_to_result(reg_result); 2117 2118 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2119 reg_result->rkey, chunk); 2120 2121 block->remote_keys[chunk] = reg_result->rkey; 2122 block->remote_host_addr = reg_result->host_addr; 2123 } else { 2124 /* already registered before */ 2125 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2126 &sge.lkey, NULL, chunk, 2127 chunk_start, chunk_end)) { 2128 error_report("cannot get lkey!"); 2129 return -EINVAL; 2130 } 2131 } 2132 2133 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2134 } else { 2135 send_wr.wr.rdma.rkey = block->remote_rkey; 2136 2137 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2138 &sge.lkey, NULL, chunk, 2139 chunk_start, chunk_end)) { 2140 error_report("cannot get lkey!"); 2141 return -EINVAL; 2142 } 2143 } 2144 2145 /* 2146 * Encode the ram block index and chunk within this wrid. 2147 * We will use this information at the time of completion 2148 * to figure out which bitmap to check against and then which 2149 * chunk in the bitmap to look for. 2150 */ 2151 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2152 current_index, chunk); 2153 2154 send_wr.opcode = IBV_WR_RDMA_WRITE; 2155 send_wr.send_flags = IBV_SEND_SIGNALED; 2156 send_wr.sg_list = &sge; 2157 send_wr.num_sge = 1; 2158 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2159 (current_addr - block->offset); 2160 2161 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2162 sge.length); 2163 2164 /* 2165 * ibv_post_send() does not return negative error numbers, 2166 * per the specification they are positive - no idea why. 2167 */ 2168 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2169 2170 if (ret == ENOMEM) { 2171 trace_qemu_rdma_write_one_queue_full(); 2172 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2173 if (ret < 0) { 2174 error_report("rdma migration: failed to make " 2175 "room in full send queue! 
%d", ret); 2176 return ret; 2177 } 2178 2179 goto retry; 2180 2181 } else if (ret > 0) { 2182 perror("rdma migration: post rdma write failed"); 2183 return -ret; 2184 } 2185 2186 set_bit(chunk, block->transit_bitmap); 2187 acct_update_position(f, sge.length, false); 2188 rdma->total_writes++; 2189 2190 return 0; 2191 } 2192 2193 /* 2194 * Push out any unwritten RDMA operations. 2195 * 2196 * We support sending out multiple chunks at the same time. 2197 * Not all of them need to get signaled in the completion queue. 2198 */ 2199 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2200 { 2201 int ret; 2202 2203 if (!rdma->current_length) { 2204 return 0; 2205 } 2206 2207 ret = qemu_rdma_write_one(f, rdma, 2208 rdma->current_index, rdma->current_addr, rdma->current_length); 2209 2210 if (ret < 0) { 2211 return ret; 2212 } 2213 2214 if (ret == 0) { 2215 rdma->nb_sent++; 2216 trace_qemu_rdma_write_flush(rdma->nb_sent); 2217 } 2218 2219 rdma->current_length = 0; 2220 rdma->current_addr = 0; 2221 2222 return 0; 2223 } 2224 2225 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2226 uint64_t offset, uint64_t len) 2227 { 2228 RDMALocalBlock *block; 2229 uint8_t *host_addr; 2230 uint8_t *chunk_end; 2231 2232 if (rdma->current_index < 0) { 2233 return 0; 2234 } 2235 2236 if (rdma->current_chunk < 0) { 2237 return 0; 2238 } 2239 2240 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2241 host_addr = block->local_host_addr + (offset - block->offset); 2242 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2243 2244 if (rdma->current_length == 0) { 2245 return 0; 2246 } 2247 2248 /* 2249 * Only merge into chunk sequentially. 2250 */ 2251 if (offset != (rdma->current_addr + rdma->current_length)) { 2252 return 0; 2253 } 2254 2255 if (offset < block->offset) { 2256 return 0; 2257 } 2258 2259 if ((offset + len) > (block->offset + block->length)) { 2260 return 0; 2261 } 2262 2263 if ((host_addr + len) > chunk_end) { 2264 return 0; 2265 } 2266 2267 return 1; 2268 } 2269 2270 /* 2271 * We're not actually writing here, but doing three things: 2272 * 2273 * 1. Identify the chunk the buffer belongs to. 2274 * 2. If the chunk is full or the buffer doesn't belong to the current 2275 * chunk, then start a new chunk and flush() the old chunk. 2276 * 3. To keep the hardware busy, we also group chunks into batches 2277 * and only require that a batch gets acknowledged in the completion 2278 * queue instead of each individual chunk. 2279 */ 2280 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2281 uint64_t block_offset, uint64_t offset, 2282 uint64_t len) 2283 { 2284 uint64_t current_addr = block_offset + offset; 2285 uint64_t index = rdma->current_index; 2286 uint64_t chunk = rdma->current_chunk; 2287 int ret; 2288 2289 /* If we cannot merge it, we flush the current buffer first. 
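 * A buffer is considered mergeable by qemu_rdma_buffer_mergable() above
 * only if it continues the current transfer exactly where it left off and
 * stays within both the current RAM block and the current chunk.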
*/ 2290 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2291 ret = qemu_rdma_write_flush(f, rdma); 2292 if (ret) { 2293 return ret; 2294 } 2295 rdma->current_length = 0; 2296 rdma->current_addr = current_addr; 2297 2298 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2299 offset, len, &index, &chunk); 2300 if (ret) { 2301 error_report("ram block search failed"); 2302 return ret; 2303 } 2304 rdma->current_index = index; 2305 rdma->current_chunk = chunk; 2306 } 2307 2308 /* merge it */ 2309 rdma->current_length += len; 2310 2311 /* flush it if buffer is too large */ 2312 if (rdma->current_length >= RDMA_MERGE_MAX) { 2313 return qemu_rdma_write_flush(f, rdma); 2314 } 2315 2316 return 0; 2317 } 2318 2319 static void qemu_rdma_cleanup(RDMAContext *rdma) 2320 { 2321 int idx; 2322 2323 if (rdma->cm_id && rdma->connected) { 2324 if ((rdma->error_state || 2325 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2326 !rdma->received_error) { 2327 RDMAControlHeader head = { .len = 0, 2328 .type = RDMA_CONTROL_ERROR, 2329 .repeat = 1, 2330 }; 2331 error_report("Early error. Sending error."); 2332 qemu_rdma_post_send_control(rdma, NULL, &head); 2333 } 2334 2335 rdma_disconnect(rdma->cm_id); 2336 trace_qemu_rdma_cleanup_disconnect(); 2337 rdma->connected = false; 2338 } 2339 2340 if (rdma->channel) { 2341 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2342 } 2343 g_free(rdma->dest_blocks); 2344 rdma->dest_blocks = NULL; 2345 2346 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2347 if (rdma->wr_data[idx].control_mr) { 2348 rdma->total_registrations--; 2349 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2350 } 2351 rdma->wr_data[idx].control_mr = NULL; 2352 } 2353 2354 if (rdma->local_ram_blocks.block) { 2355 while (rdma->local_ram_blocks.nb_blocks) { 2356 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2357 } 2358 } 2359 2360 if (rdma->qp) { 2361 rdma_destroy_qp(rdma->cm_id); 2362 rdma->qp = NULL; 2363 } 2364 if (rdma->cq) { 2365 ibv_destroy_cq(rdma->cq); 2366 rdma->cq = NULL; 2367 } 2368 if (rdma->comp_channel) { 2369 ibv_destroy_comp_channel(rdma->comp_channel); 2370 rdma->comp_channel = NULL; 2371 } 2372 if (rdma->pd) { 2373 ibv_dealloc_pd(rdma->pd); 2374 rdma->pd = NULL; 2375 } 2376 if (rdma->cm_id) { 2377 rdma_destroy_id(rdma->cm_id); 2378 rdma->cm_id = NULL; 2379 } 2380 2381 /* the destination side, listen_id and channel is shared */ 2382 if (rdma->listen_id) { 2383 if (!rdma->is_return_path) { 2384 rdma_destroy_id(rdma->listen_id); 2385 } 2386 rdma->listen_id = NULL; 2387 2388 if (rdma->channel) { 2389 if (!rdma->is_return_path) { 2390 rdma_destroy_event_channel(rdma->channel); 2391 } 2392 rdma->channel = NULL; 2393 } 2394 } 2395 2396 if (rdma->channel) { 2397 rdma_destroy_event_channel(rdma->channel); 2398 rdma->channel = NULL; 2399 } 2400 g_free(rdma->host); 2401 g_free(rdma->host_port); 2402 rdma->host = NULL; 2403 rdma->host_port = NULL; 2404 } 2405 2406 2407 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2408 { 2409 int ret, idx; 2410 Error *local_err = NULL, **temp = &local_err; 2411 2412 /* 2413 * Will be validated against destination's actual capabilities 2414 * after the connect() completes. 2415 */ 2416 rdma->pin_all = pin_all; 2417 2418 ret = qemu_rdma_resolve_host(rdma, temp); 2419 if (ret) { 2420 goto err_rdma_source_init; 2421 } 2422 2423 ret = qemu_rdma_alloc_pd_cq(rdma); 2424 if (ret) { 2425 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2426 " limits may be too low. 
Please check $ ulimit -a # and " 2427 "search for 'ulimit -l' in the output"); 2428 goto err_rdma_source_init; 2429 } 2430 2431 ret = qemu_rdma_alloc_qp(rdma); 2432 if (ret) { 2433 ERROR(temp, "rdma migration: error allocating qp!"); 2434 goto err_rdma_source_init; 2435 } 2436 2437 ret = qemu_rdma_init_ram_blocks(rdma); 2438 if (ret) { 2439 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2440 goto err_rdma_source_init; 2441 } 2442 2443 /* Build the hash that maps from offset to RAMBlock */ 2444 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2445 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2446 g_hash_table_insert(rdma->blockmap, 2447 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2448 &rdma->local_ram_blocks.block[idx]); 2449 } 2450 2451 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2452 ret = qemu_rdma_reg_control(rdma, idx); 2453 if (ret) { 2454 ERROR(temp, "rdma migration: error registering %d control!", 2455 idx); 2456 goto err_rdma_source_init; 2457 } 2458 } 2459 2460 return 0; 2461 2462 err_rdma_source_init: 2463 error_propagate(errp, local_err); 2464 qemu_rdma_cleanup(rdma); 2465 return -1; 2466 } 2467 2468 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2469 struct rdma_cm_event **cm_event, 2470 long msec, Error **errp) 2471 { 2472 int ret; 2473 struct pollfd poll_fd = { 2474 .fd = rdma->channel->fd, 2475 .events = POLLIN, 2476 .revents = 0 2477 }; 2478 2479 do { 2480 ret = poll(&poll_fd, 1, msec); 2481 } while (ret < 0 && errno == EINTR); 2482 2483 if (ret == 0) { 2484 ERROR(errp, "poll cm event timeout"); 2485 return -1; 2486 } else if (ret < 0) { 2487 ERROR(errp, "failed to poll cm event, errno=%i", errno); 2488 return -1; 2489 } else if (poll_fd.revents & POLLIN) { 2490 return rdma_get_cm_event(rdma->channel, cm_event); 2491 } else { 2492 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents); 2493 return -1; 2494 } 2495 } 2496 2497 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path) 2498 { 2499 RDMACapabilities cap = { 2500 .version = RDMA_CONTROL_VERSION_CURRENT, 2501 .flags = 0, 2502 }; 2503 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2504 .retry_count = 5, 2505 .private_data = &cap, 2506 .private_data_len = sizeof(cap), 2507 }; 2508 struct rdma_cm_event *cm_event; 2509 int ret; 2510 2511 /* 2512 * Only negotiate the capability with destination if the user 2513 * on the source first requested the capability. 
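 * (At present the only negotiated capability is RDMA_CAPABILITY_PIN_ALL,
 * which is ORed into cap.flags just below when pin_all was requested.)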
2514 */ 2515 if (rdma->pin_all) { 2516 trace_qemu_rdma_connect_pin_all_requested(); 2517 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2518 } 2519 2520 caps_to_network(&cap); 2521 2522 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2523 if (ret) { 2524 ERROR(errp, "posting second control recv"); 2525 goto err_rdma_source_connect; 2526 } 2527 2528 ret = rdma_connect(rdma->cm_id, &conn_param); 2529 if (ret) { 2530 perror("rdma_connect"); 2531 ERROR(errp, "connecting to destination!"); 2532 goto err_rdma_source_connect; 2533 } 2534 2535 if (return_path) { 2536 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2537 } else { 2538 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2539 } 2540 if (ret) { 2541 perror("rdma_get_cm_event after rdma_connect"); 2542 ERROR(errp, "connecting to destination!"); 2543 goto err_rdma_source_connect; 2544 } 2545 2546 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2547 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2548 ERROR(errp, "connecting to destination!"); 2549 rdma_ack_cm_event(cm_event); 2550 goto err_rdma_source_connect; 2551 } 2552 rdma->connected = true; 2553 2554 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2555 network_to_caps(&cap); 2556 2557 /* 2558 * Verify that the *requested* capabilities are supported by the destination 2559 * and disable them otherwise. 2560 */ 2561 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2562 ERROR(errp, "Server cannot support pinning all memory. " 2563 "Will register memory dynamically."); 2564 rdma->pin_all = false; 2565 } 2566 2567 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2568 2569 rdma_ack_cm_event(cm_event); 2570 2571 rdma->control_ready_expected = 1; 2572 rdma->nb_sent = 0; 2573 return 0; 2574 2575 err_rdma_source_connect: 2576 qemu_rdma_cleanup(rdma); 2577 return -1; 2578 } 2579 2580 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2581 { 2582 int ret, idx; 2583 struct rdma_cm_id *listen_id; 2584 char ip[40] = "unknown"; 2585 struct rdma_addrinfo *res, *e; 2586 char port_str[16]; 2587 2588 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2589 rdma->wr_data[idx].control_len = 0; 2590 rdma->wr_data[idx].control_curr = NULL; 2591 } 2592 2593 if (!rdma->host || !rdma->host[0]) { 2594 ERROR(errp, "RDMA host is not set!"); 2595 rdma->error_state = -EINVAL; 2596 return -1; 2597 } 2598 /* create CM channel */ 2599 rdma->channel = rdma_create_event_channel(); 2600 if (!rdma->channel) { 2601 ERROR(errp, "could not create rdma event channel"); 2602 rdma->error_state = -EINVAL; 2603 return -1; 2604 } 2605 2606 /* create CM id */ 2607 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2608 if (ret) { 2609 ERROR(errp, "could not create cm_id!"); 2610 goto err_dest_init_create_listen_id; 2611 } 2612 2613 snprintf(port_str, 16, "%d", rdma->port); 2614 port_str[15] = '\0'; 2615 2616 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2617 if (ret < 0) { 2618 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2619 goto err_dest_init_bind_addr; 2620 } 2621 2622 for (e = res; e != NULL; e = e->ai_next) { 2623 inet_ntop(e->ai_family, 2624 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2625 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2626 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2627 if (ret) { 2628 continue; 2629 } 2630 if (e->ai_family == AF_INET6) { 2631 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2632 if (ret) { 2633 continue; 2634 } 
2635 } 2636 break; 2637 } 2638 2639 rdma_freeaddrinfo(res); 2640 if (!e) { 2641 ERROR(errp, "Error: could not rdma_bind_addr!"); 2642 goto err_dest_init_bind_addr; 2643 } 2644 2645 rdma->listen_id = listen_id; 2646 qemu_rdma_dump_gid("dest_init", listen_id); 2647 return 0; 2648 2649 err_dest_init_bind_addr: 2650 rdma_destroy_id(listen_id); 2651 err_dest_init_create_listen_id: 2652 rdma_destroy_event_channel(rdma->channel); 2653 rdma->channel = NULL; 2654 rdma->error_state = ret; 2655 return ret; 2656 2657 } 2658 2659 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2660 RDMAContext *rdma) 2661 { 2662 int idx; 2663 2664 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2665 rdma_return_path->wr_data[idx].control_len = 0; 2666 rdma_return_path->wr_data[idx].control_curr = NULL; 2667 } 2668 2669 /*the CM channel and CM id is shared*/ 2670 rdma_return_path->channel = rdma->channel; 2671 rdma_return_path->listen_id = rdma->listen_id; 2672 2673 rdma->return_path = rdma_return_path; 2674 rdma_return_path->return_path = rdma; 2675 rdma_return_path->is_return_path = true; 2676 } 2677 2678 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2679 { 2680 RDMAContext *rdma = NULL; 2681 InetSocketAddress *addr; 2682 2683 if (host_port) { 2684 rdma = g_new0(RDMAContext, 1); 2685 rdma->current_index = -1; 2686 rdma->current_chunk = -1; 2687 2688 addr = g_new(InetSocketAddress, 1); 2689 if (!inet_parse(addr, host_port, NULL)) { 2690 rdma->port = atoi(addr->port); 2691 rdma->host = g_strdup(addr->host); 2692 rdma->host_port = g_strdup(host_port); 2693 } else { 2694 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2695 g_free(rdma); 2696 rdma = NULL; 2697 } 2698 2699 qapi_free_InetSocketAddress(addr); 2700 } 2701 2702 return rdma; 2703 } 2704 2705 /* 2706 * QEMUFile interface to the control channel. 2707 * SEND messages for control only. 2708 * VM's ram is handled with regular RDMA messages. 2709 */ 2710 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2711 const struct iovec *iov, 2712 size_t niov, 2713 int *fds, 2714 size_t nfds, 2715 Error **errp) 2716 { 2717 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2718 QEMUFile *f = rioc->file; 2719 RDMAContext *rdma; 2720 int ret; 2721 ssize_t done = 0; 2722 size_t i; 2723 size_t len = 0; 2724 2725 RCU_READ_LOCK_GUARD(); 2726 rdma = qatomic_rcu_read(&rioc->rdmaout); 2727 2728 if (!rdma) { 2729 return -EIO; 2730 } 2731 2732 CHECK_ERROR_STATE(); 2733 2734 /* 2735 * Push out any writes that 2736 * we're queued up for VM's ram. 
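 *
 * After the flush, the iovec itself is sent over the control channel in
 * RDMA_SEND_INCREMENT-sized pieces, each wrapped in an
 * RDMA_CONTROL_QEMU_FILE header (see the loop below).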
2737 */ 2738 ret = qemu_rdma_write_flush(f, rdma); 2739 if (ret < 0) { 2740 rdma->error_state = ret; 2741 return ret; 2742 } 2743 2744 for (i = 0; i < niov; i++) { 2745 size_t remaining = iov[i].iov_len; 2746 uint8_t * data = (void *)iov[i].iov_base; 2747 while (remaining) { 2748 RDMAControlHeader head; 2749 2750 len = MIN(remaining, RDMA_SEND_INCREMENT); 2751 remaining -= len; 2752 2753 head.len = len; 2754 head.type = RDMA_CONTROL_QEMU_FILE; 2755 2756 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2757 2758 if (ret < 0) { 2759 rdma->error_state = ret; 2760 return ret; 2761 } 2762 2763 data += len; 2764 done += len; 2765 } 2766 } 2767 2768 return done; 2769 } 2770 2771 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2772 size_t size, int idx) 2773 { 2774 size_t len = 0; 2775 2776 if (rdma->wr_data[idx].control_len) { 2777 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2778 2779 len = MIN(size, rdma->wr_data[idx].control_len); 2780 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2781 rdma->wr_data[idx].control_curr += len; 2782 rdma->wr_data[idx].control_len -= len; 2783 } 2784 2785 return len; 2786 } 2787 2788 /* 2789 * QEMUFile interface to the control channel. 2790 * RDMA links don't use bytestreams, so we have to 2791 * return bytes to QEMUFile opportunistically. 2792 */ 2793 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2794 const struct iovec *iov, 2795 size_t niov, 2796 int **fds, 2797 size_t *nfds, 2798 Error **errp) 2799 { 2800 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2801 RDMAContext *rdma; 2802 RDMAControlHeader head; 2803 int ret = 0; 2804 ssize_t i; 2805 size_t done = 0; 2806 2807 RCU_READ_LOCK_GUARD(); 2808 rdma = qatomic_rcu_read(&rioc->rdmain); 2809 2810 if (!rdma) { 2811 return -EIO; 2812 } 2813 2814 CHECK_ERROR_STATE(); 2815 2816 for (i = 0; i < niov; i++) { 2817 size_t want = iov[i].iov_len; 2818 uint8_t *data = (void *)iov[i].iov_base; 2819 2820 /* 2821 * First, we hold on to the last SEND message we 2822 * were given and dish out the bytes until we run 2823 * out of bytes. 2824 */ 2825 ret = qemu_rdma_fill(rdma, data, want, 0); 2826 done += ret; 2827 want -= ret; 2828 /* Got what we needed, so go to next iovec */ 2829 if (want == 0) { 2830 continue; 2831 } 2832 2833 /* If we got any data so far, then don't wait 2834 * for more, just return what we have */ 2835 if (done > 0) { 2836 break; 2837 } 2838 2839 2840 /* We've got nothing at all, so lets wait for 2841 * more to arrive 2842 */ 2843 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2844 2845 if (ret < 0) { 2846 rdma->error_state = ret; 2847 return ret; 2848 } 2849 2850 /* 2851 * SEND was received with new bytes, now try again. 2852 */ 2853 ret = qemu_rdma_fill(rdma, data, want, 0); 2854 done += ret; 2855 want -= ret; 2856 2857 /* Still didn't get enough, so lets just return */ 2858 if (want) { 2859 if (done == 0) { 2860 return QIO_CHANNEL_ERR_BLOCK; 2861 } else { 2862 break; 2863 } 2864 } 2865 } 2866 return done; 2867 } 2868 2869 /* 2870 * Block until all the outstanding chunks have been delivered by the hardware. 
2871 */ 2872 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2873 { 2874 int ret; 2875 2876 if (qemu_rdma_write_flush(f, rdma) < 0) { 2877 return -EIO; 2878 } 2879 2880 while (rdma->nb_sent) { 2881 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2882 if (ret < 0) { 2883 error_report("rdma migration: complete polling error!"); 2884 return -EIO; 2885 } 2886 } 2887 2888 qemu_rdma_unregister_waiting(rdma); 2889 2890 return 0; 2891 } 2892 2893 2894 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2895 bool blocking, 2896 Error **errp) 2897 { 2898 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2899 /* XXX we should make readv/writev actually honour this :-) */ 2900 rioc->blocking = blocking; 2901 return 0; 2902 } 2903 2904 2905 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2906 struct QIOChannelRDMASource { 2907 GSource parent; 2908 QIOChannelRDMA *rioc; 2909 GIOCondition condition; 2910 }; 2911 2912 static gboolean 2913 qio_channel_rdma_source_prepare(GSource *source, 2914 gint *timeout) 2915 { 2916 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2917 RDMAContext *rdma; 2918 GIOCondition cond = 0; 2919 *timeout = -1; 2920 2921 RCU_READ_LOCK_GUARD(); 2922 if (rsource->condition == G_IO_IN) { 2923 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2924 } else { 2925 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2926 } 2927 2928 if (!rdma) { 2929 error_report("RDMAContext is NULL when prepare Gsource"); 2930 return FALSE; 2931 } 2932 2933 if (rdma->wr_data[0].control_len) { 2934 cond |= G_IO_IN; 2935 } 2936 cond |= G_IO_OUT; 2937 2938 return cond & rsource->condition; 2939 } 2940 2941 static gboolean 2942 qio_channel_rdma_source_check(GSource *source) 2943 { 2944 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2945 RDMAContext *rdma; 2946 GIOCondition cond = 0; 2947 2948 RCU_READ_LOCK_GUARD(); 2949 if (rsource->condition == G_IO_IN) { 2950 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2951 } else { 2952 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2953 } 2954 2955 if (!rdma) { 2956 error_report("RDMAContext is NULL when check Gsource"); 2957 return FALSE; 2958 } 2959 2960 if (rdma->wr_data[0].control_len) { 2961 cond |= G_IO_IN; 2962 } 2963 cond |= G_IO_OUT; 2964 2965 return cond & rsource->condition; 2966 } 2967 2968 static gboolean 2969 qio_channel_rdma_source_dispatch(GSource *source, 2970 GSourceFunc callback, 2971 gpointer user_data) 2972 { 2973 QIOChannelFunc func = (QIOChannelFunc)callback; 2974 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2975 RDMAContext *rdma; 2976 GIOCondition cond = 0; 2977 2978 RCU_READ_LOCK_GUARD(); 2979 if (rsource->condition == G_IO_IN) { 2980 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2981 } else { 2982 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2983 } 2984 2985 if (!rdma) { 2986 error_report("RDMAContext is NULL when dispatch Gsource"); 2987 return FALSE; 2988 } 2989 2990 if (rdma->wr_data[0].control_len) { 2991 cond |= G_IO_IN; 2992 } 2993 cond |= G_IO_OUT; 2994 2995 return (*func)(QIO_CHANNEL(rsource->rioc), 2996 (cond & rsource->condition), 2997 user_data); 2998 } 2999 3000 static void 3001 qio_channel_rdma_source_finalize(GSource *source) 3002 { 3003 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3004 3005 object_unref(OBJECT(ssource->rioc)); 3006 } 3007 3008 GSourceFuncs qio_channel_rdma_source_funcs = { 3009 qio_channel_rdma_source_prepare, 3010 qio_channel_rdma_source_check, 3011 qio_channel_rdma_source_dispatch, 
3012 qio_channel_rdma_source_finalize 3013 }; 3014 3015 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 3016 GIOCondition condition) 3017 { 3018 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3019 QIOChannelRDMASource *ssource; 3020 GSource *source; 3021 3022 source = g_source_new(&qio_channel_rdma_source_funcs, 3023 sizeof(QIOChannelRDMASource)); 3024 ssource = (QIOChannelRDMASource *)source; 3025 3026 ssource->rioc = rioc; 3027 object_ref(OBJECT(rioc)); 3028 3029 ssource->condition = condition; 3030 3031 return source; 3032 } 3033 3034 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 3035 AioContext *ctx, 3036 IOHandler *io_read, 3037 IOHandler *io_write, 3038 void *opaque) 3039 { 3040 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3041 if (io_read) { 3042 aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd, 3043 false, io_read, io_write, NULL, opaque); 3044 } else { 3045 aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd, 3046 false, io_read, io_write, NULL, opaque); 3047 } 3048 } 3049 3050 struct rdma_close_rcu { 3051 struct rcu_head rcu; 3052 RDMAContext *rdmain; 3053 RDMAContext *rdmaout; 3054 }; 3055 3056 /* callback from qio_channel_rdma_close via call_rcu */ 3057 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 3058 { 3059 if (rcu->rdmain) { 3060 qemu_rdma_cleanup(rcu->rdmain); 3061 } 3062 3063 if (rcu->rdmaout) { 3064 qemu_rdma_cleanup(rcu->rdmaout); 3065 } 3066 3067 g_free(rcu->rdmain); 3068 g_free(rcu->rdmaout); 3069 g_free(rcu); 3070 } 3071 3072 static int qio_channel_rdma_close(QIOChannel *ioc, 3073 Error **errp) 3074 { 3075 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3076 RDMAContext *rdmain, *rdmaout; 3077 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 3078 3079 trace_qemu_rdma_close(); 3080 3081 rdmain = rioc->rdmain; 3082 if (rdmain) { 3083 qatomic_rcu_set(&rioc->rdmain, NULL); 3084 } 3085 3086 rdmaout = rioc->rdmaout; 3087 if (rdmaout) { 3088 qatomic_rcu_set(&rioc->rdmaout, NULL); 3089 } 3090 3091 rcu->rdmain = rdmain; 3092 rcu->rdmaout = rdmaout; 3093 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 3094 3095 return 0; 3096 } 3097 3098 static int 3099 qio_channel_rdma_shutdown(QIOChannel *ioc, 3100 QIOChannelShutdown how, 3101 Error **errp) 3102 { 3103 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3104 RDMAContext *rdmain, *rdmaout; 3105 3106 RCU_READ_LOCK_GUARD(); 3107 3108 rdmain = qatomic_rcu_read(&rioc->rdmain); 3109 rdmaout = qatomic_rcu_read(&rioc->rdmaout); 3110 3111 switch (how) { 3112 case QIO_CHANNEL_SHUTDOWN_READ: 3113 if (rdmain) { 3114 rdmain->error_state = -1; 3115 } 3116 break; 3117 case QIO_CHANNEL_SHUTDOWN_WRITE: 3118 if (rdmaout) { 3119 rdmaout->error_state = -1; 3120 } 3121 break; 3122 case QIO_CHANNEL_SHUTDOWN_BOTH: 3123 default: 3124 if (rdmain) { 3125 rdmain->error_state = -1; 3126 } 3127 if (rdmaout) { 3128 rdmaout->error_state = -1; 3129 } 3130 break; 3131 } 3132 3133 return 0; 3134 } 3135 3136 /* 3137 * Parameters: 3138 * @offset == 0 : 3139 * This means that 'block_offset' is a full virtual address that does not 3140 * belong to a RAMBlock of the virtual machine and instead 3141 * represents a private malloc'd memory area that the caller wishes to 3142 * transfer. 3143 * 3144 * @offset != 0 : 3145 * Offset is an offset to be added to block_offset and used 3146 * to also lookup the corresponding RAMBlock. 3147 * 3148 * @size > 0 : 3149 * Initiate a transfer of this size.
3150 * 3151 * @size == 0 : 3152 * A 'hint' or 'advice' that means that we wish to speculatively 3153 * and asynchronously unregister this memory. In this case, there is no 3154 * guarantee that the unregister will actually happen, for example, 3155 * if the memory is being actively transmitted. Additionally, the memory 3156 * may be re-registered at any future time if a write within the same 3157 * chunk was requested again, even if you attempted to unregister it 3158 * here. 3159 * 3160 * @size < 0 : TODO, not yet supported 3161 * Unregister the memory NOW. This means that the caller does not 3162 * expect there to be any future RDMA transfers and we just want to clean 3163 * things up. This is used in case the upper layer owns the memory and 3164 * cannot wait for qemu_fclose() to occur. 3165 * 3166 * @bytes_sent : User-specificed pointer to indicate how many bytes were 3167 * sent. Usually, this will not be more than a few bytes of 3168 * the protocol because most transfers are sent asynchronously. 3169 */ 3170 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 3171 ram_addr_t block_offset, ram_addr_t offset, 3172 size_t size, uint64_t *bytes_sent) 3173 { 3174 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3175 RDMAContext *rdma; 3176 int ret; 3177 3178 RCU_READ_LOCK_GUARD(); 3179 rdma = qatomic_rcu_read(&rioc->rdmaout); 3180 3181 if (!rdma) { 3182 return -EIO; 3183 } 3184 3185 CHECK_ERROR_STATE(); 3186 3187 if (migration_in_postcopy()) { 3188 return RAM_SAVE_CONTROL_NOT_SUPP; 3189 } 3190 3191 qemu_fflush(f); 3192 3193 if (size > 0) { 3194 /* 3195 * Add this page to the current 'chunk'. If the chunk 3196 * is full, or the page doesn't belong to the current chunk, 3197 * an actual RDMA write will occur and a new chunk will be formed. 3198 */ 3199 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 3200 if (ret < 0) { 3201 error_report("rdma migration: write error! %d", ret); 3202 goto err; 3203 } 3204 3205 /* 3206 * We always return 1 bytes because the RDMA 3207 * protocol is completely asynchronous. We do not yet know 3208 * whether an identified chunk is zero or not because we're 3209 * waiting for other pages to potentially be merged with 3210 * the current chunk. So, we have to call qemu_update_position() 3211 * later on when the actual write occurs. 3212 */ 3213 if (bytes_sent) { 3214 *bytes_sent = 1; 3215 } 3216 } else { 3217 uint64_t index, chunk; 3218 3219 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 3220 if (size < 0) { 3221 ret = qemu_rdma_drain_cq(f, rdma); 3222 if (ret < 0) { 3223 fprintf(stderr, "rdma: failed to synchronously drain" 3224 " completion queue before unregistration.\n"); 3225 goto err; 3226 } 3227 } 3228 */ 3229 3230 ret = qemu_rdma_search_ram_block(rdma, block_offset, 3231 offset, size, &index, &chunk); 3232 3233 if (ret) { 3234 error_report("ram block search failed"); 3235 goto err; 3236 } 3237 3238 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 3239 3240 /* 3241 * TODO: Synchronous, guaranteed unregistration (should not occur during 3242 * fast-path). Otherwise, unregisters will process on the next call to 3243 * qemu_rdma_drain_cq() 3244 if (size < 0) { 3245 qemu_rdma_unregister_waiting(rdma); 3246 } 3247 */ 3248 } 3249 3250 /* 3251 * Drain the Completion Queue if possible, but do not block, 3252 * just poll. 3253 * 3254 * If nothing to poll, the end of the iteration will do this 3255 * again to make sure we don't overflow the request queue. 
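 *
 * The loop below simply pulls completions with qemu_rdma_poll() until
 * none are pending; it never sleeps on the completion channel.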
3256 */ 3257 while (1) { 3258 uint64_t wr_id, wr_id_in; 3259 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 3260 if (ret < 0) { 3261 error_report("rdma migration: polling error! %d", ret); 3262 goto err; 3263 } 3264 3265 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3266 3267 if (wr_id == RDMA_WRID_NONE) { 3268 break; 3269 } 3270 } 3271 3272 return RAM_SAVE_CONTROL_DELAYED; 3273 err: 3274 rdma->error_state = ret; 3275 return ret; 3276 } 3277 3278 static void rdma_accept_incoming_migration(void *opaque); 3279 3280 static void rdma_cm_poll_handler(void *opaque) 3281 { 3282 RDMAContext *rdma = opaque; 3283 int ret; 3284 struct rdma_cm_event *cm_event; 3285 MigrationIncomingState *mis = migration_incoming_get_current(); 3286 3287 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3288 if (ret) { 3289 error_report("get_cm_event failed %d", errno); 3290 return; 3291 } 3292 3293 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3294 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3295 if (!rdma->error_state && 3296 migration_incoming_get_current()->state != 3297 MIGRATION_STATUS_COMPLETED) { 3298 error_report("receive cm event, cm event is %d", cm_event->event); 3299 rdma->error_state = -EPIPE; 3300 if (rdma->return_path) { 3301 rdma->return_path->error_state = -EPIPE; 3302 } 3303 } 3304 rdma_ack_cm_event(cm_event); 3305 3306 if (mis->migration_incoming_co) { 3307 qemu_coroutine_enter(mis->migration_incoming_co); 3308 } 3309 return; 3310 } 3311 rdma_ack_cm_event(cm_event); 3312 } 3313 3314 static int qemu_rdma_accept(RDMAContext *rdma) 3315 { 3316 RDMACapabilities cap; 3317 struct rdma_conn_param conn_param = { 3318 .responder_resources = 2, 3319 .private_data = &cap, 3320 .private_data_len = sizeof(cap), 3321 }; 3322 RDMAContext *rdma_return_path = NULL; 3323 struct rdma_cm_event *cm_event; 3324 struct ibv_context *verbs; 3325 int ret = -EINVAL; 3326 int idx; 3327 3328 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3329 if (ret) { 3330 goto err_rdma_dest_wait; 3331 } 3332 3333 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3334 rdma_ack_cm_event(cm_event); 3335 goto err_rdma_dest_wait; 3336 } 3337 3338 /* 3339 * initialize the RDMAContext for return path for postcopy after first 3340 * connection request reached. 3341 */ 3342 if (migrate_postcopy() && !rdma->is_return_path) { 3343 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL); 3344 if (rdma_return_path == NULL) { 3345 rdma_ack_cm_event(cm_event); 3346 goto err_rdma_dest_wait; 3347 } 3348 3349 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3350 } 3351 3352 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3353 3354 network_to_caps(&cap); 3355 3356 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3357 error_report("Unknown source RDMA version: %d, bailing...", 3358 cap.version); 3359 rdma_ack_cm_event(cm_event); 3360 goto err_rdma_dest_wait; 3361 } 3362 3363 /* 3364 * Respond with only the capabilities this version of QEMU knows about. 3365 */ 3366 cap.flags &= known_capabilities; 3367 3368 /* 3369 * Enable the ones that we do know about. 3370 * Add other checks here as new ones are introduced. 
3371 */ 3372 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3373 rdma->pin_all = true; 3374 } 3375 3376 rdma->cm_id = cm_event->id; 3377 verbs = cm_event->id->verbs; 3378 3379 rdma_ack_cm_event(cm_event); 3380 3381 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3382 3383 caps_to_network(&cap); 3384 3385 trace_qemu_rdma_accept_pin_verbsc(verbs); 3386 3387 if (!rdma->verbs) { 3388 rdma->verbs = verbs; 3389 } else if (rdma->verbs != verbs) { 3390 error_report("ibv context not matching %p, %p!", rdma->verbs, 3391 verbs); 3392 goto err_rdma_dest_wait; 3393 } 3394 3395 qemu_rdma_dump_id("dest_init", verbs); 3396 3397 ret = qemu_rdma_alloc_pd_cq(rdma); 3398 if (ret) { 3399 error_report("rdma migration: error allocating pd and cq!"); 3400 goto err_rdma_dest_wait; 3401 } 3402 3403 ret = qemu_rdma_alloc_qp(rdma); 3404 if (ret) { 3405 error_report("rdma migration: error allocating qp!"); 3406 goto err_rdma_dest_wait; 3407 } 3408 3409 ret = qemu_rdma_init_ram_blocks(rdma); 3410 if (ret) { 3411 error_report("rdma migration: error initializing ram blocks!"); 3412 goto err_rdma_dest_wait; 3413 } 3414 3415 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3416 ret = qemu_rdma_reg_control(rdma, idx); 3417 if (ret) { 3418 error_report("rdma: error registering %d control", idx); 3419 goto err_rdma_dest_wait; 3420 } 3421 } 3422 3423 /* Accept the second connection request for return path */ 3424 if (migrate_postcopy() && !rdma->is_return_path) { 3425 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3426 NULL, 3427 (void *)(intptr_t)rdma->return_path); 3428 } else { 3429 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3430 NULL, rdma); 3431 } 3432 3433 ret = rdma_accept(rdma->cm_id, &conn_param); 3434 if (ret) { 3435 error_report("rdma_accept returns %d", ret); 3436 goto err_rdma_dest_wait; 3437 } 3438 3439 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3440 if (ret) { 3441 error_report("rdma_accept get_cm_event failed %d", ret); 3442 goto err_rdma_dest_wait; 3443 } 3444 3445 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3446 error_report("rdma_accept not event established"); 3447 rdma_ack_cm_event(cm_event); 3448 goto err_rdma_dest_wait; 3449 } 3450 3451 rdma_ack_cm_event(cm_event); 3452 rdma->connected = true; 3453 3454 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3455 if (ret) { 3456 error_report("rdma migration: error posting second control recv"); 3457 goto err_rdma_dest_wait; 3458 } 3459 3460 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3461 3462 return 0; 3463 3464 err_rdma_dest_wait: 3465 rdma->error_state = ret; 3466 qemu_rdma_cleanup(rdma); 3467 g_free(rdma_return_path); 3468 return ret; 3469 } 3470 3471 static int dest_ram_sort_func(const void *a, const void *b) 3472 { 3473 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3474 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3475 3476 return (a_index < b_index) ? -1 : (a_index != b_index); 3477 } 3478 3479 /* 3480 * During each iteration of the migration, we listen for instructions 3481 * by the source VM to perform dynamic page registrations before they 3482 * can perform RDMA operations. 3483 * 3484 * We respond with the 'rkey'. 3485 * 3486 * Keep doing this until the source tells us to stop. 
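 *
 * Concretely, the handler below loops on qemu_rdma_exchange_recv() and
 * services these control messages until REGISTER_FINISHED arrives:
 *   RDMA_CONTROL_COMPRESS            - fill a chunk locally (zero pages)
 *   RDMA_CONTROL_RAM_BLOCKS_REQUEST  - send back our RAMBlock descriptions
 *   RDMA_CONTROL_REGISTER_REQUEST    - register chunks and return rkeys
 *   RDMA_CONTROL_UNREGISTER_REQUEST  - unpin previously registered chunks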
3487 */ 3488 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3489 { 3490 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3491 .type = RDMA_CONTROL_REGISTER_RESULT, 3492 .repeat = 0, 3493 }; 3494 RDMAControlHeader unreg_resp = { .len = 0, 3495 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3496 .repeat = 0, 3497 }; 3498 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3499 .repeat = 1 }; 3500 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3501 RDMAContext *rdma; 3502 RDMALocalBlocks *local; 3503 RDMAControlHeader head; 3504 RDMARegister *reg, *registers; 3505 RDMACompress *comp; 3506 RDMARegisterResult *reg_result; 3507 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3508 RDMALocalBlock *block; 3509 void *host_addr; 3510 int ret = 0; 3511 int idx = 0; 3512 int count = 0; 3513 int i = 0; 3514 3515 RCU_READ_LOCK_GUARD(); 3516 rdma = qatomic_rcu_read(&rioc->rdmain); 3517 3518 if (!rdma) { 3519 return -EIO; 3520 } 3521 3522 CHECK_ERROR_STATE(); 3523 3524 local = &rdma->local_ram_blocks; 3525 do { 3526 trace_qemu_rdma_registration_handle_wait(); 3527 3528 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3529 3530 if (ret < 0) { 3531 break; 3532 } 3533 3534 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3535 error_report("rdma: Too many requests in this message (%d)." 3536 "Bailing.", head.repeat); 3537 ret = -EIO; 3538 break; 3539 } 3540 3541 switch (head.type) { 3542 case RDMA_CONTROL_COMPRESS: 3543 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3544 network_to_compress(comp); 3545 3546 trace_qemu_rdma_registration_handle_compress(comp->length, 3547 comp->block_idx, 3548 comp->offset); 3549 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3550 error_report("rdma: 'compress' bad block index %u (vs %d)", 3551 (unsigned int)comp->block_idx, 3552 rdma->local_ram_blocks.nb_blocks); 3553 ret = -EIO; 3554 goto out; 3555 } 3556 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3557 3558 host_addr = block->local_host_addr + 3559 (comp->offset - block->offset); 3560 3561 ram_handle_compressed(host_addr, comp->value, comp->length); 3562 break; 3563 3564 case RDMA_CONTROL_REGISTER_FINISHED: 3565 trace_qemu_rdma_registration_handle_finished(); 3566 goto out; 3567 3568 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3569 trace_qemu_rdma_registration_handle_ram_blocks(); 3570 3571 /* Sort our local RAM Block list so it's the same as the source, 3572 * we can do this since we've filled in a src_index in the list 3573 * as we received the RAMBlock list earlier. 3574 */ 3575 qsort(rdma->local_ram_blocks.block, 3576 rdma->local_ram_blocks.nb_blocks, 3577 sizeof(RDMALocalBlock), dest_ram_sort_func); 3578 for (i = 0; i < local->nb_blocks; i++) { 3579 local->block[i].index = i; 3580 } 3581 3582 if (rdma->pin_all) { 3583 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3584 if (ret) { 3585 error_report("rdma migration: error dest " 3586 "registering ram blocks"); 3587 goto out; 3588 } 3589 } 3590 3591 /* 3592 * Dest uses this to prepare to transmit the RAMBlock descriptions 3593 * to the source VM after connection setup. 3594 * Both sides use the "remote" structure to communicate and update 3595 * their "local" descriptions with what was sent. 
3596 */ 3597 for (i = 0; i < local->nb_blocks; i++) { 3598 rdma->dest_blocks[i].remote_host_addr = 3599 (uintptr_t)(local->block[i].local_host_addr); 3600 3601 if (rdma->pin_all) { 3602 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3603 } 3604 3605 rdma->dest_blocks[i].offset = local->block[i].offset; 3606 rdma->dest_blocks[i].length = local->block[i].length; 3607 3608 dest_block_to_network(&rdma->dest_blocks[i]); 3609 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3610 local->block[i].block_name, 3611 local->block[i].offset, 3612 local->block[i].length, 3613 local->block[i].local_host_addr, 3614 local->block[i].src_index); 3615 } 3616 3617 blocks.len = rdma->local_ram_blocks.nb_blocks 3618 * sizeof(RDMADestBlock); 3619 3620 3621 ret = qemu_rdma_post_send_control(rdma, 3622 (uint8_t *) rdma->dest_blocks, &blocks); 3623 3624 if (ret < 0) { 3625 error_report("rdma migration: error sending remote info"); 3626 goto out; 3627 } 3628 3629 break; 3630 case RDMA_CONTROL_REGISTER_REQUEST: 3631 trace_qemu_rdma_registration_handle_register(head.repeat); 3632 3633 reg_resp.repeat = head.repeat; 3634 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3635 3636 for (count = 0; count < head.repeat; count++) { 3637 uint64_t chunk; 3638 uint8_t *chunk_start, *chunk_end; 3639 3640 reg = ®isters[count]; 3641 network_to_register(reg); 3642 3643 reg_result = &results[count]; 3644 3645 trace_qemu_rdma_registration_handle_register_loop(count, 3646 reg->current_index, reg->key.current_addr, reg->chunks); 3647 3648 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3649 error_report("rdma: 'register' bad block index %u (vs %d)", 3650 (unsigned int)reg->current_index, 3651 rdma->local_ram_blocks.nb_blocks); 3652 ret = -ENOENT; 3653 goto out; 3654 } 3655 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3656 if (block->is_ram_block) { 3657 if (block->offset > reg->key.current_addr) { 3658 error_report("rdma: bad register address for block %s" 3659 " offset: %" PRIx64 " current_addr: %" PRIx64, 3660 block->block_name, block->offset, 3661 reg->key.current_addr); 3662 ret = -ERANGE; 3663 goto out; 3664 } 3665 host_addr = (block->local_host_addr + 3666 (reg->key.current_addr - block->offset)); 3667 chunk = ram_chunk_index(block->local_host_addr, 3668 (uint8_t *) host_addr); 3669 } else { 3670 chunk = reg->key.chunk; 3671 host_addr = block->local_host_addr + 3672 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3673 /* Check for particularly bad chunk value */ 3674 if (host_addr < (void *)block->local_host_addr) { 3675 error_report("rdma: bad chunk for block %s" 3676 " chunk: %" PRIx64, 3677 block->block_name, reg->key.chunk); 3678 ret = -ERANGE; 3679 goto out; 3680 } 3681 } 3682 chunk_start = ram_chunk_start(block, chunk); 3683 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3684 /* avoid "-Waddress-of-packed-member" warning */ 3685 uint32_t tmp_rkey = 0; 3686 if (qemu_rdma_register_and_get_keys(rdma, block, 3687 (uintptr_t)host_addr, NULL, &tmp_rkey, 3688 chunk, chunk_start, chunk_end)) { 3689 error_report("cannot get rkey"); 3690 ret = -EINVAL; 3691 goto out; 3692 } 3693 reg_result->rkey = tmp_rkey; 3694 3695 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3696 3697 trace_qemu_rdma_registration_handle_register_rkey( 3698 reg_result->rkey); 3699 3700 result_to_network(reg_result); 3701 } 3702 3703 ret = qemu_rdma_post_send_control(rdma, 3704 (uint8_t *) results, ®_resp); 3705 3706 if (ret < 0) { 3707 error_report("Failed to send control buffer"); 
3708 goto out; 3709 } 3710 break; 3711 case RDMA_CONTROL_UNREGISTER_REQUEST: 3712 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3713 unreg_resp.repeat = head.repeat; 3714 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3715 3716 for (count = 0; count < head.repeat; count++) { 3717 reg = ®isters[count]; 3718 network_to_register(reg); 3719 3720 trace_qemu_rdma_registration_handle_unregister_loop(count, 3721 reg->current_index, reg->key.chunk); 3722 3723 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3724 3725 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3726 block->pmr[reg->key.chunk] = NULL; 3727 3728 if (ret != 0) { 3729 perror("rdma unregistration chunk failed"); 3730 ret = -ret; 3731 goto out; 3732 } 3733 3734 rdma->total_registrations--; 3735 3736 trace_qemu_rdma_registration_handle_unregister_success( 3737 reg->key.chunk); 3738 } 3739 3740 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3741 3742 if (ret < 0) { 3743 error_report("Failed to send control buffer"); 3744 goto out; 3745 } 3746 break; 3747 case RDMA_CONTROL_REGISTER_RESULT: 3748 error_report("Invalid RESULT message at dest."); 3749 ret = -EIO; 3750 goto out; 3751 default: 3752 error_report("Unknown control message %s", control_desc(head.type)); 3753 ret = -EIO; 3754 goto out; 3755 } 3756 } while (1); 3757 out: 3758 if (ret < 0) { 3759 rdma->error_state = ret; 3760 } 3761 return ret; 3762 } 3763 3764 /* Destination: 3765 * Called via a ram_control_load_hook during the initial RAM load section which 3766 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3767 * on the source. 3768 * We've already built our local RAMBlock list, but not yet sent the list to 3769 * the source. 3770 */ 3771 static int 3772 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) 3773 { 3774 RDMAContext *rdma; 3775 int curr; 3776 int found = -1; 3777 3778 RCU_READ_LOCK_GUARD(); 3779 rdma = qatomic_rcu_read(&rioc->rdmain); 3780 3781 if (!rdma) { 3782 return -EIO; 3783 } 3784 3785 /* Find the matching RAMBlock in our local list */ 3786 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3787 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3788 found = curr; 3789 break; 3790 } 3791 } 3792 3793 if (found == -1) { 3794 error_report("RAMBlock '%s' not found on destination", name); 3795 return -ENOENT; 3796 } 3797 3798 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3799 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3800 rdma->next_src_index++; 3801 3802 return 0; 3803 } 3804 3805 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) 3806 { 3807 switch (flags) { 3808 case RAM_CONTROL_BLOCK_REG: 3809 return rdma_block_notification_handle(opaque, data); 3810 3811 case RAM_CONTROL_HOOK: 3812 return qemu_rdma_registration_handle(f, opaque); 3813 3814 default: 3815 /* Shouldn't be called with any other values */ 3816 abort(); 3817 } 3818 } 3819 3820 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, 3821 uint64_t flags, void *data) 3822 { 3823 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3824 RDMAContext *rdma; 3825 3826 RCU_READ_LOCK_GUARD(); 3827 rdma = qatomic_rcu_read(&rioc->rdmaout); 3828 if (!rdma) { 3829 return -EIO; 3830 } 3831 3832 CHECK_ERROR_STATE(); 3833 3834 if (migration_in_postcopy()) { 3835 return 0; 3836 } 3837 3838 trace_qemu_rdma_registration_start(flags); 3839 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3840 qemu_fflush(f); 3841 
3842 return 0; 3843 } 3844 3845 /* 3846 * Inform dest that dynamic registrations are done for now. 3847 * First, flush writes, if any. 3848 */ 3849 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, 3850 uint64_t flags, void *data) 3851 { 3852 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3853 RDMAContext *rdma; 3854 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3855 int ret = 0; 3856 3857 RCU_READ_LOCK_GUARD(); 3858 rdma = qatomic_rcu_read(&rioc->rdmaout); 3859 if (!rdma) { 3860 return -EIO; 3861 } 3862 3863 CHECK_ERROR_STATE(); 3864 3865 if (migration_in_postcopy()) { 3866 return 0; 3867 } 3868 3869 qemu_fflush(f); 3870 ret = qemu_rdma_drain_cq(f, rdma); 3871 3872 if (ret < 0) { 3873 goto err; 3874 } 3875 3876 if (flags == RAM_CONTROL_SETUP) { 3877 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3878 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3879 int reg_result_idx, i, nb_dest_blocks; 3880 3881 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3882 trace_qemu_rdma_registration_stop_ram(); 3883 3884 /* 3885 * Make sure that we parallelize the pinning on both sides. 3886 * For very large guests, doing this serially takes a really 3887 * long time, so we have to 'interleave' the pinning locally 3888 * with the control messages by performing the pinning on this 3889 * side before we receive the control response from the other 3890 * side that the pinning has completed. 3891 */ 3892 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3893 ®_result_idx, rdma->pin_all ? 3894 qemu_rdma_reg_whole_ram_blocks : NULL); 3895 if (ret < 0) { 3896 fprintf(stderr, "receiving remote info!"); 3897 return ret; 3898 } 3899 3900 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3901 3902 /* 3903 * The protocol uses two different sets of rkeys (mutually exclusive): 3904 * 1. One key to represent the virtual address of the entire ram block. 3905 * (dynamic chunk registration disabled - pin everything with one rkey.) 3906 * 2. One to represent individual chunks within a ram block. 3907 * (dynamic chunk registration enabled - pin individual chunks.) 3908 * 3909 * Once the capability is successfully negotiated, the destination transmits 3910 * the keys to use (or sends them later) including the virtual addresses 3911 * and then propagates the remote ram block descriptions to his local copy. 
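 *
 * On the sending side this corresponds to the two cases in
 * qemu_rdma_write_one(): with pin-all the write uses the per-block
 * block->remote_rkey, otherwise it uses the per-chunk
 * block->remote_keys[chunk] obtained through REGISTER_REQUEST.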
3912 */ 3913 3914 if (local->nb_blocks != nb_dest_blocks) { 3915 fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) " 3916 "Your QEMU command line parameters are probably " 3917 "not identical on both the source and destination.", 3918 local->nb_blocks, nb_dest_blocks); 3919 rdma->error_state = -EINVAL; 3920 return -EINVAL; 3921 } 3922 3923 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3924 memcpy(rdma->dest_blocks, 3925 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3926 for (i = 0; i < nb_dest_blocks; i++) { 3927 network_to_dest_block(&rdma->dest_blocks[i]); 3928 3929 /* We require that the blocks are in the same order */ 3930 if (rdma->dest_blocks[i].length != local->block[i].length) { 3931 fprintf(stderr, "Block %s/%d has a different length %" PRIu64 3932 "vs %" PRIu64, local->block[i].block_name, i, 3933 local->block[i].length, 3934 rdma->dest_blocks[i].length); 3935 rdma->error_state = -EINVAL; 3936 return -EINVAL; 3937 } 3938 local->block[i].remote_host_addr = 3939 rdma->dest_blocks[i].remote_host_addr; 3940 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3941 } 3942 } 3943 3944 trace_qemu_rdma_registration_stop(flags); 3945 3946 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3947 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3948 3949 if (ret < 0) { 3950 goto err; 3951 } 3952 3953 return 0; 3954 err: 3955 rdma->error_state = ret; 3956 return ret; 3957 } 3958 3959 static const QEMUFileHooks rdma_read_hooks = { 3960 .hook_ram_load = rdma_load_hook, 3961 }; 3962 3963 static const QEMUFileHooks rdma_write_hooks = { 3964 .before_ram_iterate = qemu_rdma_registration_start, 3965 .after_ram_iterate = qemu_rdma_registration_stop, 3966 .save_page = qemu_rdma_save_page, 3967 }; 3968 3969 3970 static void qio_channel_rdma_finalize(Object *obj) 3971 { 3972 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3973 if (rioc->rdmain) { 3974 qemu_rdma_cleanup(rioc->rdmain); 3975 g_free(rioc->rdmain); 3976 rioc->rdmain = NULL; 3977 } 3978 if (rioc->rdmaout) { 3979 qemu_rdma_cleanup(rioc->rdmaout); 3980 g_free(rioc->rdmaout); 3981 rioc->rdmaout = NULL; 3982 } 3983 } 3984 3985 static void qio_channel_rdma_class_init(ObjectClass *klass, 3986 void *class_data G_GNUC_UNUSED) 3987 { 3988 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3989 3990 ioc_klass->io_writev = qio_channel_rdma_writev; 3991 ioc_klass->io_readv = qio_channel_rdma_readv; 3992 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3993 ioc_klass->io_close = qio_channel_rdma_close; 3994 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3995 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 3996 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 3997 } 3998 3999 static const TypeInfo qio_channel_rdma_info = { 4000 .parent = TYPE_QIO_CHANNEL, 4001 .name = TYPE_QIO_CHANNEL_RDMA, 4002 .instance_size = sizeof(QIOChannelRDMA), 4003 .instance_finalize = qio_channel_rdma_finalize, 4004 .class_init = qio_channel_rdma_class_init, 4005 }; 4006 4007 static void qio_channel_rdma_register_types(void) 4008 { 4009 type_register_static(&qio_channel_rdma_info); 4010 } 4011 4012 type_init(qio_channel_rdma_register_types); 4013 4014 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) 4015 { 4016 QIOChannelRDMA *rioc; 4017 4018 if (qemu_file_mode_is_not_valid(mode)) { 4019 return NULL; 4020 } 4021 4022 rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4023 4024 if (mode[0] == 'w') { 4025 rioc->file = 
3959 static const QEMUFileHooks rdma_read_hooks = {
3960     .hook_ram_load = rdma_load_hook,
3961 };
3962 
3963 static const QEMUFileHooks rdma_write_hooks = {
3964     .before_ram_iterate = qemu_rdma_registration_start,
3965     .after_ram_iterate = qemu_rdma_registration_stop,
3966     .save_page = qemu_rdma_save_page,
3967 };
3968 
3969 
3970 static void qio_channel_rdma_finalize(Object *obj)
3971 {
3972     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3973     if (rioc->rdmain) {
3974         qemu_rdma_cleanup(rioc->rdmain);
3975         g_free(rioc->rdmain);
3976         rioc->rdmain = NULL;
3977     }
3978     if (rioc->rdmaout) {
3979         qemu_rdma_cleanup(rioc->rdmaout);
3980         g_free(rioc->rdmaout);
3981         rioc->rdmaout = NULL;
3982     }
3983 }
3984 
3985 static void qio_channel_rdma_class_init(ObjectClass *klass,
3986                                         void *class_data G_GNUC_UNUSED)
3987 {
3988     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3989 
3990     ioc_klass->io_writev = qio_channel_rdma_writev;
3991     ioc_klass->io_readv = qio_channel_rdma_readv;
3992     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3993     ioc_klass->io_close = qio_channel_rdma_close;
3994     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3995     ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
3996     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
3997 }
3998 
3999 static const TypeInfo qio_channel_rdma_info = {
4000     .parent = TYPE_QIO_CHANNEL,
4001     .name = TYPE_QIO_CHANNEL_RDMA,
4002     .instance_size = sizeof(QIOChannelRDMA),
4003     .instance_finalize = qio_channel_rdma_finalize,
4004     .class_init = qio_channel_rdma_class_init,
4005 };
4006 
4007 static void qio_channel_rdma_register_types(void)
4008 {
4009     type_register_static(&qio_channel_rdma_info);
4010 }
4011 
4012 type_init(qio_channel_rdma_register_types);
4013 
4014 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
4015 {
4016     QIOChannelRDMA *rioc;
4017 
4018     if (qemu_file_mode_is_not_valid(mode)) {
4019         return NULL;
4020     }
4021 
4022     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4023 
4024     if (mode[0] == 'w') {
4025         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
4026         rioc->rdmaout = rdma;
4027         rioc->rdmain = rdma->return_path;
4028         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4029     } else {
4030         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
4031         rioc->rdmain = rdma;
4032         rioc->rdmaout = rdma->return_path;
4033         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4034     }
4035 
4036     return rioc->file;
4037 }
4038 
4039 static void rdma_accept_incoming_migration(void *opaque)
4040 {
4041     RDMAContext *rdma = opaque;
4042     int ret;
4043     QEMUFile *f;
4044     Error *local_err = NULL;
4045 
4046     trace_qemu_rdma_accept_incoming_migration();
4047     ret = qemu_rdma_accept(rdma);
4048 
4049     if (ret) {
4050         fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
4051         return;
4052     }
4053 
4054     trace_qemu_rdma_accept_incoming_migration_accepted();
4055 
4056     if (rdma->is_return_path) {
4057         return;
4058     }
4059 
4060     f = qemu_fopen_rdma(rdma, "rb");
4061     if (f == NULL) {
4062         fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
4063         qemu_rdma_cleanup(rdma);
4064         return;
4065     }
4066 
4067     rdma->migration_started_on_destination = 1;
4068     migration_fd_process_incoming(f, &local_err);
4069     if (local_err) {
4070         error_reportf_err(local_err, "RDMA ERROR:");
4071     }
4072 }
4073 
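/*
 * Entry point for the destination side. Illustrative note: this is
 * reached when QEMU is started with an "rdma:" incoming URI, e.g.
 *
 *   qemu-system-x86_64 ... -incoming rdma:0:4444
 *
 * It binds and listens on the rdma_cm id, then installs
 * rdma_accept_incoming_migration() as the handler for the event
 * channel fd, so the rest of the setup happens once a connection
 * request arrives.
 */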
4074 void rdma_start_incoming_migration(const char *host_port, Error **errp)
4075 {
4076     int ret;
4077     RDMAContext *rdma, *rdma_return_path = NULL;
4078     Error *local_err = NULL;
4079 
4080     trace_rdma_start_incoming_migration();
4081 
4082     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4083     if (ram_block_discard_is_required()) {
4084         error_setg(errp, "RDMA: cannot disable RAM discard");
4085         return;
4086     }
4087 
4088     rdma = qemu_rdma_data_init(host_port, &local_err);
4089     if (rdma == NULL) {
4090         goto err;
4091     }
4092 
4093     ret = qemu_rdma_dest_init(rdma, &local_err);
4094 
4095     if (ret) {
4096         goto err;
4097     }
4098 
4099     trace_rdma_start_incoming_migration_after_dest_init();
4100 
4101     ret = rdma_listen(rdma->listen_id, 5);
4102 
4103     if (ret) {
4104         ERROR(errp, "listening on socket!");
4105         goto cleanup_rdma;
4106     }
4107 
4108     trace_rdma_start_incoming_migration_after_rdma_listen();
4109 
4110     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4111                         NULL, (void *)(intptr_t)rdma);
4112     return;
4113 
4114 cleanup_rdma:
4115     qemu_rdma_cleanup(rdma);
4116 err:
4117     error_propagate(errp, local_err);
4118     if (rdma) {
4119         g_free(rdma->host);
4120         g_free(rdma->host_port);
4121     }
4122     g_free(rdma);
4123     g_free(rdma_return_path);
4124 }
4125 
4126 void rdma_start_outgoing_migration(void *opaque,
4127                                    const char *host_port, Error **errp)
4128 {
4129     MigrationState *s = opaque;
4130     RDMAContext *rdma_return_path = NULL;
4131     RDMAContext *rdma;
4132     int ret = 0;
4133 
4134     /* Avoid ram_block_discard_disable(), cannot change during migration. */
4135     if (ram_block_discard_is_required()) {
4136         error_setg(errp, "RDMA: cannot disable RAM discard");
4137         return;
4138     }
4139 
4140     rdma = qemu_rdma_data_init(host_port, errp);
4141     if (rdma == NULL) {
4142         goto err;
4143     }
4144 
4145     ret = qemu_rdma_source_init(rdma,
4146         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4147 
4148     if (ret) {
4149         goto err;
4150     }
4151 
4152     trace_rdma_start_outgoing_migration_after_rdma_source_init();
4153     ret = qemu_rdma_connect(rdma, errp, false);
4154 
4155     if (ret) {
4156         goto err;
4157     }
4158 
4159     /* RDMA postcopy needs a separate queue pair for the return path */
4160     if (migrate_postcopy()) {
4161         rdma_return_path = qemu_rdma_data_init(host_port, errp);
4162 
4163         if (rdma_return_path == NULL) {
4164             goto return_path_err;
4165         }
4166 
4167         ret = qemu_rdma_source_init(rdma_return_path,
4168             s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4169 
4170         if (ret) {
4171             goto return_path_err;
4172         }
4173 
4174         ret = qemu_rdma_connect(rdma_return_path, errp, true);
4175 
4176         if (ret) {
4177             goto return_path_err;
4178         }
4179 
4180         rdma->return_path = rdma_return_path;
4181         rdma_return_path->return_path = rdma;
4182         rdma_return_path->is_return_path = true;
4183     }
4184 
4185     trace_rdma_start_outgoing_migration_after_rdma_connect();
4186 
4187     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4188     migrate_fd_connect(s, NULL);
4189     return;
4190 return_path_err:
4191     qemu_rdma_cleanup(rdma);
4192 err:
4193     g_free(rdma);
4194     g_free(rdma_return_path);
4195 }
4196 
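/*
 * Illustrative usage sketch for the source side (comment only, no
 * additional code): rdma_start_outgoing_migration() above is reached
 * from the generic migration layer when the user issues, for example,
 *
 *   (qemu) migrate_set_capability rdma-pin-all on    (optional)
 *   (qemu) migrate -d rdma:destination-host:4444
 *
 * With postcopy enabled, a second RDMAContext and queue pair are
 * connected for the return path, as set up in the migrate_postcopy()
 * branch above.
 */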