1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 17 #include "qemu/osdep.h" 18 #include "qapi/error.h" 19 #include "qemu/cutils.h" 20 #include "rdma.h" 21 #include "migration.h" 22 #include "qemu-file.h" 23 #include "ram.h" 24 #include "qemu-file-channel.h" 25 #include "qemu/error-report.h" 26 #include "qemu/main-loop.h" 27 #include "qemu/module.h" 28 #include "qemu/rcu.h" 29 #include "qemu/sockets.h" 30 #include "qemu/bitmap.h" 31 #include "qemu/coroutine.h" 32 #include "exec/memory.h" 33 #include <sys/socket.h> 34 #include <netdb.h> 35 #include <arpa/inet.h> 36 #include <rdma/rdma_cma.h> 37 #include "trace.h" 38 #include "qom/object.h" 39 #include <poll.h> 40 41 /* 42 * Print and error on both the Monitor and the Log file. 43 */ 44 #define ERROR(errp, fmt, ...) \ 45 do { \ 46 fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \ 47 if (errp && (*(errp) == NULL)) { \ 48 error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \ 49 } \ 50 } while (0) 51 52 #define RDMA_RESOLVE_TIMEOUT_MS 10000 53 54 /* Do not merge data if larger than this. */ 55 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 56 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 57 58 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 59 60 /* 61 * This is only for non-live state being migrated. 62 * Instead of RDMA_WRITE messages, we use RDMA_SEND 63 * messages for that state, which requires a different 64 * delivery design than main memory. 65 */ 66 #define RDMA_SEND_INCREMENT 32768 67 68 /* 69 * Maximum size infiniband SEND message 70 */ 71 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 72 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 73 74 #define RDMA_CONTROL_VERSION_CURRENT 1 75 /* 76 * Capabilities for negotiation. 77 */ 78 #define RDMA_CAPABILITY_PIN_ALL 0x01 79 80 /* 81 * Add the other flags above to this list of known capabilities 82 * as they are introduced. 83 */ 84 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 85 86 #define CHECK_ERROR_STATE() \ 87 do { \ 88 if (rdma->error_state) { \ 89 if (!rdma->error_reported) { \ 90 error_report("RDMA is in an error state waiting migration" \ 91 " to abort!"); \ 92 rdma->error_reported = 1; \ 93 } \ 94 return rdma->error_state; \ 95 } \ 96 } while (0) 97 98 /* 99 * A work request ID is 64-bits and we split up these bits 100 * into 3 parts: 101 * 102 * bits 0-15 : type of control message, 2^16 103 * bits 16-29: ram block index, 2^14 104 * bits 30-63: ram block chunk number, 2^34 105 * 106 * The last two bit ranges are only used for RDMA writes, 107 * in order to track their completion and potentially 108 * also track unregistration status of the message. 109 */ 110 #define RDMA_WRID_TYPE_SHIFT 0UL 111 #define RDMA_WRID_BLOCK_SHIFT 16UL 112 #define RDMA_WRID_CHUNK_SHIFT 30UL 113 114 #define RDMA_WRID_TYPE_MASK \ 115 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 116 117 #define RDMA_WRID_BLOCK_MASK \ 118 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 119 120 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 121 122 /* 123 * RDMA migration protocol: 124 * 1. RDMA Writes (data messages, i.e. RAM) 125 * 2. 
IB Send/Recv (control channel messages) 126 */ 127 enum { 128 RDMA_WRID_NONE = 0, 129 RDMA_WRID_RDMA_WRITE = 1, 130 RDMA_WRID_SEND_CONTROL = 2000, 131 RDMA_WRID_RECV_CONTROL = 4000, 132 }; 133 134 static const char *wrid_desc[] = { 135 [RDMA_WRID_NONE] = "NONE", 136 [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA", 137 [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND", 138 [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV", 139 }; 140 141 /* 142 * Work request IDs for IB SEND messages only (not RDMA writes). 143 * This is used by the migration protocol to transmit 144 * control messages (such as device state and registration commands) 145 * 146 * We could use more WRs, but we have enough for now. 147 */ 148 enum { 149 RDMA_WRID_READY = 0, 150 RDMA_WRID_DATA, 151 RDMA_WRID_CONTROL, 152 RDMA_WRID_MAX, 153 }; 154 155 /* 156 * SEND/RECV IB Control Messages. 157 */ 158 enum { 159 RDMA_CONTROL_NONE = 0, 160 RDMA_CONTROL_ERROR, 161 RDMA_CONTROL_READY, /* ready to receive */ 162 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 163 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 164 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 165 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 166 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 167 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 168 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 169 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 170 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 171 }; 172 173 174 /* 175 * Memory and MR structures used to represent an IB Send/Recv work request. 176 * This is *not* used for RDMA writes, only IB Send/Recv. 177 */ 178 typedef struct { 179 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 180 struct ibv_mr *control_mr; /* registration metadata */ 181 size_t control_len; /* length of the message */ 182 uint8_t *control_curr; /* start of unconsumed bytes */ 183 } RDMAWorkRequestData; 184 185 /* 186 * Negotiate RDMA capabilities during connection-setup time. 187 */ 188 typedef struct { 189 uint32_t version; 190 uint32_t flags; 191 } RDMACapabilities; 192 193 static void caps_to_network(RDMACapabilities *cap) 194 { 195 cap->version = htonl(cap->version); 196 cap->flags = htonl(cap->flags); 197 } 198 199 static void network_to_caps(RDMACapabilities *cap) 200 { 201 cap->version = ntohl(cap->version); 202 cap->flags = ntohl(cap->flags); 203 } 204 205 /* 206 * Representation of a RAMBlock from an RDMA perspective. 207 * This is not transmitted, only local. 208 * This and subsequent structures cannot be linked lists 209 * because we're using a single IB message to transmit 210 * the information. It's small anyway, so a list is overkill. 
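 *
 * Editorial illustration (not part of the original source): with
 * RDMA_REG_CHUNK_SHIFT = 20, every block is tracked in 1 MB chunks, and
 * rdma_add_block() below sizes its per-chunk bitmaps as
 *
 *     nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1;
 *
 * so, for example, a 4 GiB RAMBlock gets 4097 chunk slots (the extra slot
 * covers a possible partial tail chunk).
 *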
211 */ 212 typedef struct RDMALocalBlock { 213 char *block_name; 214 uint8_t *local_host_addr; /* local virtual address */ 215 uint64_t remote_host_addr; /* remote virtual address */ 216 uint64_t offset; 217 uint64_t length; 218 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 219 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 220 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 221 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 222 int index; /* which block are we */ 223 unsigned int src_index; /* (Only used on dest) */ 224 bool is_ram_block; 225 int nb_chunks; 226 unsigned long *transit_bitmap; 227 unsigned long *unregister_bitmap; 228 } RDMALocalBlock; 229 230 /* 231 * Also represents a RAMblock, but only on the dest. 232 * This gets transmitted by the dest during connection-time 233 * to the source VM and then is used to populate the 234 * corresponding RDMALocalBlock with 235 * the information needed to perform the actual RDMA. 236 */ 237 typedef struct QEMU_PACKED RDMADestBlock { 238 uint64_t remote_host_addr; 239 uint64_t offset; 240 uint64_t length; 241 uint32_t remote_rkey; 242 uint32_t padding; 243 } RDMADestBlock; 244 245 static const char *control_desc(unsigned int rdma_control) 246 { 247 static const char *strs[] = { 248 [RDMA_CONTROL_NONE] = "NONE", 249 [RDMA_CONTROL_ERROR] = "ERROR", 250 [RDMA_CONTROL_READY] = "READY", 251 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 252 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 253 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 254 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 255 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 256 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 257 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 258 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 259 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 260 }; 261 262 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 263 return "??BAD CONTROL VALUE??"; 264 } 265 266 return strs[rdma_control]; 267 } 268 269 static uint64_t htonll(uint64_t v) 270 { 271 union { uint32_t lv[2]; uint64_t llv; } u; 272 u.lv[0] = htonl(v >> 32); 273 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 274 return u.llv; 275 } 276 277 static uint64_t ntohll(uint64_t v) 278 { 279 union { uint32_t lv[2]; uint64_t llv; } u; 280 u.llv = v; 281 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 282 } 283 284 static void dest_block_to_network(RDMADestBlock *db) 285 { 286 db->remote_host_addr = htonll(db->remote_host_addr); 287 db->offset = htonll(db->offset); 288 db->length = htonll(db->length); 289 db->remote_rkey = htonl(db->remote_rkey); 290 } 291 292 static void network_to_dest_block(RDMADestBlock *db) 293 { 294 db->remote_host_addr = ntohll(db->remote_host_addr); 295 db->offset = ntohll(db->offset); 296 db->length = ntohll(db->length); 297 db->remote_rkey = ntohl(db->remote_rkey); 298 } 299 300 /* 301 * Virtual address of the above structures used for transmitting 302 * the RAMBlock descriptions at connection-time. 303 * This structure is *not* transmitted. 304 */ 305 typedef struct RDMALocalBlocks { 306 int nb_blocks; 307 bool init; /* main memory init complete */ 308 RDMALocalBlock *block; 309 } RDMALocalBlocks; 310 311 /* 312 * Main data structure for RDMA state. 
313 * While there is only one copy of this structure being allocated right now, 314 * this is the place where one would start if you wanted to consider 315 * having more than one RDMA connection open at the same time. 316 */ 317 typedef struct RDMAContext { 318 char *host; 319 int port; 320 char *host_port; 321 322 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 323 324 /* 325 * This is used by *_exchange_send() to figure out whether or not 326 * the initial "READY" message has already been received or not. 327 * This is because other functions may potentially poll() and detect 328 * the READY message before send() does, in which case we need to 329 * know if it completed. 330 */ 331 int control_ready_expected; 332 333 /* number of outstanding writes */ 334 int nb_sent; 335 336 /* store info about current buffer so that we can 337 merge it with future sends */ 338 uint64_t current_addr; 339 uint64_t current_length; 340 /* index of ram block the current buffer belongs to */ 341 int current_index; 342 /* index of the chunk in the current ram block */ 343 int current_chunk; 344 345 bool pin_all; 346 347 /* 348 * infiniband-specific variables for opening the device 349 * and maintaining connection state and so forth. 350 * 351 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 352 * cm_id->verbs, cm_id->channel, and cm_id->qp. 353 */ 354 struct rdma_cm_id *cm_id; /* connection manager ID */ 355 struct rdma_cm_id *listen_id; 356 bool connected; 357 358 struct ibv_context *verbs; 359 struct rdma_event_channel *channel; 360 struct ibv_qp *qp; /* queue pair */ 361 struct ibv_comp_channel *comp_channel; /* completion channel */ 362 struct ibv_pd *pd; /* protection domain */ 363 struct ibv_cq *cq; /* completion queue */ 364 365 /* 366 * If a previous write failed (perhaps because of a failed 367 * memory registration, then do not attempt any future work 368 * and remember the error state. 369 */ 370 int error_state; 371 int error_reported; 372 int received_error; 373 374 /* 375 * Description of ram blocks used throughout the code. 376 */ 377 RDMALocalBlocks local_ram_blocks; 378 RDMADestBlock *dest_blocks; 379 380 /* Index of the next RAMBlock received during block registration */ 381 unsigned int next_src_index; 382 383 /* 384 * Migration on *destination* started. 385 * Then use coroutine yield function. 386 * Source runs in a thread, so we don't care. 387 */ 388 int migration_started_on_destination; 389 390 int total_registrations; 391 int total_writes; 392 393 int unregister_current, unregister_next; 394 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 395 396 GHashTable *blockmap; 397 398 /* the RDMAContext for return path */ 399 struct RDMAContext *return_path; 400 bool is_return_path; 401 } RDMAContext; 402 403 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 404 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 405 406 407 408 struct QIOChannelRDMA { 409 QIOChannel parent; 410 RDMAContext *rdmain; 411 RDMAContext *rdmaout; 412 QEMUFile *file; 413 bool blocking; /* XXX we don't actually honour this yet */ 414 }; 415 416 /* 417 * Main structure for IB Send/Recv control messages. 418 * This gets prepended at the beginning of every Send/Recv. 
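 *
 * Editor's sketch of the resulting wire format (an illustration assuming the
 * four uint32_t fields declared below; len, type and repeat are converted to
 * network byte order by control_to_network()):
 *
 *     byte:  0        4        8        12       16
 *            +--------+--------+--------+--------+---- len bytes ----+
 *            |  len   |  type  | repeat | padding|      payload      |
 *            +--------+--------+--------+--------+-------------------+
 *
 * so the SEND posted by qemu_rdma_post_send_control() always carries
 * sizeof(RDMAControlHeader) + len bytes.
 *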
419 */ 420 typedef struct QEMU_PACKED { 421 uint32_t len; /* Total length of data portion */ 422 uint32_t type; /* which control command to perform */ 423 uint32_t repeat; /* number of commands in data portion of same type */ 424 uint32_t padding; 425 } RDMAControlHeader; 426 427 static void control_to_network(RDMAControlHeader *control) 428 { 429 control->type = htonl(control->type); 430 control->len = htonl(control->len); 431 control->repeat = htonl(control->repeat); 432 } 433 434 static void network_to_control(RDMAControlHeader *control) 435 { 436 control->type = ntohl(control->type); 437 control->len = ntohl(control->len); 438 control->repeat = ntohl(control->repeat); 439 } 440 441 /* 442 * Register a single Chunk. 443 * Information sent by the source VM to inform the dest 444 * to register an single chunk of memory before we can perform 445 * the actual RDMA operation. 446 */ 447 typedef struct QEMU_PACKED { 448 union QEMU_PACKED { 449 uint64_t current_addr; /* offset into the ram_addr_t space */ 450 uint64_t chunk; /* chunk to lookup if unregistering */ 451 } key; 452 uint32_t current_index; /* which ramblock the chunk belongs to */ 453 uint32_t padding; 454 uint64_t chunks; /* how many sequential chunks to register */ 455 } RDMARegister; 456 457 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 458 { 459 RDMALocalBlock *local_block; 460 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 461 462 if (local_block->is_ram_block) { 463 /* 464 * current_addr as passed in is an address in the local ram_addr_t 465 * space, we need to translate this for the destination 466 */ 467 reg->key.current_addr -= local_block->offset; 468 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 469 } 470 reg->key.current_addr = htonll(reg->key.current_addr); 471 reg->current_index = htonl(reg->current_index); 472 reg->chunks = htonll(reg->chunks); 473 } 474 475 static void network_to_register(RDMARegister *reg) 476 { 477 reg->key.current_addr = ntohll(reg->key.current_addr); 478 reg->current_index = ntohl(reg->current_index); 479 reg->chunks = ntohll(reg->chunks); 480 } 481 482 typedef struct QEMU_PACKED { 483 uint32_t value; /* if zero, we will madvise() */ 484 uint32_t block_idx; /* which ram block index */ 485 uint64_t offset; /* Address in remote ram_addr_t space */ 486 uint64_t length; /* length of the chunk */ 487 } RDMACompress; 488 489 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 490 { 491 comp->value = htonl(comp->value); 492 /* 493 * comp->offset as passed in is an address in the local ram_addr_t 494 * space, we need to translate this for the destination 495 */ 496 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 497 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 498 comp->block_idx = htonl(comp->block_idx); 499 comp->offset = htonll(comp->offset); 500 comp->length = htonll(comp->length); 501 } 502 503 static void network_to_compress(RDMACompress *comp) 504 { 505 comp->value = ntohl(comp->value); 506 comp->block_idx = ntohl(comp->block_idx); 507 comp->offset = ntohll(comp->offset); 508 comp->length = ntohll(comp->length); 509 } 510 511 /* 512 * The result of the dest's memory registration produces an "rkey" 513 * which the source VM must reference in order to perform 514 * the RDMA operation. 
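 *
 * As an editorial illustration only (the variable names here are
 * descriptive, not literal code from this file): once the source holds the
 * result, the two interesting fields end up in the RDMA WRITE work request
 * roughly as
 *
 *     wr.opcode              = IBV_WR_RDMA_WRITE;
 *     wr.wr.rdma.rkey        = result->rkey;
 *     wr.wr.rdma.remote_addr = result->host_addr + offset_within_block;
 *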
515 */ 516 typedef struct QEMU_PACKED { 517 uint32_t rkey; 518 uint32_t padding; 519 uint64_t host_addr; 520 } RDMARegisterResult; 521 522 static void result_to_network(RDMARegisterResult *result) 523 { 524 result->rkey = htonl(result->rkey); 525 result->host_addr = htonll(result->host_addr); 526 }; 527 528 static void network_to_result(RDMARegisterResult *result) 529 { 530 result->rkey = ntohl(result->rkey); 531 result->host_addr = ntohll(result->host_addr); 532 }; 533 534 const char *print_wrid(int wrid); 535 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 536 uint8_t *data, RDMAControlHeader *resp, 537 int *resp_idx, 538 int (*callback)(RDMAContext *rdma)); 539 540 static inline uint64_t ram_chunk_index(const uint8_t *start, 541 const uint8_t *host) 542 { 543 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 544 } 545 546 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 547 uint64_t i) 548 { 549 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 550 (i << RDMA_REG_CHUNK_SHIFT)); 551 } 552 553 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 554 uint64_t i) 555 { 556 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 557 (1UL << RDMA_REG_CHUNK_SHIFT); 558 559 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 560 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 561 } 562 563 return result; 564 } 565 566 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 567 void *host_addr, 568 ram_addr_t block_offset, uint64_t length) 569 { 570 RDMALocalBlocks *local = &rdma->local_ram_blocks; 571 RDMALocalBlock *block; 572 RDMALocalBlock *old = local->block; 573 574 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 575 576 if (local->nb_blocks) { 577 int x; 578 579 if (rdma->blockmap) { 580 for (x = 0; x < local->nb_blocks; x++) { 581 g_hash_table_remove(rdma->blockmap, 582 (void *)(uintptr_t)old[x].offset); 583 g_hash_table_insert(rdma->blockmap, 584 (void *)(uintptr_t)old[x].offset, 585 &local->block[x]); 586 } 587 } 588 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 589 g_free(old); 590 } 591 592 block = &local->block[local->nb_blocks]; 593 594 block->block_name = g_strdup(block_name); 595 block->local_host_addr = host_addr; 596 block->offset = block_offset; 597 block->length = length; 598 block->index = local->nb_blocks; 599 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 600 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 601 block->transit_bitmap = bitmap_new(block->nb_chunks); 602 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 603 block->unregister_bitmap = bitmap_new(block->nb_chunks); 604 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 605 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 606 607 block->is_ram_block = local->init ? 
false : true; 608 609 if (rdma->blockmap) { 610 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 611 } 612 613 trace_rdma_add_block(block_name, local->nb_blocks, 614 (uintptr_t) block->local_host_addr, 615 block->offset, block->length, 616 (uintptr_t) (block->local_host_addr + block->length), 617 BITS_TO_LONGS(block->nb_chunks) * 618 sizeof(unsigned long) * 8, 619 block->nb_chunks); 620 621 local->nb_blocks++; 622 623 return 0; 624 } 625 626 /* 627 * Memory regions need to be registered with the device and queue pairs setup 628 * in advanced before the migration starts. This tells us where the RAM blocks 629 * are so that we can register them individually. 630 */ 631 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 632 { 633 const char *block_name = qemu_ram_get_idstr(rb); 634 void *host_addr = qemu_ram_get_host_addr(rb); 635 ram_addr_t block_offset = qemu_ram_get_offset(rb); 636 ram_addr_t length = qemu_ram_get_used_length(rb); 637 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 638 } 639 640 /* 641 * Identify the RAMBlocks and their quantity. They will be references to 642 * identify chunk boundaries inside each RAMBlock and also be referenced 643 * during dynamic page registration. 644 */ 645 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 646 { 647 RDMALocalBlocks *local = &rdma->local_ram_blocks; 648 int ret; 649 650 assert(rdma->blockmap == NULL); 651 memset(local, 0, sizeof *local); 652 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 653 if (ret) { 654 return ret; 655 } 656 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 657 rdma->dest_blocks = g_new0(RDMADestBlock, 658 rdma->local_ram_blocks.nb_blocks); 659 local->init = true; 660 return 0; 661 } 662 663 /* 664 * Note: If used outside of cleanup, the caller must ensure that the destination 665 * block structures are also updated 666 */ 667 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 668 { 669 RDMALocalBlocks *local = &rdma->local_ram_blocks; 670 RDMALocalBlock *old = local->block; 671 int x; 672 673 if (rdma->blockmap) { 674 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 675 } 676 if (block->pmr) { 677 int j; 678 679 for (j = 0; j < block->nb_chunks; j++) { 680 if (!block->pmr[j]) { 681 continue; 682 } 683 ibv_dereg_mr(block->pmr[j]); 684 rdma->total_registrations--; 685 } 686 g_free(block->pmr); 687 block->pmr = NULL; 688 } 689 690 if (block->mr) { 691 ibv_dereg_mr(block->mr); 692 rdma->total_registrations--; 693 block->mr = NULL; 694 } 695 696 g_free(block->transit_bitmap); 697 block->transit_bitmap = NULL; 698 699 g_free(block->unregister_bitmap); 700 block->unregister_bitmap = NULL; 701 702 g_free(block->remote_keys); 703 block->remote_keys = NULL; 704 705 g_free(block->block_name); 706 block->block_name = NULL; 707 708 if (rdma->blockmap) { 709 for (x = 0; x < local->nb_blocks; x++) { 710 g_hash_table_remove(rdma->blockmap, 711 (void *)(uintptr_t)old[x].offset); 712 } 713 } 714 715 if (local->nb_blocks > 1) { 716 717 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 718 719 if (block->index) { 720 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 721 } 722 723 if (block->index < (local->nb_blocks - 1)) { 724 memcpy(local->block + block->index, old + (block->index + 1), 725 sizeof(RDMALocalBlock) * 726 (local->nb_blocks - (block->index + 1))); 727 for (x = block->index; x < local->nb_blocks - 1; x++) { 728 local->block[x].index--; 729 } 730 } 731 } else 
{ 732 assert(block == local->block); 733 local->block = NULL; 734 } 735 736 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 737 block->offset, block->length, 738 (uintptr_t)(block->local_host_addr + block->length), 739 BITS_TO_LONGS(block->nb_chunks) * 740 sizeof(unsigned long) * 8, block->nb_chunks); 741 742 g_free(old); 743 744 local->nb_blocks--; 745 746 if (local->nb_blocks && rdma->blockmap) { 747 for (x = 0; x < local->nb_blocks; x++) { 748 g_hash_table_insert(rdma->blockmap, 749 (void *)(uintptr_t)local->block[x].offset, 750 &local->block[x]); 751 } 752 } 753 754 return 0; 755 } 756 757 /* 758 * Put in the log file which RDMA device was opened and the details 759 * associated with that device. 760 */ 761 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 762 { 763 struct ibv_port_attr port; 764 765 if (ibv_query_port(verbs, 1, &port)) { 766 error_report("Failed to query port information"); 767 return; 768 } 769 770 printf("%s RDMA Device opened: kernel name %s " 771 "uverbs device name %s, " 772 "infiniband_verbs class device path %s, " 773 "infiniband class device path %s, " 774 "transport: (%d) %s\n", 775 who, 776 verbs->device->name, 777 verbs->device->dev_name, 778 verbs->device->dev_path, 779 verbs->device->ibdev_path, 780 port.link_layer, 781 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 782 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 783 ? "Ethernet" : "Unknown")); 784 } 785 786 /* 787 * Put in the log file the RDMA gid addressing information, 788 * useful for folks who have trouble understanding the 789 * RDMA device hierarchy in the kernel. 790 */ 791 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 792 { 793 char sgid[33]; 794 char dgid[33]; 795 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 796 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 797 trace_qemu_rdma_dump_gid(who, sgid, dgid); 798 } 799 800 /* 801 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 802 * We will try the next addrinfo struct, and fail if there are 803 * no other valid addresses to bind against. 804 * 805 * If the user is listening on '[::]', then we will not have opened a device 806 * yet and have no way of verifying if the device is RoCE or not. 807 * 808 * In this case, the source VM will throw an error for ALL types of 809 * connections (both IPv4 and IPv6) if the destination machine does not have 810 * a regular infiniband network available for use. 811 * 812 * The only way to guarantee that an error is thrown for broken kernels is 813 * for the management software to choose a *specific* interface at bind time 814 * and validate what type of hardware it is. 815 * 816 * Unfortunately, this puts the user in a fix: 817 * 818 * If the source VM connects with an IPv4 address without knowing that the 819 * destination has bound to '[::]' the migration will unconditionally fail 820 * unless the management software is explicitly listening on the IPv4 821 * address while using a RoCE-based device. 822 * 823 * If the source VM connects with an IPv6 address, then we're OK because we can 824 * throw an error on the source (and similarly on the destination). 825 * 826 * But in mixed environments, this will be broken for a while until it is fixed 827 * inside linux. 828 * 829 * We do provide a *tiny* bit of help in this function: We can list all of the 830 * devices in the system and check to see if all the devices are RoCE or 831 * Infiniband. 
832 * 833 * If we detect that we have a *pure* RoCE environment, then we can safely 834 * throw an error even if the management software has specified '[::]' as the 835 * bind address. 836 * 837 * However, if there are multiple heterogeneous devices, then we cannot make 838 * this assumption and the user just has to be sure they know what they are 839 * doing. 840 * 841 * Patches are being reviewed on linux-rdma. 842 */ 843 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 844 { 845 /* This bug only exists in linux, to our knowledge. */ 846 #ifdef CONFIG_LINUX 847 struct ibv_port_attr port_attr; 848 849 /* 850 * Verbs are only NULL if management has bound to '[::]'. 851 * 852 * Let's iterate through all the devices and see if there are any pure IB 853 * devices (non-ethernet). 854 * 855 * If not, then we can safely proceed with the migration. 856 * Otherwise, there are no guarantees until the bug is fixed in linux. 857 */ 858 if (!verbs) { 859 int num_devices, x; 860 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 861 bool roce_found = false; 862 bool ib_found = false; 863 864 for (x = 0; x < num_devices; x++) { 865 verbs = ibv_open_device(dev_list[x]); 866 if (!verbs) { 867 if (errno == EPERM) { 868 continue; 869 } else { 870 return -EINVAL; 871 } 872 } 873 874 if (ibv_query_port(verbs, 1, &port_attr)) { 875 ibv_close_device(verbs); 876 ERROR(errp, "Could not query initial IB port"); 877 return -EINVAL; 878 } 879 880 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 881 ib_found = true; 882 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 883 roce_found = true; 884 } 885 886 ibv_close_device(verbs); 887 888 } 889 890 if (roce_found) { 891 if (ib_found) { 892 fprintf(stderr, "WARN: migrations may fail:" 893 " IPv6 over RoCE / iWARP in linux" 894 " is broken. But since you appear to have a" 895 " mixed RoCE / IB environment, be sure to only" 896 " migrate over the IB fabric until the kernel " 897 " fixes the bug.\n"); 898 } else { 899 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 900 " and your management software has specified '[::]'" 901 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 902 return -ENONET; 903 } 904 } 905 906 return 0; 907 } 908 909 /* 910 * If we have a verbs context, that means that something other than '[::]' was 911 * used by the management software for binding, in which case we can 912 * actually warn the user about a potentially broken kernel. 913 */ 914 915 /* IB ports start with 1, not 0 */ 916 if (ibv_query_port(verbs, 1, &port_attr)) { 917 ERROR(errp, "Could not query initial IB port"); 918 return -EINVAL; 919 } 920 921 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 922 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 923 "(but patches on linux-rdma in progress)"); 924 return -ENONET; 925 } 926 927 #endif 928 929 return 0; 930 } 931 932 /* 933 * Figure out which RDMA device corresponds to the requested IP hostname. 934 * Also create the initial connection manager identifiers for opening 935 * the connection. 
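 *
 * Editor's summary of the librdmacm sequence performed below:
 *
 *     rdma_create_event_channel() -> rdma_create_id() ->
 *     rdma_getaddrinfo() -> rdma_resolve_addr() ->
 *     wait for RDMA_CM_EVENT_ADDR_RESOLVED ->
 *     rdma_resolve_route() ->
 *     wait for RDMA_CM_EVENT_ROUTE_RESOLVED
 *
 * after which cm_id->verbs identifies the device that will carry the
 * migration.
 *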
936 */ 937 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 938 { 939 int ret; 940 struct rdma_addrinfo *res; 941 char port_str[16]; 942 struct rdma_cm_event *cm_event; 943 char ip[40] = "unknown"; 944 struct rdma_addrinfo *e; 945 946 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 947 ERROR(errp, "RDMA hostname has not been set"); 948 return -EINVAL; 949 } 950 951 /* create CM channel */ 952 rdma->channel = rdma_create_event_channel(); 953 if (!rdma->channel) { 954 ERROR(errp, "could not create CM channel"); 955 return -EINVAL; 956 } 957 958 /* create CM id */ 959 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 960 if (ret) { 961 ERROR(errp, "could not create channel id"); 962 goto err_resolve_create_id; 963 } 964 965 snprintf(port_str, 16, "%d", rdma->port); 966 port_str[15] = '\0'; 967 968 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 969 if (ret < 0) { 970 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 971 goto err_resolve_get_addr; 972 } 973 974 for (e = res; e != NULL; e = e->ai_next) { 975 inet_ntop(e->ai_family, 976 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 977 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 978 979 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 980 RDMA_RESOLVE_TIMEOUT_MS); 981 if (!ret) { 982 if (e->ai_family == AF_INET6) { 983 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 984 if (ret) { 985 continue; 986 } 987 } 988 goto route; 989 } 990 } 991 992 rdma_freeaddrinfo(res); 993 ERROR(errp, "could not resolve address %s", rdma->host); 994 goto err_resolve_get_addr; 995 996 route: 997 rdma_freeaddrinfo(res); 998 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 999 1000 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1001 if (ret) { 1002 ERROR(errp, "could not perform event_addr_resolved"); 1003 goto err_resolve_get_addr; 1004 } 1005 1006 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 1007 ERROR(errp, "result not equal to event_addr_resolved %s", 1008 rdma_event_str(cm_event->event)); 1009 error_report("rdma_resolve_addr"); 1010 rdma_ack_cm_event(cm_event); 1011 ret = -EINVAL; 1012 goto err_resolve_get_addr; 1013 } 1014 rdma_ack_cm_event(cm_event); 1015 1016 /* resolve route */ 1017 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1018 if (ret) { 1019 ERROR(errp, "could not resolve rdma route"); 1020 goto err_resolve_get_addr; 1021 } 1022 1023 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1024 if (ret) { 1025 ERROR(errp, "could not perform event_route_resolved"); 1026 goto err_resolve_get_addr; 1027 } 1028 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1029 ERROR(errp, "result not equal to event_route_resolved: %s", 1030 rdma_event_str(cm_event->event)); 1031 rdma_ack_cm_event(cm_event); 1032 ret = -EINVAL; 1033 goto err_resolve_get_addr; 1034 } 1035 rdma_ack_cm_event(cm_event); 1036 rdma->verbs = rdma->cm_id->verbs; 1037 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1038 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1039 return 0; 1040 1041 err_resolve_get_addr: 1042 rdma_destroy_id(rdma->cm_id); 1043 rdma->cm_id = NULL; 1044 err_resolve_create_id: 1045 rdma_destroy_event_channel(rdma->channel); 1046 rdma->channel = NULL; 1047 return ret; 1048 } 1049 1050 /* 1051 * Create protection domain and completion queues 1052 */ 1053 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1054 { 1055 /* allocate pd */ 1056 rdma->pd = ibv_alloc_pd(rdma->verbs); 1057 if 
(!rdma->pd) { 1058 error_report("failed to allocate protection domain"); 1059 return -1; 1060 } 1061 1062 /* create completion channel */ 1063 rdma->comp_channel = ibv_create_comp_channel(rdma->verbs); 1064 if (!rdma->comp_channel) { 1065 error_report("failed to allocate completion channel"); 1066 goto err_alloc_pd_cq; 1067 } 1068 1069 /* 1070 * Completion queue can be filled by both read and write work requests, 1071 * so must reflect the sum of both possible queue sizes. 1072 */ 1073 rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1074 NULL, rdma->comp_channel, 0); 1075 if (!rdma->cq) { 1076 error_report("failed to allocate completion queue"); 1077 goto err_alloc_pd_cq; 1078 } 1079 1080 return 0; 1081 1082 err_alloc_pd_cq: 1083 if (rdma->pd) { 1084 ibv_dealloc_pd(rdma->pd); 1085 } 1086 if (rdma->comp_channel) { 1087 ibv_destroy_comp_channel(rdma->comp_channel); 1088 } 1089 rdma->pd = NULL; 1090 rdma->comp_channel = NULL; 1091 return -1; 1092 1093 } 1094 1095 /* 1096 * Create queue pairs. 1097 */ 1098 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1099 { 1100 struct ibv_qp_init_attr attr = { 0 }; 1101 int ret; 1102 1103 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1104 attr.cap.max_recv_wr = 3; 1105 attr.cap.max_send_sge = 1; 1106 attr.cap.max_recv_sge = 1; 1107 attr.send_cq = rdma->cq; 1108 attr.recv_cq = rdma->cq; 1109 attr.qp_type = IBV_QPT_RC; 1110 1111 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1112 if (ret) { 1113 return -1; 1114 } 1115 1116 rdma->qp = rdma->cm_id->qp; 1117 return 0; 1118 } 1119 1120 /* Check whether On-Demand Paging is supported by the RDMA device */ 1121 static bool rdma_support_odp(struct ibv_context *dev) 1122 { 1123 struct ibv_device_attr_ex attr = {0}; 1124 int ret = ibv_query_device_ex(dev, NULL, &attr); 1125 if (ret) { 1126 return false; 1127 } 1128 1129 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) { 1130 return true; 1131 } 1132 1133 return false; 1134 } 1135 1136 /* 1137 * ibv_advise_mr to avoid RNR NAK error as far as possible. 1138 * A responder MR registered with ODP will send an RNR NAK back to 1139 * the requester in the face of a page fault. 1140 */ 1141 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr, 1142 uint32_t len, uint32_t lkey, 1143 const char *name, bool wr) 1144 { 1145 #ifdef HAVE_IBV_ADVISE_MR 1146 int ret; 1147 int advice = wr ? 
IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE : 1148 IBV_ADVISE_MR_ADVICE_PREFETCH; 1149 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len}; 1150 1151 ret = ibv_advise_mr(pd, advice, 1152 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1); 1153 /* ignore the error */ 1154 if (ret) { 1155 trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno)); 1156 } else { 1157 trace_qemu_rdma_advise_mr(name, len, addr, "successed"); 1158 } 1159 #endif 1160 } 1161 1162 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1163 { 1164 int i; 1165 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1166 1167 for (i = 0; i < local->nb_blocks; i++) { 1168 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; 1169 1170 local->block[i].mr = 1171 ibv_reg_mr(rdma->pd, 1172 local->block[i].local_host_addr, 1173 local->block[i].length, access 1174 ); 1175 1176 if (!local->block[i].mr && 1177 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1178 access |= IBV_ACCESS_ON_DEMAND; 1179 /* register ODP mr */ 1180 local->block[i].mr = 1181 ibv_reg_mr(rdma->pd, 1182 local->block[i].local_host_addr, 1183 local->block[i].length, access); 1184 trace_qemu_rdma_register_odp_mr(local->block[i].block_name); 1185 1186 if (local->block[i].mr) { 1187 qemu_rdma_advise_prefetch_mr(rdma->pd, 1188 (uintptr_t)local->block[i].local_host_addr, 1189 local->block[i].length, 1190 local->block[i].mr->lkey, 1191 local->block[i].block_name, 1192 true); 1193 } 1194 } 1195 1196 if (!local->block[i].mr) { 1197 perror("Failed to register local dest ram block!"); 1198 break; 1199 } 1200 rdma->total_registrations++; 1201 } 1202 1203 if (i >= local->nb_blocks) { 1204 return 0; 1205 } 1206 1207 for (i--; i >= 0; i--) { 1208 ibv_dereg_mr(local->block[i].mr); 1209 local->block[i].mr = NULL; 1210 rdma->total_registrations--; 1211 } 1212 1213 return -1; 1214 1215 } 1216 1217 /* 1218 * Find the ram block that corresponds to the page requested to be 1219 * transmitted by QEMU. 1220 * 1221 * Once the block is found, also identify which 'chunk' within that 1222 * block that the page belongs to. 1223 * 1224 * This search cannot fail or the migration will fail. 1225 */ 1226 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1227 uintptr_t block_offset, 1228 uint64_t offset, 1229 uint64_t length, 1230 uint64_t *block_index, 1231 uint64_t *chunk_index) 1232 { 1233 uint64_t current_addr = block_offset + offset; 1234 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1235 (void *) block_offset); 1236 assert(block); 1237 assert(current_addr >= block->offset); 1238 assert((current_addr + length) <= (block->offset + block->length)); 1239 1240 *block_index = block->index; 1241 *chunk_index = ram_chunk_index(block->local_host_addr, 1242 block->local_host_addr + (current_addr - block->offset)); 1243 1244 return 0; 1245 } 1246 1247 /* 1248 * Register a chunk with IB. If the chunk was already registered 1249 * previously, then skip. 1250 * 1251 * Also return the keys associated with the registration needed 1252 * to perform the actual RDMA operation. 
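 *
 * Editor's note on the two kinds of caller (a description of intent, not
 * literal code): the destination asks for an rkey and therefore registers
 * the chunk with
 *
 *     IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE
 *
 * so that the source may RDMA WRITE into it, while the source only asks for
 * an lkey and registers with access flags of 0, since nothing writes into
 * its memory remotely.
 *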
1253 */ 1254 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1255 RDMALocalBlock *block, uintptr_t host_addr, 1256 uint32_t *lkey, uint32_t *rkey, int chunk, 1257 uint8_t *chunk_start, uint8_t *chunk_end) 1258 { 1259 if (block->mr) { 1260 if (lkey) { 1261 *lkey = block->mr->lkey; 1262 } 1263 if (rkey) { 1264 *rkey = block->mr->rkey; 1265 } 1266 return 0; 1267 } 1268 1269 /* allocate memory to store chunk MRs */ 1270 if (!block->pmr) { 1271 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1272 } 1273 1274 /* 1275 * If 'rkey', then we're the destination, so grant access to the source. 1276 * 1277 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1278 */ 1279 if (!block->pmr[chunk]) { 1280 uint64_t len = chunk_end - chunk_start; 1281 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE : 1282 0; 1283 1284 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1285 1286 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1287 if (!block->pmr[chunk] && 1288 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1289 access |= IBV_ACCESS_ON_DEMAND; 1290 /* register ODP mr */ 1291 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1292 trace_qemu_rdma_register_odp_mr(block->block_name); 1293 1294 if (block->pmr[chunk]) { 1295 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start, 1296 len, block->pmr[chunk]->lkey, 1297 block->block_name, rkey); 1298 1299 } 1300 } 1301 } 1302 if (!block->pmr[chunk]) { 1303 perror("Failed to register chunk!"); 1304 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1305 " start %" PRIuPTR " end %" PRIuPTR 1306 " host %" PRIuPTR 1307 " local %" PRIuPTR " registrations: %d\n", 1308 block->index, chunk, (uintptr_t)chunk_start, 1309 (uintptr_t)chunk_end, host_addr, 1310 (uintptr_t)block->local_host_addr, 1311 rdma->total_registrations); 1312 return -1; 1313 } 1314 rdma->total_registrations++; 1315 1316 if (lkey) { 1317 *lkey = block->pmr[chunk]->lkey; 1318 } 1319 if (rkey) { 1320 *rkey = block->pmr[chunk]->rkey; 1321 } 1322 return 0; 1323 } 1324 1325 /* 1326 * Register (at connection time) the memory used for control 1327 * channel messages. 1328 */ 1329 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1330 { 1331 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1332 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1333 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1334 if (rdma->wr_data[idx].control_mr) { 1335 rdma->total_registrations++; 1336 return 0; 1337 } 1338 error_report("qemu_rdma_reg_control failed"); 1339 return -1; 1340 } 1341 1342 const char *print_wrid(int wrid) 1343 { 1344 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1345 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1346 } 1347 return wrid_desc[wrid]; 1348 } 1349 1350 /* 1351 * RDMA requires memory registration (mlock/pinning), but this is not good for 1352 * overcommitment. 1353 * 1354 * In preparation for the future where LRU information or workload-specific 1355 * writable working set memory access behavior is available to QEMU, 1356 * it would be nice to have in place the ability to UN-register/UN-pin 1357 * particular memory regions from the RDMA hardware when it is determined that 1358 * those regions of memory will likely not be accessed again in the near future. 1359 * 1360 * While we do not yet have such information right now, the following 1361 * compile-time option allows us to perform a non-optimized version of this 1362 * behavior. 
1363 * 1364 * By uncommenting this option, you will cause *all* RDMA transfers to be 1365 * unregistered immediately after the transfer completes on both sides of the 1366 * connection. This has no effect in 'rdma-pin-all' mode, only regular mode. 1367 * 1368 * This will have a terrible impact on migration performance, so until future 1369 * workload information or LRU information is available, do not attempt to use 1370 * this feature except for basic testing. 1371 */ 1372 /* #define RDMA_UNREGISTRATION_EXAMPLE */ 1373 1374 /* 1375 * Perform a non-optimized memory unregistration after every transfer 1376 * for demonstration purposes, only if pin-all is not requested. 1377 * 1378 * Potential optimizations: 1379 * 1. Start a new thread to run this function continuously 1380 - for bit clearing 1381 - and for receipt of unregister messages 1382 * 2. Use an LRU. 1383 * 3. Use workload hints. 1384 */ 1385 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1386 { 1387 while (rdma->unregistrations[rdma->unregister_current]) { 1388 int ret; 1389 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1390 uint64_t chunk = 1391 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1392 uint64_t index = 1393 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1394 RDMALocalBlock *block = 1395 &(rdma->local_ram_blocks.block[index]); 1396 RDMARegister reg = { .current_index = index }; 1397 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1398 }; 1399 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1400 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1401 .repeat = 1, 1402 }; 1403 1404 trace_qemu_rdma_unregister_waiting_proc(chunk, 1405 rdma->unregister_current); 1406 1407 rdma->unregistrations[rdma->unregister_current] = 0; 1408 rdma->unregister_current++; 1409 1410 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1411 rdma->unregister_current = 0; 1412 } 1413 1414 1415 /* 1416 * Unregistration is speculative (because migration is single-threaded 1417 * and we cannot break the protocol's infiniband message ordering). 1418 * Thus, if the memory is currently being used for transmission, 1419 * then abort the attempt to unregister and try again 1420 * later the next time a completion is received for this memory. 1421 */ 1422 clear_bit(chunk, block->unregister_bitmap); 1423 1424 if (test_bit(chunk, block->transit_bitmap)) { 1425 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1426 continue; 1427 } 1428 1429 trace_qemu_rdma_unregister_waiting_send(chunk); 1430 1431 ret = ibv_dereg_mr(block->pmr[chunk]); 1432 block->pmr[chunk] = NULL; 1433 block->remote_keys[chunk] = 0; 1434 1435 if (ret != 0) { 1436 perror("unregistration chunk failed"); 1437 return -ret; 1438 } 1439 rdma->total_registrations--; 1440 1441 reg.key.chunk = chunk; 1442 register_to_network(rdma, &reg); 1443 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg, 1444 &resp, NULL, NULL); 1445 if (ret < 0) { 1446 return ret; 1447 } 1448 1449 trace_qemu_rdma_unregister_waiting_complete(chunk); 1450 } 1451 1452 return 0; 1453 } 1454 1455 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1456 uint64_t chunk) 1457 { 1458 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1459 1460 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1461 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1462 1463 return result; 1464 } 1465 1466 /* 1467 * Set bit for unregistration in the next iteration. 1468 * We cannot transmit right here, but will unpin later. 
1469 */ 1470 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, 1471 uint64_t chunk, uint64_t wr_id) 1472 { 1473 if (rdma->unregistrations[rdma->unregister_next] != 0) { 1474 error_report("rdma migration: queue is full"); 1475 } else { 1476 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1477 1478 if (!test_and_set_bit(chunk, block->unregister_bitmap)) { 1479 trace_qemu_rdma_signal_unregister_append(chunk, 1480 rdma->unregister_next); 1481 1482 rdma->unregistrations[rdma->unregister_next++] = 1483 qemu_rdma_make_wrid(wr_id, index, chunk); 1484 1485 if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { 1486 rdma->unregister_next = 0; 1487 } 1488 } else { 1489 trace_qemu_rdma_signal_unregister_already(chunk); 1490 } 1491 } 1492 } 1493 1494 /* 1495 * Consult the connection manager to see a work request 1496 * (of any kind) has completed. 1497 * Return the work request ID that completed. 1498 */ 1499 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, 1500 uint32_t *byte_len) 1501 { 1502 int ret; 1503 struct ibv_wc wc; 1504 uint64_t wr_id; 1505 1506 ret = ibv_poll_cq(rdma->cq, 1, &wc); 1507 1508 if (!ret) { 1509 *wr_id_out = RDMA_WRID_NONE; 1510 return 0; 1511 } 1512 1513 if (ret < 0) { 1514 error_report("ibv_poll_cq return %d", ret); 1515 return ret; 1516 } 1517 1518 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1519 1520 if (wc.status != IBV_WC_SUCCESS) { 1521 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1522 wc.status, ibv_wc_status_str(wc.status)); 1523 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1524 1525 return -1; 1526 } 1527 1528 if (rdma->control_ready_expected && 1529 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1530 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1531 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1532 rdma->control_ready_expected = 0; 1533 } 1534 1535 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1536 uint64_t chunk = 1537 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1538 uint64_t index = 1539 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1540 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1541 1542 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1543 index, chunk, block->local_host_addr, 1544 (void *)(uintptr_t)block->remote_host_addr); 1545 1546 clear_bit(chunk, block->transit_bitmap); 1547 1548 if (rdma->nb_sent > 0) { 1549 rdma->nb_sent--; 1550 } 1551 1552 if (!rdma->pin_all) { 1553 /* 1554 * FYI: If one wanted to signal a specific chunk to be unregistered 1555 * using LRU or workload-specific information, this is the function 1556 * you would call to do so. That chunk would then get asynchronously 1557 * unregistered later. 1558 */ 1559 #ifdef RDMA_UNREGISTRATION_EXAMPLE 1560 qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); 1561 #endif 1562 } 1563 } else { 1564 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1565 } 1566 1567 *wr_id_out = wc.wr_id; 1568 if (byte_len) { 1569 *byte_len = wc.byte_len; 1570 } 1571 1572 return 0; 1573 } 1574 1575 /* Wait for activity on the completion channel. 1576 * Returns 0 on success, none-0 on error. 1577 */ 1578 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma) 1579 { 1580 struct rdma_cm_event *cm_event; 1581 int ret = -1; 1582 1583 /* 1584 * Coroutine doesn't start until migration_fd_process_incoming() 1585 * so don't yield unless we know we're running inside of a coroutine. 
1586 */ 1587 if (rdma->migration_started_on_destination && 1588 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1589 yield_until_fd_readable(rdma->comp_channel->fd); 1590 } else { 1591 /* This is the source side, we're in a separate thread 1592 * or destination prior to migration_fd_process_incoming() 1593 * after postcopy, the destination also in a separate thread. 1594 * we can't yield; so we have to poll the fd. 1595 * But we need to be able to handle 'cancel' or an error 1596 * without hanging forever. 1597 */ 1598 while (!rdma->error_state && !rdma->received_error) { 1599 GPollFD pfds[2]; 1600 pfds[0].fd = rdma->comp_channel->fd; 1601 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1602 pfds[0].revents = 0; 1603 1604 pfds[1].fd = rdma->channel->fd; 1605 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1606 pfds[1].revents = 0; 1607 1608 /* 0.1s timeout, should be fine for a 'cancel' */ 1609 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1610 case 2: 1611 case 1: /* fd active */ 1612 if (pfds[0].revents) { 1613 return 0; 1614 } 1615 1616 if (pfds[1].revents) { 1617 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1618 if (ret) { 1619 error_report("failed to get cm event while wait " 1620 "completion channel"); 1621 return -EPIPE; 1622 } 1623 1624 error_report("receive cm event while wait comp channel," 1625 "cm event is %d", cm_event->event); 1626 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1627 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1628 rdma_ack_cm_event(cm_event); 1629 return -EPIPE; 1630 } 1631 rdma_ack_cm_event(cm_event); 1632 } 1633 break; 1634 1635 case 0: /* Timeout, go around again */ 1636 break; 1637 1638 default: /* Error of some type - 1639 * I don't trust errno from qemu_poll_ns 1640 */ 1641 error_report("%s: poll failed", __func__); 1642 return -EPIPE; 1643 } 1644 1645 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1646 /* Bail out and let the cancellation happen */ 1647 return -EPIPE; 1648 } 1649 } 1650 } 1651 1652 if (rdma->received_error) { 1653 return -EPIPE; 1654 } 1655 return rdma->error_state; 1656 } 1657 1658 /* 1659 * Block until the next work request has completed. 1660 * 1661 * First poll to see if a work request has already completed, 1662 * otherwise block. 1663 * 1664 * If we encounter completed work requests for IDs other than 1665 * the one we're interested in, then that's generally an error. 1666 * 1667 * The only exception is actual RDMA Write completions. These 1668 * completions only need to be recorded, but do not actually 1669 * need further processing. 
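 *
 * Typical usage elsewhere in this file (editor's summary): the sender of a
 * control message blocks with
 *
 *     qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
 *
 * while a receiver waits for its posted buffer with
 *
 *     qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, &byte_len);
 *
 * RDMA WRITE completions drained along the way are handled in
 * qemu_rdma_poll(), which recovers the block index and chunk number from
 * the wrid bit layout described at the top of this file.
 *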
1670 */ 1671 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1672 uint32_t *byte_len) 1673 { 1674 int num_cq_events = 0, ret = 0; 1675 struct ibv_cq *cq; 1676 void *cq_ctx; 1677 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1678 1679 if (ibv_req_notify_cq(rdma->cq, 0)) { 1680 return -1; 1681 } 1682 /* poll cq first */ 1683 while (wr_id != wrid_requested) { 1684 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1685 if (ret < 0) { 1686 return ret; 1687 } 1688 1689 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1690 1691 if (wr_id == RDMA_WRID_NONE) { 1692 break; 1693 } 1694 if (wr_id != wrid_requested) { 1695 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1696 wrid_requested, print_wrid(wr_id), wr_id); 1697 } 1698 } 1699 1700 if (wr_id == wrid_requested) { 1701 return 0; 1702 } 1703 1704 while (1) { 1705 ret = qemu_rdma_wait_comp_channel(rdma); 1706 if (ret) { 1707 goto err_block_for_wrid; 1708 } 1709 1710 ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx); 1711 if (ret) { 1712 perror("ibv_get_cq_event"); 1713 goto err_block_for_wrid; 1714 } 1715 1716 num_cq_events++; 1717 1718 ret = -ibv_req_notify_cq(cq, 0); 1719 if (ret) { 1720 goto err_block_for_wrid; 1721 } 1722 1723 while (wr_id != wrid_requested) { 1724 ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len); 1725 if (ret < 0) { 1726 goto err_block_for_wrid; 1727 } 1728 1729 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1730 1731 if (wr_id == RDMA_WRID_NONE) { 1732 break; 1733 } 1734 if (wr_id != wrid_requested) { 1735 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1736 wrid_requested, print_wrid(wr_id), wr_id); 1737 } 1738 } 1739 1740 if (wr_id == wrid_requested) { 1741 goto success_block_for_wrid; 1742 } 1743 } 1744 1745 success_block_for_wrid: 1746 if (num_cq_events) { 1747 ibv_ack_cq_events(cq, num_cq_events); 1748 } 1749 return 0; 1750 1751 err_block_for_wrid: 1752 if (num_cq_events) { 1753 ibv_ack_cq_events(cq, num_cq_events); 1754 } 1755 1756 rdma->error_state = ret; 1757 return ret; 1758 } 1759 1760 /* 1761 * Post a SEND message work request for the control channel 1762 * containing some data and block until the post completes. 1763 */ 1764 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1765 RDMAControlHeader *head) 1766 { 1767 int ret = 0; 1768 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1769 struct ibv_send_wr *bad_wr; 1770 struct ibv_sge sge = { 1771 .addr = (uintptr_t)(wr->control), 1772 .length = head->len + sizeof(RDMAControlHeader), 1773 .lkey = wr->control_mr->lkey, 1774 }; 1775 struct ibv_send_wr send_wr = { 1776 .wr_id = RDMA_WRID_SEND_CONTROL, 1777 .opcode = IBV_WR_SEND, 1778 .send_flags = IBV_SEND_SIGNALED, 1779 .sg_list = &sge, 1780 .num_sge = 1, 1781 }; 1782 1783 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1784 1785 /* 1786 * We don't actually need to do a memcpy() in here if we used 1787 * the "sge" properly, but since we're only sending control messages 1788 * (not RAM in a performance-critical path), then its OK for now. 1789 * 1790 * The copy makes the RDMAControlHeader simpler to manipulate 1791 * for the time being. 
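 *
 * (Editor's note: the zero-copy variant alluded to above would post two
 * scatter/gather entries instead of copying, roughly
 *
 *     sge[0] -> header, sge[1] -> caller's buffer, send_wr.num_sge = 2;
 *
 * at the price of registering the caller's buffer with the device as well.)
 *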
1792 */ 1793 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1794 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1795 control_to_network((void *) wr->control); 1796 1797 if (buf) { 1798 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1799 } 1800 1801 1802 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1803 1804 if (ret > 0) { 1805 error_report("Failed to use post IB SEND for control"); 1806 return -ret; 1807 } 1808 1809 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1810 if (ret < 0) { 1811 error_report("rdma migration: send polling control error"); 1812 } 1813 1814 return ret; 1815 } 1816 1817 /* 1818 * Post a RECV work request in anticipation of some future receipt 1819 * of data on the control channel. 1820 */ 1821 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1822 { 1823 struct ibv_recv_wr *bad_wr; 1824 struct ibv_sge sge = { 1825 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1826 .length = RDMA_CONTROL_MAX_BUFFER, 1827 .lkey = rdma->wr_data[idx].control_mr->lkey, 1828 }; 1829 1830 struct ibv_recv_wr recv_wr = { 1831 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1832 .sg_list = &sge, 1833 .num_sge = 1, 1834 }; 1835 1836 1837 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1838 return -1; 1839 } 1840 1841 return 0; 1842 } 1843 1844 /* 1845 * Block and wait for a RECV control channel message to arrive. 1846 */ 1847 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1848 RDMAControlHeader *head, int expecting, int idx) 1849 { 1850 uint32_t byte_len; 1851 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1852 &byte_len); 1853 1854 if (ret < 0) { 1855 error_report("rdma migration: recv polling control error!"); 1856 return ret; 1857 } 1858 1859 network_to_control((void *) rdma->wr_data[idx].control); 1860 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1861 1862 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1863 1864 if (expecting == RDMA_CONTROL_NONE) { 1865 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1866 head->type); 1867 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1868 error_report("Was expecting a %s (%d) control message" 1869 ", but got: %s (%d), length: %d", 1870 control_desc(expecting), expecting, 1871 control_desc(head->type), head->type, head->len); 1872 if (head->type == RDMA_CONTROL_ERROR) { 1873 rdma->received_error = true; 1874 } 1875 return -EIO; 1876 } 1877 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1878 error_report("too long length: %d", head->len); 1879 return -EINVAL; 1880 } 1881 if (sizeof(*head) + head->len != byte_len) { 1882 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1883 return -EINVAL; 1884 } 1885 1886 return 0; 1887 } 1888 1889 /* 1890 * When a RECV work request has completed, the work request's 1891 * buffer is pointed at the header. 1892 * 1893 * This will advance the pointer to the data portion 1894 * of the control message of the work request's buffer that 1895 * was populated after the work request finished. 1896 */ 1897 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1898 RDMAControlHeader *head) 1899 { 1900 rdma->wr_data[idx].control_len = head->len; 1901 rdma->wr_data[idx].control_curr = 1902 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1903 } 1904 1905 /* 1906 * This is an 'atomic' high-level operation to deliver a single, unified 1907 * control-channel message. 
1908 * 1909 * Additionally, if the user is expecting some kind of reply to this message, 1910 * they can request a 'resp' response message be filled in by posting an 1911 * additional work request on behalf of the user and waiting for an additional 1912 * completion. 1913 * 1914 * The extra (optional) response is used during registration to us from having 1915 * to perform an *additional* exchange of message just to provide a response by 1916 * instead piggy-backing on the acknowledgement. 1917 */ 1918 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1919 uint8_t *data, RDMAControlHeader *resp, 1920 int *resp_idx, 1921 int (*callback)(RDMAContext *rdma)) 1922 { 1923 int ret = 0; 1924 1925 /* 1926 * Wait until the dest is ready before attempting to deliver the message 1927 * by waiting for a READY message. 1928 */ 1929 if (rdma->control_ready_expected) { 1930 RDMAControlHeader resp; 1931 ret = qemu_rdma_exchange_get_response(rdma, 1932 &resp, RDMA_CONTROL_READY, RDMA_WRID_READY); 1933 if (ret < 0) { 1934 return ret; 1935 } 1936 } 1937 1938 /* 1939 * If the user is expecting a response, post a WR in anticipation of it. 1940 */ 1941 if (resp) { 1942 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1943 if (ret) { 1944 error_report("rdma migration: error posting" 1945 " extra control recv for anticipated result!"); 1946 return ret; 1947 } 1948 } 1949 1950 /* 1951 * Post a WR to replace the one we just consumed for the READY message. 1952 */ 1953 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1954 if (ret) { 1955 error_report("rdma migration: error posting first control recv!"); 1956 return ret; 1957 } 1958 1959 /* 1960 * Deliver the control message that was requested. 1961 */ 1962 ret = qemu_rdma_post_send_control(rdma, data, head); 1963 1964 if (ret < 0) { 1965 error_report("Failed to send control buffer!"); 1966 return ret; 1967 } 1968 1969 /* 1970 * If we're expecting a response, block and wait for it. 1971 */ 1972 if (resp) { 1973 if (callback) { 1974 trace_qemu_rdma_exchange_send_issue_callback(); 1975 ret = callback(rdma); 1976 if (ret < 0) { 1977 return ret; 1978 } 1979 } 1980 1981 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1982 ret = qemu_rdma_exchange_get_response(rdma, resp, 1983 resp->type, RDMA_WRID_DATA); 1984 1985 if (ret < 0) { 1986 return ret; 1987 } 1988 1989 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1990 if (resp_idx) { 1991 *resp_idx = RDMA_WRID_DATA; 1992 } 1993 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1994 } 1995 1996 rdma->control_ready_expected = 1; 1997 1998 return 0; 1999 } 2000 2001 /* 2002 * This is an 'atomic' high-level operation to receive a single, unified 2003 * control-channel message. 2004 */ 2005 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 2006 int expecting) 2007 { 2008 RDMAControlHeader ready = { 2009 .len = 0, 2010 .type = RDMA_CONTROL_READY, 2011 .repeat = 1, 2012 }; 2013 int ret; 2014 2015 /* 2016 * Inform the source that we're ready to receive a message. 2017 */ 2018 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 2019 2020 if (ret < 0) { 2021 error_report("Failed to send control buffer!"); 2022 return ret; 2023 } 2024 2025 /* 2026 * Block and wait for the message. 
2027 */ 2028 ret = qemu_rdma_exchange_get_response(rdma, head, 2029 expecting, RDMA_WRID_READY); 2030 2031 if (ret < 0) { 2032 return ret; 2033 } 2034 2035 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 2036 2037 /* 2038 * Post a new RECV work request to replace the one we just consumed. 2039 */ 2040 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2041 if (ret) { 2042 error_report("rdma migration: error posting second control recv!"); 2043 return ret; 2044 } 2045 2046 return 0; 2047 } 2048 2049 /* 2050 * Write an actual chunk of memory using RDMA. 2051 * 2052 * If we're using dynamic registration on the dest-side, we have to 2053 * send a registration command first. 2054 */ 2055 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma, 2056 int current_index, uint64_t current_addr, 2057 uint64_t length) 2058 { 2059 struct ibv_sge sge; 2060 struct ibv_send_wr send_wr = { 0 }; 2061 struct ibv_send_wr *bad_wr; 2062 int reg_result_idx, ret, count = 0; 2063 uint64_t chunk, chunks; 2064 uint8_t *chunk_start, *chunk_end; 2065 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 2066 RDMARegister reg; 2067 RDMARegisterResult *reg_result; 2068 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 2069 RDMAControlHeader head = { .len = sizeof(RDMARegister), 2070 .type = RDMA_CONTROL_REGISTER_REQUEST, 2071 .repeat = 1, 2072 }; 2073 2074 retry: 2075 sge.addr = (uintptr_t)(block->local_host_addr + 2076 (current_addr - block->offset)); 2077 sge.length = length; 2078 2079 chunk = ram_chunk_index(block->local_host_addr, 2080 (uint8_t *)(uintptr_t)sge.addr); 2081 chunk_start = ram_chunk_start(block, chunk); 2082 2083 if (block->is_ram_block) { 2084 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2085 2086 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2087 chunks--; 2088 } 2089 } else { 2090 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2091 2092 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2093 chunks--; 2094 } 2095 } 2096 2097 trace_qemu_rdma_write_one_top(chunks + 1, 2098 (chunks + 1) * 2099 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2100 2101 chunk_end = ram_chunk_end(block, chunk + chunks); 2102 2103 if (!rdma->pin_all) { 2104 #ifdef RDMA_UNREGISTRATION_EXAMPLE 2105 qemu_rdma_unregister_waiting(rdma); 2106 #endif 2107 } 2108 2109 while (test_bit(chunk, block->transit_bitmap)) { 2110 (void)count; 2111 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2112 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2113 2114 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2115 2116 if (ret < 0) { 2117 error_report("Failed to Wait for previous write to complete " 2118 "block %d chunk %" PRIu64 2119 " current %" PRIu64 " len %" PRIu64 " %d", 2120 current_index, chunk, sge.addr, length, rdma->nb_sent); 2121 return ret; 2122 } 2123 } 2124 2125 if (!rdma->pin_all || !block->is_ram_block) { 2126 if (!block->remote_keys[chunk]) { 2127 /* 2128 * This chunk has not yet been registered, so first check to see 2129 * if the entire chunk is zero. If so, tell the other size to 2130 * memset() + madvise() the entire chunk without RDMA. 
2131              */
2132 
2133             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2134                 RDMACompress comp = {
2135                     .offset = current_addr,
2136                     .value = 0,
2137                     .block_idx = current_index,
2138                     .length = length,
2139                 };
2140 
2141                 head.len = sizeof(comp);
2142                 head.type = RDMA_CONTROL_COMPRESS;
2143 
2144                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
2145                                                current_index, current_addr);
2146 
2147                 compress_to_network(rdma, &comp);
2148                 ret = qemu_rdma_exchange_send(rdma, &head,
2149                                 (uint8_t *) &comp, NULL, NULL, NULL);
2150 
2151                 if (ret < 0) {
2152                     return -EIO;
2153                 }
2154 
2155                 acct_update_position(f, sge.length, true);
2156 
2157                 return 1;
2158             }
2159 
2160             /*
2161              * Otherwise, tell the other side to register.
2162              */
2163             reg.current_index = current_index;
2164             if (block->is_ram_block) {
2165                 reg.key.current_addr = current_addr;
2166             } else {
2167                 reg.key.chunk = chunk;
2168             }
2169             reg.chunks = chunks;
2170 
2171             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2172                                               current_addr);
2173 
2174             register_to_network(rdma, &reg);
2175             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2176                                     &resp, &reg_result_idx, NULL);
2177             if (ret < 0) {
2178                 return ret;
2179             }
2180 
2181             /* try to overlap this single registration with the one we sent. */
2182             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2183                                                 &sge.lkey, NULL, chunk,
2184                                                 chunk_start, chunk_end)) {
2185                 error_report("cannot get lkey");
2186                 return -EINVAL;
2187             }
2188 
2189             reg_result = (RDMARegisterResult *)
2190                     rdma->wr_data[reg_result_idx].control_curr;
2191 
2192             network_to_result(reg_result);
2193 
2194             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2195                                                  reg_result->rkey, chunk);
2196 
2197             block->remote_keys[chunk] = reg_result->rkey;
2198             block->remote_host_addr = reg_result->host_addr;
2199         } else {
2200             /* already registered before */
2201             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2202                                                 &sge.lkey, NULL, chunk,
2203                                                 chunk_start, chunk_end)) {
2204                 error_report("cannot get lkey!");
2205                 return -EINVAL;
2206             }
2207         }
2208 
2209         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2210     } else {
2211         send_wr.wr.rdma.rkey = block->remote_rkey;
2212 
2213         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2214                                             &sge.lkey, NULL, chunk,
2215                                             chunk_start, chunk_end)) {
2216             error_report("cannot get lkey!");
2217             return -EINVAL;
2218         }
2219     }
2220 
2221     /*
2222      * Encode the ram block index and chunk within this wrid.
2223      * We will use this information at the time of completion
2224      * to figure out which bitmap to check against and then which
2225      * chunk in the bitmap to look for.
2226      */
2227     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2228                                         current_index, chunk);
2229 
2230     send_wr.opcode = IBV_WR_RDMA_WRITE;
2231     send_wr.send_flags = IBV_SEND_SIGNALED;
2232     send_wr.sg_list = &sge;
2233     send_wr.num_sge = 1;
2234     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2235                                 (current_addr - block->offset);
2236 
2237     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2238                                    sge.length);
2239 
2240     /*
2241      * ibv_post_send() does not return negative error numbers,
2242      * per the specification they are positive - no idea why.
2243      */
2244     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2245 
2246     if (ret == ENOMEM) {
2247         trace_qemu_rdma_write_one_queue_full();
2248         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2249         if (ret < 0) {
2250             error_report("rdma migration: failed to make "
2251                          "room in full send queue!
%d", ret); 2252 return ret; 2253 } 2254 2255 goto retry; 2256 2257 } else if (ret > 0) { 2258 perror("rdma migration: post rdma write failed"); 2259 return -ret; 2260 } 2261 2262 set_bit(chunk, block->transit_bitmap); 2263 acct_update_position(f, sge.length, false); 2264 rdma->total_writes++; 2265 2266 return 0; 2267 } 2268 2269 /* 2270 * Push out any unwritten RDMA operations. 2271 * 2272 * We support sending out multiple chunks at the same time. 2273 * Not all of them need to get signaled in the completion queue. 2274 */ 2275 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma) 2276 { 2277 int ret; 2278 2279 if (!rdma->current_length) { 2280 return 0; 2281 } 2282 2283 ret = qemu_rdma_write_one(f, rdma, 2284 rdma->current_index, rdma->current_addr, rdma->current_length); 2285 2286 if (ret < 0) { 2287 return ret; 2288 } 2289 2290 if (ret == 0) { 2291 rdma->nb_sent++; 2292 trace_qemu_rdma_write_flush(rdma->nb_sent); 2293 } 2294 2295 rdma->current_length = 0; 2296 rdma->current_addr = 0; 2297 2298 return 0; 2299 } 2300 2301 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2302 uint64_t offset, uint64_t len) 2303 { 2304 RDMALocalBlock *block; 2305 uint8_t *host_addr; 2306 uint8_t *chunk_end; 2307 2308 if (rdma->current_index < 0) { 2309 return 0; 2310 } 2311 2312 if (rdma->current_chunk < 0) { 2313 return 0; 2314 } 2315 2316 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2317 host_addr = block->local_host_addr + (offset - block->offset); 2318 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2319 2320 if (rdma->current_length == 0) { 2321 return 0; 2322 } 2323 2324 /* 2325 * Only merge into chunk sequentially. 2326 */ 2327 if (offset != (rdma->current_addr + rdma->current_length)) { 2328 return 0; 2329 } 2330 2331 if (offset < block->offset) { 2332 return 0; 2333 } 2334 2335 if ((offset + len) > (block->offset + block->length)) { 2336 return 0; 2337 } 2338 2339 if ((host_addr + len) > chunk_end) { 2340 return 0; 2341 } 2342 2343 return 1; 2344 } 2345 2346 /* 2347 * We're not actually writing here, but doing three things: 2348 * 2349 * 1. Identify the chunk the buffer belongs to. 2350 * 2. If the chunk is full or the buffer doesn't belong to the current 2351 * chunk, then start a new chunk and flush() the old chunk. 2352 * 3. To keep the hardware busy, we also group chunks into batches 2353 * and only require that a batch gets acknowledged in the completion 2354 * queue instead of each individual chunk. 2355 */ 2356 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma, 2357 uint64_t block_offset, uint64_t offset, 2358 uint64_t len) 2359 { 2360 uint64_t current_addr = block_offset + offset; 2361 uint64_t index = rdma->current_index; 2362 uint64_t chunk = rdma->current_chunk; 2363 int ret; 2364 2365 /* If we cannot merge it, we flush the current buffer first. 
*/ 2366 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2367 ret = qemu_rdma_write_flush(f, rdma); 2368 if (ret) { 2369 return ret; 2370 } 2371 rdma->current_length = 0; 2372 rdma->current_addr = current_addr; 2373 2374 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2375 offset, len, &index, &chunk); 2376 if (ret) { 2377 error_report("ram block search failed"); 2378 return ret; 2379 } 2380 rdma->current_index = index; 2381 rdma->current_chunk = chunk; 2382 } 2383 2384 /* merge it */ 2385 rdma->current_length += len; 2386 2387 /* flush it if buffer is too large */ 2388 if (rdma->current_length >= RDMA_MERGE_MAX) { 2389 return qemu_rdma_write_flush(f, rdma); 2390 } 2391 2392 return 0; 2393 } 2394 2395 static void qemu_rdma_cleanup(RDMAContext *rdma) 2396 { 2397 int idx; 2398 2399 if (rdma->cm_id && rdma->connected) { 2400 if ((rdma->error_state || 2401 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2402 !rdma->received_error) { 2403 RDMAControlHeader head = { .len = 0, 2404 .type = RDMA_CONTROL_ERROR, 2405 .repeat = 1, 2406 }; 2407 error_report("Early error. Sending error."); 2408 qemu_rdma_post_send_control(rdma, NULL, &head); 2409 } 2410 2411 rdma_disconnect(rdma->cm_id); 2412 trace_qemu_rdma_cleanup_disconnect(); 2413 rdma->connected = false; 2414 } 2415 2416 if (rdma->channel) { 2417 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2418 } 2419 g_free(rdma->dest_blocks); 2420 rdma->dest_blocks = NULL; 2421 2422 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2423 if (rdma->wr_data[idx].control_mr) { 2424 rdma->total_registrations--; 2425 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2426 } 2427 rdma->wr_data[idx].control_mr = NULL; 2428 } 2429 2430 if (rdma->local_ram_blocks.block) { 2431 while (rdma->local_ram_blocks.nb_blocks) { 2432 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2433 } 2434 } 2435 2436 if (rdma->qp) { 2437 rdma_destroy_qp(rdma->cm_id); 2438 rdma->qp = NULL; 2439 } 2440 if (rdma->cq) { 2441 ibv_destroy_cq(rdma->cq); 2442 rdma->cq = NULL; 2443 } 2444 if (rdma->comp_channel) { 2445 ibv_destroy_comp_channel(rdma->comp_channel); 2446 rdma->comp_channel = NULL; 2447 } 2448 if (rdma->pd) { 2449 ibv_dealloc_pd(rdma->pd); 2450 rdma->pd = NULL; 2451 } 2452 if (rdma->cm_id) { 2453 rdma_destroy_id(rdma->cm_id); 2454 rdma->cm_id = NULL; 2455 } 2456 2457 /* the destination side, listen_id and channel is shared */ 2458 if (rdma->listen_id) { 2459 if (!rdma->is_return_path) { 2460 rdma_destroy_id(rdma->listen_id); 2461 } 2462 rdma->listen_id = NULL; 2463 2464 if (rdma->channel) { 2465 if (!rdma->is_return_path) { 2466 rdma_destroy_event_channel(rdma->channel); 2467 } 2468 rdma->channel = NULL; 2469 } 2470 } 2471 2472 if (rdma->channel) { 2473 rdma_destroy_event_channel(rdma->channel); 2474 rdma->channel = NULL; 2475 } 2476 g_free(rdma->host); 2477 g_free(rdma->host_port); 2478 rdma->host = NULL; 2479 rdma->host_port = NULL; 2480 } 2481 2482 2483 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2484 { 2485 int ret, idx; 2486 Error *local_err = NULL, **temp = &local_err; 2487 2488 /* 2489 * Will be validated against destination's actual capabilities 2490 * after the connect() completes. 2491 */ 2492 rdma->pin_all = pin_all; 2493 2494 ret = qemu_rdma_resolve_host(rdma, temp); 2495 if (ret) { 2496 goto err_rdma_source_init; 2497 } 2498 2499 ret = qemu_rdma_alloc_pd_cq(rdma); 2500 if (ret) { 2501 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2502 " limits may be too low. 
Please check $ ulimit -a # and " 2503 "search for 'ulimit -l' in the output"); 2504 goto err_rdma_source_init; 2505 } 2506 2507 ret = qemu_rdma_alloc_qp(rdma); 2508 if (ret) { 2509 ERROR(temp, "rdma migration: error allocating qp!"); 2510 goto err_rdma_source_init; 2511 } 2512 2513 ret = qemu_rdma_init_ram_blocks(rdma); 2514 if (ret) { 2515 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2516 goto err_rdma_source_init; 2517 } 2518 2519 /* Build the hash that maps from offset to RAMBlock */ 2520 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2521 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2522 g_hash_table_insert(rdma->blockmap, 2523 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2524 &rdma->local_ram_blocks.block[idx]); 2525 } 2526 2527 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2528 ret = qemu_rdma_reg_control(rdma, idx); 2529 if (ret) { 2530 ERROR(temp, "rdma migration: error registering %d control!", 2531 idx); 2532 goto err_rdma_source_init; 2533 } 2534 } 2535 2536 return 0; 2537 2538 err_rdma_source_init: 2539 error_propagate(errp, local_err); 2540 qemu_rdma_cleanup(rdma); 2541 return -1; 2542 } 2543 2544 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2545 struct rdma_cm_event **cm_event, 2546 long msec, Error **errp) 2547 { 2548 int ret; 2549 struct pollfd poll_fd = { 2550 .fd = rdma->channel->fd, 2551 .events = POLLIN, 2552 .revents = 0 2553 }; 2554 2555 do { 2556 ret = poll(&poll_fd, 1, msec); 2557 } while (ret < 0 && errno == EINTR); 2558 2559 if (ret == 0) { 2560 ERROR(errp, "poll cm event timeout"); 2561 return -1; 2562 } else if (ret < 0) { 2563 ERROR(errp, "failed to poll cm event, errno=%i", errno); 2564 return -1; 2565 } else if (poll_fd.revents & POLLIN) { 2566 return rdma_get_cm_event(rdma->channel, cm_event); 2567 } else { 2568 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents); 2569 return -1; 2570 } 2571 } 2572 2573 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path) 2574 { 2575 RDMACapabilities cap = { 2576 .version = RDMA_CONTROL_VERSION_CURRENT, 2577 .flags = 0, 2578 }; 2579 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2580 .retry_count = 5, 2581 .private_data = &cap, 2582 .private_data_len = sizeof(cap), 2583 }; 2584 struct rdma_cm_event *cm_event; 2585 int ret; 2586 2587 /* 2588 * Only negotiate the capability with destination if the user 2589 * on the source first requested the capability. 
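 * (Concretely: RDMA_CAPABILITY_PIN_ALL is only advertised when
 * rdma->pin_all is set, and the destination masks off any capability
 * bits it does not recognize.)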
2590 */ 2591 if (rdma->pin_all) { 2592 trace_qemu_rdma_connect_pin_all_requested(); 2593 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2594 } 2595 2596 caps_to_network(&cap); 2597 2598 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2599 if (ret) { 2600 ERROR(errp, "posting second control recv"); 2601 goto err_rdma_source_connect; 2602 } 2603 2604 ret = rdma_connect(rdma->cm_id, &conn_param); 2605 if (ret) { 2606 perror("rdma_connect"); 2607 ERROR(errp, "connecting to destination!"); 2608 goto err_rdma_source_connect; 2609 } 2610 2611 if (return_path) { 2612 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2613 } else { 2614 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2615 } 2616 if (ret) { 2617 perror("rdma_get_cm_event after rdma_connect"); 2618 ERROR(errp, "connecting to destination!"); 2619 goto err_rdma_source_connect; 2620 } 2621 2622 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2623 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2624 ERROR(errp, "connecting to destination!"); 2625 rdma_ack_cm_event(cm_event); 2626 goto err_rdma_source_connect; 2627 } 2628 rdma->connected = true; 2629 2630 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2631 network_to_caps(&cap); 2632 2633 /* 2634 * Verify that the *requested* capabilities are supported by the destination 2635 * and disable them otherwise. 2636 */ 2637 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2638 ERROR(errp, "Server cannot support pinning all memory. " 2639 "Will register memory dynamically."); 2640 rdma->pin_all = false; 2641 } 2642 2643 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2644 2645 rdma_ack_cm_event(cm_event); 2646 2647 rdma->control_ready_expected = 1; 2648 rdma->nb_sent = 0; 2649 return 0; 2650 2651 err_rdma_source_connect: 2652 qemu_rdma_cleanup(rdma); 2653 return -1; 2654 } 2655 2656 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2657 { 2658 int ret, idx; 2659 struct rdma_cm_id *listen_id; 2660 char ip[40] = "unknown"; 2661 struct rdma_addrinfo *res, *e; 2662 char port_str[16]; 2663 2664 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2665 rdma->wr_data[idx].control_len = 0; 2666 rdma->wr_data[idx].control_curr = NULL; 2667 } 2668 2669 if (!rdma->host || !rdma->host[0]) { 2670 ERROR(errp, "RDMA host is not set!"); 2671 rdma->error_state = -EINVAL; 2672 return -1; 2673 } 2674 /* create CM channel */ 2675 rdma->channel = rdma_create_event_channel(); 2676 if (!rdma->channel) { 2677 ERROR(errp, "could not create rdma event channel"); 2678 rdma->error_state = -EINVAL; 2679 return -1; 2680 } 2681 2682 /* create CM id */ 2683 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2684 if (ret) { 2685 ERROR(errp, "could not create cm_id!"); 2686 goto err_dest_init_create_listen_id; 2687 } 2688 2689 snprintf(port_str, 16, "%d", rdma->port); 2690 port_str[15] = '\0'; 2691 2692 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2693 if (ret < 0) { 2694 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2695 goto err_dest_init_bind_addr; 2696 } 2697 2698 for (e = res; e != NULL; e = e->ai_next) { 2699 inet_ntop(e->ai_family, 2700 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2701 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2702 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2703 if (ret) { 2704 continue; 2705 } 2706 if (e->ai_family == AF_INET6) { 2707 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2708 if (ret) { 2709 continue; 2710 } 
2711 } 2712 break; 2713 } 2714 2715 rdma_freeaddrinfo(res); 2716 if (!e) { 2717 ERROR(errp, "Error: could not rdma_bind_addr!"); 2718 goto err_dest_init_bind_addr; 2719 } 2720 2721 rdma->listen_id = listen_id; 2722 qemu_rdma_dump_gid("dest_init", listen_id); 2723 return 0; 2724 2725 err_dest_init_bind_addr: 2726 rdma_destroy_id(listen_id); 2727 err_dest_init_create_listen_id: 2728 rdma_destroy_event_channel(rdma->channel); 2729 rdma->channel = NULL; 2730 rdma->error_state = ret; 2731 return ret; 2732 2733 } 2734 2735 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2736 RDMAContext *rdma) 2737 { 2738 int idx; 2739 2740 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2741 rdma_return_path->wr_data[idx].control_len = 0; 2742 rdma_return_path->wr_data[idx].control_curr = NULL; 2743 } 2744 2745 /*the CM channel and CM id is shared*/ 2746 rdma_return_path->channel = rdma->channel; 2747 rdma_return_path->listen_id = rdma->listen_id; 2748 2749 rdma->return_path = rdma_return_path; 2750 rdma_return_path->return_path = rdma; 2751 rdma_return_path->is_return_path = true; 2752 } 2753 2754 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2755 { 2756 RDMAContext *rdma = NULL; 2757 InetSocketAddress *addr; 2758 2759 if (host_port) { 2760 rdma = g_new0(RDMAContext, 1); 2761 rdma->current_index = -1; 2762 rdma->current_chunk = -1; 2763 2764 addr = g_new(InetSocketAddress, 1); 2765 if (!inet_parse(addr, host_port, NULL)) { 2766 rdma->port = atoi(addr->port); 2767 rdma->host = g_strdup(addr->host); 2768 rdma->host_port = g_strdup(host_port); 2769 } else { 2770 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2771 g_free(rdma); 2772 rdma = NULL; 2773 } 2774 2775 qapi_free_InetSocketAddress(addr); 2776 } 2777 2778 return rdma; 2779 } 2780 2781 /* 2782 * QEMUFile interface to the control channel. 2783 * SEND messages for control only. 2784 * VM's ram is handled with regular RDMA messages. 2785 */ 2786 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2787 const struct iovec *iov, 2788 size_t niov, 2789 int *fds, 2790 size_t nfds, 2791 Error **errp) 2792 { 2793 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2794 QEMUFile *f = rioc->file; 2795 RDMAContext *rdma; 2796 int ret; 2797 ssize_t done = 0; 2798 size_t i; 2799 size_t len = 0; 2800 2801 RCU_READ_LOCK_GUARD(); 2802 rdma = qatomic_rcu_read(&rioc->rdmaout); 2803 2804 if (!rdma) { 2805 return -EIO; 2806 } 2807 2808 CHECK_ERROR_STATE(); 2809 2810 /* 2811 * Push out any writes that 2812 * we're queued up for VM's ram. 
2813 */ 2814 ret = qemu_rdma_write_flush(f, rdma); 2815 if (ret < 0) { 2816 rdma->error_state = ret; 2817 return ret; 2818 } 2819 2820 for (i = 0; i < niov; i++) { 2821 size_t remaining = iov[i].iov_len; 2822 uint8_t * data = (void *)iov[i].iov_base; 2823 while (remaining) { 2824 RDMAControlHeader head; 2825 2826 len = MIN(remaining, RDMA_SEND_INCREMENT); 2827 remaining -= len; 2828 2829 head.len = len; 2830 head.type = RDMA_CONTROL_QEMU_FILE; 2831 2832 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2833 2834 if (ret < 0) { 2835 rdma->error_state = ret; 2836 return ret; 2837 } 2838 2839 data += len; 2840 done += len; 2841 } 2842 } 2843 2844 return done; 2845 } 2846 2847 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2848 size_t size, int idx) 2849 { 2850 size_t len = 0; 2851 2852 if (rdma->wr_data[idx].control_len) { 2853 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2854 2855 len = MIN(size, rdma->wr_data[idx].control_len); 2856 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2857 rdma->wr_data[idx].control_curr += len; 2858 rdma->wr_data[idx].control_len -= len; 2859 } 2860 2861 return len; 2862 } 2863 2864 /* 2865 * QEMUFile interface to the control channel. 2866 * RDMA links don't use bytestreams, so we have to 2867 * return bytes to QEMUFile opportunistically. 2868 */ 2869 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2870 const struct iovec *iov, 2871 size_t niov, 2872 int **fds, 2873 size_t *nfds, 2874 Error **errp) 2875 { 2876 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2877 RDMAContext *rdma; 2878 RDMAControlHeader head; 2879 int ret = 0; 2880 ssize_t i; 2881 size_t done = 0; 2882 2883 RCU_READ_LOCK_GUARD(); 2884 rdma = qatomic_rcu_read(&rioc->rdmain); 2885 2886 if (!rdma) { 2887 return -EIO; 2888 } 2889 2890 CHECK_ERROR_STATE(); 2891 2892 for (i = 0; i < niov; i++) { 2893 size_t want = iov[i].iov_len; 2894 uint8_t *data = (void *)iov[i].iov_base; 2895 2896 /* 2897 * First, we hold on to the last SEND message we 2898 * were given and dish out the bytes until we run 2899 * out of bytes. 2900 */ 2901 ret = qemu_rdma_fill(rdma, data, want, 0); 2902 done += ret; 2903 want -= ret; 2904 /* Got what we needed, so go to next iovec */ 2905 if (want == 0) { 2906 continue; 2907 } 2908 2909 /* If we got any data so far, then don't wait 2910 * for more, just return what we have */ 2911 if (done > 0) { 2912 break; 2913 } 2914 2915 2916 /* We've got nothing at all, so lets wait for 2917 * more to arrive 2918 */ 2919 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2920 2921 if (ret < 0) { 2922 rdma->error_state = ret; 2923 return ret; 2924 } 2925 2926 /* 2927 * SEND was received with new bytes, now try again. 2928 */ 2929 ret = qemu_rdma_fill(rdma, data, want, 0); 2930 done += ret; 2931 want -= ret; 2932 2933 /* Still didn't get enough, so lets just return */ 2934 if (want) { 2935 if (done == 0) { 2936 return QIO_CHANNEL_ERR_BLOCK; 2937 } else { 2938 break; 2939 } 2940 } 2941 } 2942 return done; 2943 } 2944 2945 /* 2946 * Block until all the outstanding chunks have been delivered by the hardware. 
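 * Any chunk still being merged is flushed first, then we poll the
 * completion queue until rdma->nb_sent reaches zero.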
2947 */ 2948 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) 2949 { 2950 int ret; 2951 2952 if (qemu_rdma_write_flush(f, rdma) < 0) { 2953 return -EIO; 2954 } 2955 2956 while (rdma->nb_sent) { 2957 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2958 if (ret < 0) { 2959 error_report("rdma migration: complete polling error!"); 2960 return -EIO; 2961 } 2962 } 2963 2964 qemu_rdma_unregister_waiting(rdma); 2965 2966 return 0; 2967 } 2968 2969 2970 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2971 bool blocking, 2972 Error **errp) 2973 { 2974 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2975 /* XXX we should make readv/writev actually honour this :-) */ 2976 rioc->blocking = blocking; 2977 return 0; 2978 } 2979 2980 2981 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2982 struct QIOChannelRDMASource { 2983 GSource parent; 2984 QIOChannelRDMA *rioc; 2985 GIOCondition condition; 2986 }; 2987 2988 static gboolean 2989 qio_channel_rdma_source_prepare(GSource *source, 2990 gint *timeout) 2991 { 2992 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2993 RDMAContext *rdma; 2994 GIOCondition cond = 0; 2995 *timeout = -1; 2996 2997 RCU_READ_LOCK_GUARD(); 2998 if (rsource->condition == G_IO_IN) { 2999 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3000 } else { 3001 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3002 } 3003 3004 if (!rdma) { 3005 error_report("RDMAContext is NULL when prepare Gsource"); 3006 return FALSE; 3007 } 3008 3009 if (rdma->wr_data[0].control_len) { 3010 cond |= G_IO_IN; 3011 } 3012 cond |= G_IO_OUT; 3013 3014 return cond & rsource->condition; 3015 } 3016 3017 static gboolean 3018 qio_channel_rdma_source_check(GSource *source) 3019 { 3020 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3021 RDMAContext *rdma; 3022 GIOCondition cond = 0; 3023 3024 RCU_READ_LOCK_GUARD(); 3025 if (rsource->condition == G_IO_IN) { 3026 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3027 } else { 3028 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3029 } 3030 3031 if (!rdma) { 3032 error_report("RDMAContext is NULL when check Gsource"); 3033 return FALSE; 3034 } 3035 3036 if (rdma->wr_data[0].control_len) { 3037 cond |= G_IO_IN; 3038 } 3039 cond |= G_IO_OUT; 3040 3041 return cond & rsource->condition; 3042 } 3043 3044 static gboolean 3045 qio_channel_rdma_source_dispatch(GSource *source, 3046 GSourceFunc callback, 3047 gpointer user_data) 3048 { 3049 QIOChannelFunc func = (QIOChannelFunc)callback; 3050 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3051 RDMAContext *rdma; 3052 GIOCondition cond = 0; 3053 3054 RCU_READ_LOCK_GUARD(); 3055 if (rsource->condition == G_IO_IN) { 3056 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3057 } else { 3058 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3059 } 3060 3061 if (!rdma) { 3062 error_report("RDMAContext is NULL when dispatch Gsource"); 3063 return FALSE; 3064 } 3065 3066 if (rdma->wr_data[0].control_len) { 3067 cond |= G_IO_IN; 3068 } 3069 cond |= G_IO_OUT; 3070 3071 return (*func)(QIO_CHANNEL(rsource->rioc), 3072 (cond & rsource->condition), 3073 user_data); 3074 } 3075 3076 static void 3077 qio_channel_rdma_source_finalize(GSource *source) 3078 { 3079 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3080 3081 object_unref(OBJECT(ssource->rioc)); 3082 } 3083 3084 GSourceFuncs qio_channel_rdma_source_funcs = { 3085 qio_channel_rdma_source_prepare, 3086 qio_channel_rdma_source_check, 3087 qio_channel_rdma_source_dispatch, 
3088     qio_channel_rdma_source_finalize
3089 };
3090 
3091 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3092                                               GIOCondition condition)
3093 {
3094     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3095     QIOChannelRDMASource *ssource;
3096     GSource *source;
3097 
3098     source = g_source_new(&qio_channel_rdma_source_funcs,
3099                           sizeof(QIOChannelRDMASource));
3100     ssource = (QIOChannelRDMASource *)source;
3101 
3102     ssource->rioc = rioc;
3103     object_ref(OBJECT(rioc));
3104 
3105     ssource->condition = condition;
3106 
3107     return source;
3108 }
3109 
3110 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3111                                                 AioContext *ctx,
3112                                                 IOHandler *io_read,
3113                                                 IOHandler *io_write,
3114                                                 void *opaque)
3115 {
3116     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3117     if (io_read) {
3118         aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
3119                            false, io_read, io_write, NULL, opaque);
3120     } else {
3121         aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
3122                            false, io_read, io_write, NULL, opaque);
3123     }
3124 }
3125 
3126 struct rdma_close_rcu {
3127     struct rcu_head rcu;
3128     RDMAContext *rdmain;
3129     RDMAContext *rdmaout;
3130 };
3131 
3132 /* callback from qio_channel_rdma_close via call_rcu */
3133 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3134 {
3135     if (rcu->rdmain) {
3136         qemu_rdma_cleanup(rcu->rdmain);
3137     }
3138 
3139     if (rcu->rdmaout) {
3140         qemu_rdma_cleanup(rcu->rdmaout);
3141     }
3142 
3143     g_free(rcu->rdmain);
3144     g_free(rcu->rdmaout);
3145     g_free(rcu);
3146 }
3147 
3148 static int qio_channel_rdma_close(QIOChannel *ioc,
3149                                   Error **errp)
3150 {
3151     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3152     RDMAContext *rdmain, *rdmaout;
3153     struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3154 
3155     trace_qemu_rdma_close();
3156 
3157     rdmain = rioc->rdmain;
3158     if (rdmain) {
3159         qatomic_rcu_set(&rioc->rdmain, NULL);
3160     }
3161 
3162     rdmaout = rioc->rdmaout;
3163     if (rdmaout) {
3164         qatomic_rcu_set(&rioc->rdmaout, NULL);
3165     }
3166 
3167     rcu->rdmain = rdmain;
3168     rcu->rdmaout = rdmaout;
3169     call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3170 
3171     return 0;
3172 }
3173 
3174 static int
3175 qio_channel_rdma_shutdown(QIOChannel *ioc,
3176                             QIOChannelShutdown how,
3177                             Error **errp)
3178 {
3179     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3180     RDMAContext *rdmain, *rdmaout;
3181 
3182     RCU_READ_LOCK_GUARD();
3183 
3184     rdmain = qatomic_rcu_read(&rioc->rdmain);
3185     rdmaout = qatomic_rcu_read(&rioc->rdmaout);
3186 
3187     switch (how) {
3188     case QIO_CHANNEL_SHUTDOWN_READ:
3189         if (rdmain) {
3190             rdmain->error_state = -1;
3191         }
3192         break;
3193     case QIO_CHANNEL_SHUTDOWN_WRITE:
3194         if (rdmaout) {
3195             rdmaout->error_state = -1;
3196         }
3197         break;
3198     case QIO_CHANNEL_SHUTDOWN_BOTH:
3199     default:
3200         if (rdmain) {
3201             rdmain->error_state = -1;
3202         }
3203         if (rdmaout) {
3204             rdmaout->error_state = -1;
3205         }
3206         break;
3207     }
3208 
3209     return 0;
3210 }
3211 
3212 /*
3213  * Parameters:
3214  *    @offset == 0 :
3215  *        This means that 'block_offset' is a full virtual address that does not
3216  *        belong to a RAMBlock of the virtual machine and instead
3217  *        represents a private malloc'd memory area that the caller wishes to
3218  *        transfer.
3219  *
3220  *    @offset != 0 :
3221  *        Offset is an offset to be added to block_offset and used
3222  *        to also lookup the corresponding RAMBlock.
3223  *
3224  *    @size > 0 :
3225  *        Initiate a transfer of this size.
3226 * 3227 * @size == 0 : 3228 * A 'hint' or 'advice' that means that we wish to speculatively 3229 * and asynchronously unregister this memory. In this case, there is no 3230 * guarantee that the unregister will actually happen, for example, 3231 * if the memory is being actively transmitted. Additionally, the memory 3232 * may be re-registered at any future time if a write within the same 3233 * chunk was requested again, even if you attempted to unregister it 3234 * here. 3235 * 3236 * @size < 0 : TODO, not yet supported 3237 * Unregister the memory NOW. This means that the caller does not 3238 * expect there to be any future RDMA transfers and we just want to clean 3239 * things up. This is used in case the upper layer owns the memory and 3240 * cannot wait for qemu_fclose() to occur. 3241 * 3242 * @bytes_sent : User-specificed pointer to indicate how many bytes were 3243 * sent. Usually, this will not be more than a few bytes of 3244 * the protocol because most transfers are sent asynchronously. 3245 */ 3246 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, 3247 ram_addr_t block_offset, ram_addr_t offset, 3248 size_t size, uint64_t *bytes_sent) 3249 { 3250 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3251 RDMAContext *rdma; 3252 int ret; 3253 3254 RCU_READ_LOCK_GUARD(); 3255 rdma = qatomic_rcu_read(&rioc->rdmaout); 3256 3257 if (!rdma) { 3258 return -EIO; 3259 } 3260 3261 CHECK_ERROR_STATE(); 3262 3263 if (migration_in_postcopy()) { 3264 return RAM_SAVE_CONTROL_NOT_SUPP; 3265 } 3266 3267 qemu_fflush(f); 3268 3269 if (size > 0) { 3270 /* 3271 * Add this page to the current 'chunk'. If the chunk 3272 * is full, or the page doesn't belong to the current chunk, 3273 * an actual RDMA write will occur and a new chunk will be formed. 3274 */ 3275 ret = qemu_rdma_write(f, rdma, block_offset, offset, size); 3276 if (ret < 0) { 3277 error_report("rdma migration: write error! %d", ret); 3278 goto err; 3279 } 3280 3281 /* 3282 * We always return 1 bytes because the RDMA 3283 * protocol is completely asynchronous. We do not yet know 3284 * whether an identified chunk is zero or not because we're 3285 * waiting for other pages to potentially be merged with 3286 * the current chunk. So, we have to call qemu_update_position() 3287 * later on when the actual write occurs. 3288 */ 3289 if (bytes_sent) { 3290 *bytes_sent = 1; 3291 } 3292 } else { 3293 uint64_t index, chunk; 3294 3295 /* TODO: Change QEMUFileOps prototype to be signed: size_t => long 3296 if (size < 0) { 3297 ret = qemu_rdma_drain_cq(f, rdma); 3298 if (ret < 0) { 3299 fprintf(stderr, "rdma: failed to synchronously drain" 3300 " completion queue before unregistration.\n"); 3301 goto err; 3302 } 3303 } 3304 */ 3305 3306 ret = qemu_rdma_search_ram_block(rdma, block_offset, 3307 offset, size, &index, &chunk); 3308 3309 if (ret) { 3310 error_report("ram block search failed"); 3311 goto err; 3312 } 3313 3314 qemu_rdma_signal_unregister(rdma, index, chunk, 0); 3315 3316 /* 3317 * TODO: Synchronous, guaranteed unregistration (should not occur during 3318 * fast-path). Otherwise, unregisters will process on the next call to 3319 * qemu_rdma_drain_cq() 3320 if (size < 0) { 3321 qemu_rdma_unregister_waiting(rdma); 3322 } 3323 */ 3324 } 3325 3326 /* 3327 * Drain the Completion Queue if possible, but do not block, 3328 * just poll. 3329 * 3330 * If nothing to poll, the end of the iteration will do this 3331 * again to make sure we don't overflow the request queue. 
3332 */ 3333 while (1) { 3334 uint64_t wr_id, wr_id_in; 3335 int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL); 3336 if (ret < 0) { 3337 error_report("rdma migration: polling error! %d", ret); 3338 goto err; 3339 } 3340 3341 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3342 3343 if (wr_id == RDMA_WRID_NONE) { 3344 break; 3345 } 3346 } 3347 3348 return RAM_SAVE_CONTROL_DELAYED; 3349 err: 3350 rdma->error_state = ret; 3351 return ret; 3352 } 3353 3354 static void rdma_accept_incoming_migration(void *opaque); 3355 3356 static void rdma_cm_poll_handler(void *opaque) 3357 { 3358 RDMAContext *rdma = opaque; 3359 int ret; 3360 struct rdma_cm_event *cm_event; 3361 MigrationIncomingState *mis = migration_incoming_get_current(); 3362 3363 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3364 if (ret) { 3365 error_report("get_cm_event failed %d", errno); 3366 return; 3367 } 3368 3369 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3370 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3371 if (!rdma->error_state && 3372 migration_incoming_get_current()->state != 3373 MIGRATION_STATUS_COMPLETED) { 3374 error_report("receive cm event, cm event is %d", cm_event->event); 3375 rdma->error_state = -EPIPE; 3376 if (rdma->return_path) { 3377 rdma->return_path->error_state = -EPIPE; 3378 } 3379 } 3380 rdma_ack_cm_event(cm_event); 3381 3382 if (mis->migration_incoming_co) { 3383 qemu_coroutine_enter(mis->migration_incoming_co); 3384 } 3385 return; 3386 } 3387 rdma_ack_cm_event(cm_event); 3388 } 3389 3390 static int qemu_rdma_accept(RDMAContext *rdma) 3391 { 3392 RDMACapabilities cap; 3393 struct rdma_conn_param conn_param = { 3394 .responder_resources = 2, 3395 .private_data = &cap, 3396 .private_data_len = sizeof(cap), 3397 }; 3398 RDMAContext *rdma_return_path = NULL; 3399 struct rdma_cm_event *cm_event; 3400 struct ibv_context *verbs; 3401 int ret = -EINVAL; 3402 int idx; 3403 3404 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3405 if (ret) { 3406 goto err_rdma_dest_wait; 3407 } 3408 3409 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3410 rdma_ack_cm_event(cm_event); 3411 goto err_rdma_dest_wait; 3412 } 3413 3414 /* 3415 * initialize the RDMAContext for return path for postcopy after first 3416 * connection request reached. 3417 */ 3418 if (migrate_postcopy() && !rdma->is_return_path) { 3419 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL); 3420 if (rdma_return_path == NULL) { 3421 rdma_ack_cm_event(cm_event); 3422 goto err_rdma_dest_wait; 3423 } 3424 3425 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3426 } 3427 3428 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3429 3430 network_to_caps(&cap); 3431 3432 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3433 error_report("Unknown source RDMA version: %d, bailing...", 3434 cap.version); 3435 rdma_ack_cm_event(cm_event); 3436 goto err_rdma_dest_wait; 3437 } 3438 3439 /* 3440 * Respond with only the capabilities this version of QEMU knows about. 3441 */ 3442 cap.flags &= known_capabilities; 3443 3444 /* 3445 * Enable the ones that we do know about. 3446 * Add other checks here as new ones are introduced. 
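 * (RDMA_CAPABILITY_PIN_ALL is currently the only capability acted upon.)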
3447 */ 3448 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3449 rdma->pin_all = true; 3450 } 3451 3452 rdma->cm_id = cm_event->id; 3453 verbs = cm_event->id->verbs; 3454 3455 rdma_ack_cm_event(cm_event); 3456 3457 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3458 3459 caps_to_network(&cap); 3460 3461 trace_qemu_rdma_accept_pin_verbsc(verbs); 3462 3463 if (!rdma->verbs) { 3464 rdma->verbs = verbs; 3465 } else if (rdma->verbs != verbs) { 3466 error_report("ibv context not matching %p, %p!", rdma->verbs, 3467 verbs); 3468 goto err_rdma_dest_wait; 3469 } 3470 3471 qemu_rdma_dump_id("dest_init", verbs); 3472 3473 ret = qemu_rdma_alloc_pd_cq(rdma); 3474 if (ret) { 3475 error_report("rdma migration: error allocating pd and cq!"); 3476 goto err_rdma_dest_wait; 3477 } 3478 3479 ret = qemu_rdma_alloc_qp(rdma); 3480 if (ret) { 3481 error_report("rdma migration: error allocating qp!"); 3482 goto err_rdma_dest_wait; 3483 } 3484 3485 ret = qemu_rdma_init_ram_blocks(rdma); 3486 if (ret) { 3487 error_report("rdma migration: error initializing ram blocks!"); 3488 goto err_rdma_dest_wait; 3489 } 3490 3491 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3492 ret = qemu_rdma_reg_control(rdma, idx); 3493 if (ret) { 3494 error_report("rdma: error registering %d control", idx); 3495 goto err_rdma_dest_wait; 3496 } 3497 } 3498 3499 /* Accept the second connection request for return path */ 3500 if (migrate_postcopy() && !rdma->is_return_path) { 3501 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3502 NULL, 3503 (void *)(intptr_t)rdma->return_path); 3504 } else { 3505 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3506 NULL, rdma); 3507 } 3508 3509 ret = rdma_accept(rdma->cm_id, &conn_param); 3510 if (ret) { 3511 error_report("rdma_accept returns %d", ret); 3512 goto err_rdma_dest_wait; 3513 } 3514 3515 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3516 if (ret) { 3517 error_report("rdma_accept get_cm_event failed %d", ret); 3518 goto err_rdma_dest_wait; 3519 } 3520 3521 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3522 error_report("rdma_accept not event established"); 3523 rdma_ack_cm_event(cm_event); 3524 goto err_rdma_dest_wait; 3525 } 3526 3527 rdma_ack_cm_event(cm_event); 3528 rdma->connected = true; 3529 3530 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3531 if (ret) { 3532 error_report("rdma migration: error posting second control recv"); 3533 goto err_rdma_dest_wait; 3534 } 3535 3536 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3537 3538 return 0; 3539 3540 err_rdma_dest_wait: 3541 rdma->error_state = ret; 3542 qemu_rdma_cleanup(rdma); 3543 g_free(rdma_return_path); 3544 return ret; 3545 } 3546 3547 static int dest_ram_sort_func(const void *a, const void *b) 3548 { 3549 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3550 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3551 3552 return (a_index < b_index) ? -1 : (a_index != b_index); 3553 } 3554 3555 /* 3556 * During each iteration of the migration, we listen for instructions 3557 * by the source VM to perform dynamic page registrations before they 3558 * can perform RDMA operations. 3559 * 3560 * We respond with the 'rkey'. 3561 * 3562 * Keep doing this until the source tells us to stop. 
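 * (The source ends each iteration with RDMA_CONTROL_REGISTER_FINISHED,
 * which breaks out of the receive loop below.)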
3563 */ 3564 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) 3565 { 3566 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3567 .type = RDMA_CONTROL_REGISTER_RESULT, 3568 .repeat = 0, 3569 }; 3570 RDMAControlHeader unreg_resp = { .len = 0, 3571 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3572 .repeat = 0, 3573 }; 3574 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3575 .repeat = 1 }; 3576 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); 3577 RDMAContext *rdma; 3578 RDMALocalBlocks *local; 3579 RDMAControlHeader head; 3580 RDMARegister *reg, *registers; 3581 RDMACompress *comp; 3582 RDMARegisterResult *reg_result; 3583 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3584 RDMALocalBlock *block; 3585 void *host_addr; 3586 int ret = 0; 3587 int idx = 0; 3588 int count = 0; 3589 int i = 0; 3590 3591 RCU_READ_LOCK_GUARD(); 3592 rdma = qatomic_rcu_read(&rioc->rdmain); 3593 3594 if (!rdma) { 3595 return -EIO; 3596 } 3597 3598 CHECK_ERROR_STATE(); 3599 3600 local = &rdma->local_ram_blocks; 3601 do { 3602 trace_qemu_rdma_registration_handle_wait(); 3603 3604 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3605 3606 if (ret < 0) { 3607 break; 3608 } 3609 3610 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3611 error_report("rdma: Too many requests in this message (%d)." 3612 "Bailing.", head.repeat); 3613 ret = -EIO; 3614 break; 3615 } 3616 3617 switch (head.type) { 3618 case RDMA_CONTROL_COMPRESS: 3619 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3620 network_to_compress(comp); 3621 3622 trace_qemu_rdma_registration_handle_compress(comp->length, 3623 comp->block_idx, 3624 comp->offset); 3625 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3626 error_report("rdma: 'compress' bad block index %u (vs %d)", 3627 (unsigned int)comp->block_idx, 3628 rdma->local_ram_blocks.nb_blocks); 3629 ret = -EIO; 3630 goto out; 3631 } 3632 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3633 3634 host_addr = block->local_host_addr + 3635 (comp->offset - block->offset); 3636 3637 ram_handle_compressed(host_addr, comp->value, comp->length); 3638 break; 3639 3640 case RDMA_CONTROL_REGISTER_FINISHED: 3641 trace_qemu_rdma_registration_handle_finished(); 3642 goto out; 3643 3644 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3645 trace_qemu_rdma_registration_handle_ram_blocks(); 3646 3647 /* Sort our local RAM Block list so it's the same as the source, 3648 * we can do this since we've filled in a src_index in the list 3649 * as we received the RAMBlock list earlier. 3650 */ 3651 qsort(rdma->local_ram_blocks.block, 3652 rdma->local_ram_blocks.nb_blocks, 3653 sizeof(RDMALocalBlock), dest_ram_sort_func); 3654 for (i = 0; i < local->nb_blocks; i++) { 3655 local->block[i].index = i; 3656 } 3657 3658 if (rdma->pin_all) { 3659 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3660 if (ret) { 3661 error_report("rdma migration: error dest " 3662 "registering ram blocks"); 3663 goto out; 3664 } 3665 } 3666 3667 /* 3668 * Dest uses this to prepare to transmit the RAMBlock descriptions 3669 * to the source VM after connection setup. 3670 * Both sides use the "remote" structure to communicate and update 3671 * their "local" descriptions with what was sent. 
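         * Each RDMADestBlock sent back carries the destination's host
         * address, the block's offset and length, and, when pinning
         * everything, its rkey.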
3672          */
3673             for (i = 0; i < local->nb_blocks; i++) {
3674                 rdma->dest_blocks[i].remote_host_addr =
3675                     (uintptr_t)(local->block[i].local_host_addr);
3676 
3677                 if (rdma->pin_all) {
3678                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3679                 }
3680 
3681                 rdma->dest_blocks[i].offset = local->block[i].offset;
3682                 rdma->dest_blocks[i].length = local->block[i].length;
3683 
3684                 dest_block_to_network(&rdma->dest_blocks[i]);
3685                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3686                     local->block[i].block_name,
3687                     local->block[i].offset,
3688                     local->block[i].length,
3689                     local->block[i].local_host_addr,
3690                     local->block[i].src_index);
3691             }
3692 
3693             blocks.len = rdma->local_ram_blocks.nb_blocks
3694                                                 * sizeof(RDMADestBlock);
3695 
3696 
3697             ret = qemu_rdma_post_send_control(rdma,
3698                                         (uint8_t *) rdma->dest_blocks, &blocks);
3699 
3700             if (ret < 0) {
3701                 error_report("rdma migration: error sending remote info");
3702                 goto out;
3703             }
3704 
3705             break;
3706         case RDMA_CONTROL_REGISTER_REQUEST:
3707             trace_qemu_rdma_registration_handle_register(head.repeat);
3708 
3709             reg_resp.repeat = head.repeat;
3710             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3711 
3712             for (count = 0; count < head.repeat; count++) {
3713                 uint64_t chunk;
3714                 uint8_t *chunk_start, *chunk_end;
3715 
3716                 reg = &registers[count];
3717                 network_to_register(reg);
3718 
3719                 reg_result = &results[count];
3720 
3721                 trace_qemu_rdma_registration_handle_register_loop(count,
3722                          reg->current_index, reg->key.current_addr, reg->chunks);
3723 
3724                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3725                     error_report("rdma: 'register' bad block index %u (vs %d)",
3726                                  (unsigned int)reg->current_index,
3727                                  rdma->local_ram_blocks.nb_blocks);
3728                     ret = -ENOENT;
3729                     goto out;
3730                 }
3731                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3732                 if (block->is_ram_block) {
3733                     if (block->offset > reg->key.current_addr) {
3734                         error_report("rdma: bad register address for block %s"
3735                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3736                             block->block_name, block->offset,
3737                             reg->key.current_addr);
3738                         ret = -ERANGE;
3739                         goto out;
3740                     }
3741                     host_addr = (block->local_host_addr +
3742                                 (reg->key.current_addr - block->offset));
3743                     chunk = ram_chunk_index(block->local_host_addr,
3744                                             (uint8_t *) host_addr);
3745                 } else {
3746                     chunk = reg->key.chunk;
3747                     host_addr = block->local_host_addr +
3748                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3749                     /* Check for particularly bad chunk value */
3750                     if (host_addr < (void *)block->local_host_addr) {
3751                         error_report("rdma: bad chunk for block %s"
3752                                      " chunk: %" PRIx64,
3753                                      block->block_name, reg->key.chunk);
3754                         ret = -ERANGE;
3755                         goto out;
3756                     }
3757                 }
3758                 chunk_start = ram_chunk_start(block, chunk);
3759                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3760                 /* avoid "-Waddress-of-packed-member" warning */
3761                 uint32_t tmp_rkey = 0;
3762                 if (qemu_rdma_register_and_get_keys(rdma, block,
3763                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3764                             chunk, chunk_start, chunk_end)) {
3765                     error_report("cannot get rkey");
3766                     ret = -EINVAL;
3767                     goto out;
3768                 }
3769                 reg_result->rkey = tmp_rkey;
3770 
3771                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3772 
3773                 trace_qemu_rdma_registration_handle_register_rkey(
3774                                                            reg_result->rkey);
3775 
3776                 result_to_network(reg_result);
3777             }
3778 
3779             ret = qemu_rdma_post_send_control(rdma,
3780                             (uint8_t *) results, &reg_resp);
3781 
3782             if (ret < 0) {
3783                 error_report("Failed to send control buffer");
3784                 goto out;
3785             }
3786             break;
3787         case RDMA_CONTROL_UNREGISTER_REQUEST:
3788             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3789             unreg_resp.repeat = head.repeat;
3790             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3791 
3792             for (count = 0; count < head.repeat; count++) {
3793                 reg = &registers[count];
3794                 network_to_register(reg);
3795 
3796                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3797                            reg->current_index, reg->key.chunk);
3798 
3799                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3800 
3801                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3802                 block->pmr[reg->key.chunk] = NULL;
3803 
3804                 if (ret != 0) {
3805                     perror("rdma unregistration chunk failed");
3806                     ret = -ret;
3807                     goto out;
3808                 }
3809 
3810                 rdma->total_registrations--;
3811 
3812                 trace_qemu_rdma_registration_handle_unregister_success(
3813                                                        reg->key.chunk);
3814             }
3815 
3816             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3817 
3818             if (ret < 0) {
3819                 error_report("Failed to send control buffer");
3820                 goto out;
3821             }
3822             break;
3823         case RDMA_CONTROL_REGISTER_RESULT:
3824             error_report("Invalid RESULT message at dest.");
3825             ret = -EIO;
3826             goto out;
3827         default:
3828             error_report("Unknown control message %s", control_desc(head.type));
3829             ret = -EIO;
3830             goto out;
3831         }
3832     } while (1);
3833 out:
3834     if (ret < 0) {
3835         rdma->error_state = ret;
3836     }
3837     return ret;
3838 }
3839 
3840 /* Destination:
3841  * Called via a ram_control_load_hook during the initial RAM load section which
3842  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3843  * on the source.
3844  * We've already built our local RAMBlock list, but not yet sent the list to
3845  * the source.
3846  */
3847 static int
3848 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3849 {
3850     RDMAContext *rdma;
3851     int curr;
3852     int found = -1;
3853 
3854     RCU_READ_LOCK_GUARD();
3855     rdma = qatomic_rcu_read(&rioc->rdmain);
3856 
3857     if (!rdma) {
3858         return -EIO;
3859     }
3860 
3861     /* Find the matching RAMBlock in our local list */
3862     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3863         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3864             found = curr;
3865             break;
3866         }
3867     }
3868 
3869     if (found == -1) {
3870         error_report("RAMBlock '%s' not found on destination", name);
3871         return -ENOENT;
3872     }
3873 
3874     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3875     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3876     rdma->next_src_index++;
3877 
3878     return 0;
3879 }
3880 
3881 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3882 {
3883     switch (flags) {
3884     case RAM_CONTROL_BLOCK_REG:
3885         return rdma_block_notification_handle(opaque, data);
3886 
3887     case RAM_CONTROL_HOOK:
3888         return qemu_rdma_registration_handle(f, opaque);
3889 
3890     default:
3891         /* Shouldn't be called with any other values */
3892         abort();
3893     }
3894 }
3895 
3896 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3897                                         uint64_t flags, void *data)
3898 {
3899     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3900     RDMAContext *rdma;
3901 
3902     RCU_READ_LOCK_GUARD();
3903     rdma = qatomic_rcu_read(&rioc->rdmaout);
3904     if (!rdma) {
3905         return -EIO;
3906     }
3907 
3908     CHECK_ERROR_STATE();
3909 
3910     if (migration_in_postcopy()) {
3911         return 0;
3912     }
3913 
3914     trace_qemu_rdma_registration_start(flags);
3915     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3916     qemu_fflush(f);
3917 
3918     return 0;
3919 }
3920 
3921 /*
3922  * Inform dest that dynamic registrations are done for now.
3923  * First, flush writes, if any.
3924  */
3925 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3926                                        uint64_t flags, void *data)
3927 {
3928     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3929     RDMAContext *rdma;
3930     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3931     int ret = 0;
3932 
3933     RCU_READ_LOCK_GUARD();
3934     rdma = qatomic_rcu_read(&rioc->rdmaout);
3935     if (!rdma) {
3936         return -EIO;
3937     }
3938 
3939     CHECK_ERROR_STATE();
3940 
3941     if (migration_in_postcopy()) {
3942         return 0;
3943     }
3944 
3945     qemu_fflush(f);
3946     ret = qemu_rdma_drain_cq(f, rdma);
3947 
3948     if (ret < 0) {
3949         goto err;
3950     }
3951 
3952     if (flags == RAM_CONTROL_SETUP) {
3953         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3954         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3955         int reg_result_idx, i, nb_dest_blocks;
3956 
3957         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3958         trace_qemu_rdma_registration_stop_ram();
3959 
3960         /*
3961          * Make sure that we parallelize the pinning on both sides.
3962          * For very large guests, doing this serially takes a really
3963          * long time, so we have to 'interleave' the pinning locally
3964          * with the control messages by performing the pinning on this
3965          * side before we receive the control response from the other
3966          * side that the pinning has completed.
3967          */
3968         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3969                     &reg_result_idx, rdma->pin_all ?
3970                     qemu_rdma_reg_whole_ram_blocks : NULL);
3971         if (ret < 0) {
3972             fprintf(stderr, "receiving remote info!");
3973             return ret;
3974         }
3975 
3976         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3977 
3978         /*
3979          * The protocol uses two different sets of rkeys (mutually exclusive):
3980          * 1. One key to represent the virtual address of the entire ram block.
3981          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3982          * 2. One to represent individual chunks within a ram block.
3983          *    (dynamic chunk registration enabled - pin individual chunks.)
3984          *
3985          * Once the capability is successfully negotiated, the destination transmits
3986          * the keys to use (or sends them later) including the virtual addresses
3987          * and then propagates the remote ram block descriptions to its local copy.
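         * The reply parsed below must describe the same blocks, in the same
         * order, as the local list; any mismatch aborts the migration setup.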
/*
 * Inform dest that dynamic registrations are done for now.
 * First, flush writes, if any.
 */
static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                                       uint64_t flags, void *data)
{
    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
    RDMAContext *rdma;
    RDMAControlHeader head = { .len = 0, .repeat = 1 };
    int ret = 0;

    RCU_READ_LOCK_GUARD();
    rdma = qatomic_rcu_read(&rioc->rdmaout);
    if (!rdma) {
        return -EIO;
    }

    CHECK_ERROR_STATE();

    if (migration_in_postcopy()) {
        return 0;
    }

    qemu_fflush(f);
    ret = qemu_rdma_drain_cq(f, rdma);

    if (ret < 0) {
        goto err;
    }

    if (flags == RAM_CONTROL_SETUP) {
        RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
        RDMALocalBlocks *local = &rdma->local_ram_blocks;
        int reg_result_idx, i, nb_dest_blocks;

        head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
        trace_qemu_rdma_registration_stop_ram();

        /*
         * Make sure that we parallelize the pinning on both sides.
         * For very large guests, doing this serially takes a really
         * long time, so we have to 'interleave' the pinning locally
         * with the control messages by performing the pinning on this
         * side before we receive the control response from the other
         * side that the pinning has completed.
         */
        ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
                    &reg_result_idx, rdma->pin_all ?
                    qemu_rdma_reg_whole_ram_blocks : NULL);
        if (ret < 0) {
            fprintf(stderr, "Failed to receive remote info!");
            return ret;
        }

        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);

        /*
         * The protocol uses two different sets of rkeys (mutually exclusive):
         * 1. One key to represent the virtual address of the entire ram block.
         *    (dynamic chunk registration disabled - pin everything with one rkey.)
         * 2. One to represent individual chunks within a ram block.
         *    (dynamic chunk registration enabled - pin individual chunks.)
         *
         * Once the capability is successfully negotiated, the destination transmits
         * the keys to use (or sends them later) including the virtual addresses
         * and then propagates the remote ram block descriptions to its local copy.
         */

        if (local->nb_blocks != nb_dest_blocks) {
            fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d). "
                    "Your QEMU command line parameters are probably "
                    "not identical on both the source and destination.",
                    local->nb_blocks, nb_dest_blocks);
            rdma->error_state = -EINVAL;
            return -EINVAL;
        }

        qemu_rdma_move_header(rdma, reg_result_idx, &resp);
        memcpy(rdma->dest_blocks,
            rdma->wr_data[reg_result_idx].control_curr, resp.len);
        for (i = 0; i < nb_dest_blocks; i++) {
            network_to_dest_block(&rdma->dest_blocks[i]);

            /* We require that the blocks are in the same order */
            if (rdma->dest_blocks[i].length != local->block[i].length) {
                fprintf(stderr, "Block %s/%d has a different length %" PRIu64
                        " vs %" PRIu64, local->block[i].block_name, i,
                        local->block[i].length,
                        rdma->dest_blocks[i].length);
                rdma->error_state = -EINVAL;
                return -EINVAL;
            }
            local->block[i].remote_host_addr =
                rdma->dest_blocks[i].remote_host_addr;
            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
        }
    }

    trace_qemu_rdma_registration_stop(flags);

    head.type = RDMA_CONTROL_REGISTER_FINISHED;
    ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);

    if (ret < 0) {
        goto err;
    }

    return 0;
err:
    rdma->error_state = ret;
    return ret;
}

static const QEMUFileHooks rdma_read_hooks = {
    .hook_ram_load = rdma_load_hook,
};

static const QEMUFileHooks rdma_write_hooks = {
    .before_ram_iterate = qemu_rdma_registration_start,
    .after_ram_iterate = qemu_rdma_registration_stop,
    .save_page = qemu_rdma_save_page,
};
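/*
 * Illustrative sketch only: the save_page hook above (qemu_rdma_save_page)
 * is reached from migration/ram.c through ram_control_save_page(). A return
 * value of RAM_SAVE_CONTROL_DELAYED means the page was queued as an RDMA
 * write and completes asynchronously; RAM_SAVE_CONTROL_NOT_SUPP means no
 * hook is installed and the normal stream path is used. Simplified caller,
 * not verbatim:
 */
#if 0
static bool example_try_rdma_save_page(QEMUFile *f, RAMBlock *block,
                                       ram_addr_t offset)
{
    uint64_t bytes_xmit = 0;
    int ret = ram_control_save_page(f, block->offset, offset,
                                    TARGET_PAGE_SIZE, &bytes_xmit);

    if (ret == RAM_SAVE_CONTROL_DELAYED) {
        /* handed to the RDMA engine; completion is polled later */
        return true;
    }

    /* RAM_SAVE_CONTROL_NOT_SUPP (or an error): caller falls back */
    return false;
}
#endif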

static void qio_channel_rdma_finalize(Object *obj)
{
    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
    if (rioc->rdmain) {
        qemu_rdma_cleanup(rioc->rdmain);
        g_free(rioc->rdmain);
        rioc->rdmain = NULL;
    }
    if (rioc->rdmaout) {
        qemu_rdma_cleanup(rioc->rdmaout);
        g_free(rioc->rdmaout);
        rioc->rdmaout = NULL;
    }
}

static void qio_channel_rdma_class_init(ObjectClass *klass,
                                        void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);

    ioc_klass->io_writev = qio_channel_rdma_writev;
    ioc_klass->io_readv = qio_channel_rdma_readv;
    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
    ioc_klass->io_close = qio_channel_rdma_close;
    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
    ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
}

static const TypeInfo qio_channel_rdma_info = {
    .parent = TYPE_QIO_CHANNEL,
    .name = TYPE_QIO_CHANNEL_RDMA,
    .instance_size = sizeof(QIOChannelRDMA),
    .instance_finalize = qio_channel_rdma_finalize,
    .class_init = qio_channel_rdma_class_init,
};

static void qio_channel_rdma_register_types(void)
{
    type_register_static(&qio_channel_rdma_info);
}

type_init(qio_channel_rdma_register_types);

static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
{
    QIOChannelRDMA *rioc;

    if (qemu_file_mode_is_not_valid(mode)) {
        return NULL;
    }

    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));

    if (mode[0] == 'w') {
        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
        rioc->rdmaout = rdma;
        rioc->rdmain = rdma->return_path;
        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
    } else {
        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
        rioc->rdmain = rdma;
        rioc->rdmaout = rdma->return_path;
        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
    }

    return rioc->file;
}

static void rdma_accept_incoming_migration(void *opaque)
{
    RDMAContext *rdma = opaque;
    int ret;
    QEMUFile *f;
    Error *local_err = NULL;

    trace_qemu_rdma_accept_incoming_migration();
    ret = qemu_rdma_accept(rdma);

    if (ret) {
        fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
        return;
    }

    trace_qemu_rdma_accept_incoming_migration_accepted();

    if (rdma->is_return_path) {
        return;
    }

    f = qemu_fopen_rdma(rdma, "rb");
    if (f == NULL) {
        fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
        qemu_rdma_cleanup(rdma);
        return;
    }

    rdma->migration_started_on_destination = 1;
    migration_fd_process_incoming(f, &local_err);
    if (local_err) {
        error_reportf_err(local_err, "RDMA ERROR: ");
    }
}

void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
    int ret;
    RDMAContext *rdma, *rdma_return_path = NULL;
    Error *local_err = NULL;

    trace_rdma_start_incoming_migration();

    /* Avoid ram_block_discard_disable(), cannot change during migration. */
    if (ram_block_discard_is_required()) {
        error_setg(errp, "RDMA: cannot disable RAM discard");
        return;
    }

    rdma = qemu_rdma_data_init(host_port, &local_err);
    if (rdma == NULL) {
        goto err;
    }

    ret = qemu_rdma_dest_init(rdma, &local_err);

    if (ret) {
        goto err;
    }

    trace_rdma_start_incoming_migration_after_dest_init();

    ret = rdma_listen(rdma->listen_id, 5);

    if (ret) {
        ERROR(errp, "listening on socket!");
        goto cleanup_rdma;
    }

    trace_rdma_start_incoming_migration_after_rdma_listen();

    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                        NULL, (void *)(intptr_t)rdma);
    return;

cleanup_rdma:
    qemu_rdma_cleanup(rdma);
err:
    error_propagate(errp, local_err);
    if (rdma) {
        g_free(rdma->host);
        g_free(rdma->host_port);
    }
    g_free(rdma);
    g_free(rdma_return_path);
}
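/*
 * Illustrative note (not part of this file): rdma_start_incoming_migration()
 * above and rdma_start_outgoing_migration() below are the two entry points
 * invoked from migration/migration.c when an "rdma:" URI is used, roughly:
 *
 *     if (strstart(uri, "rdma:", &p)) {
 *         rdma_start_incoming_migration(p, errp);
 *     }
 *
 * with the mirror-image branch on the source side calling
 * rdma_start_outgoing_migration(s, p, errp).
 */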
void rdma_start_outgoing_migration(void *opaque,
                            const char *host_port, Error **errp)
{
    MigrationState *s = opaque;
    RDMAContext *rdma_return_path = NULL;
    RDMAContext *rdma;
    int ret = 0;

    /* Avoid ram_block_discard_disable(), cannot change during migration. */
    if (ram_block_discard_is_required()) {
        error_setg(errp, "RDMA: cannot disable RAM discard");
        return;
    }

    rdma = qemu_rdma_data_init(host_port, errp);
    if (rdma == NULL) {
        goto err;
    }

    ret = qemu_rdma_source_init(rdma,
        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);

    if (ret) {
        goto err;
    }

    trace_rdma_start_outgoing_migration_after_rdma_source_init();
    ret = qemu_rdma_connect(rdma, errp, false);

    if (ret) {
        goto err;
    }

    /* RDMA postcopy needs a separate queue pair for the return path */
    if (migrate_postcopy()) {
        rdma_return_path = qemu_rdma_data_init(host_port, errp);

        if (rdma_return_path == NULL) {
            goto return_path_err;
        }

        ret = qemu_rdma_source_init(rdma_return_path,
            s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);

        if (ret) {
            goto return_path_err;
        }

        ret = qemu_rdma_connect(rdma_return_path, errp, true);

        if (ret) {
            goto return_path_err;
        }

        rdma->return_path = rdma_return_path;
        rdma_return_path->return_path = rdma;
        rdma_return_path->is_return_path = true;
    }

    trace_rdma_start_outgoing_migration_after_rdma_connect();

    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
    migrate_fd_connect(s, NULL);
    return;
return_path_err:
    qemu_rdma_cleanup(rdma);
err:
    g_free(rdma);
    g_free(rdma_return_path);
}
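/*
 * Usage sketch (illustrative, not part of the build):
 *
 *   destination:  qemu-system-x86_64 ... -incoming rdma:192.168.1.2:7777
 *   source (HMP): migrate -d rdma:192.168.1.2:7777
 *
 * To pin all guest RAM up front instead of registering chunks on demand
 * (MIGRATION_CAPABILITY_RDMA_PIN_ALL above), enable the capability, typically
 * on both the source and the destination, before migrating:
 *
 *   (qemu) migrate_set_capability rdma-pin-all on
 */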