/*
 * RDMA protocol and interfaces
 *
 * Copyright IBM, Corp. 2010-2013
 * Copyright Red Hat, Inc. 2015-2016
 *
 * Authors:
 *  Michael R. Hines <mrhines@us.ibm.com>
 *  Jiuxing Liu <jl@us.ibm.com>
 *  Daniel P. Berrange <berrange@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later.  See the COPYING file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#include "exec/target_page.h"
#include "rdma.h"
#include "migration.h"
#include "migration-stats.h"
#include "qemu-file.h"
#include "ram.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/rcu.h"
#include "qemu/sockets.h"
#include "qemu/bitmap.h"
#include "qemu/coroutine.h"
#include "exec/memory.h"
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <rdma/rdma_cma.h>
#include "trace.h"
#include "qom/object.h"
#include "options.h"
#include <poll.h>

/*
 * Print an error on both the Monitor and the Log file.
 *
 * The message is always written to stderr with an "RDMA ERROR: " prefix.
 * It is additionally stored through @errp, but only when @errp is non-NULL
 * and does not already hold an error, so the first error recorded wins.
 * @fmt must be a string literal because it is pasted onto the prefix.
 */
#define ERROR(errp, fmt, ...) \
    do { \
        fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
        if (errp && (*(errp) == NULL)) { \
            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
        } \
    } while (0)

/* Timeout passed to rdma_resolve_addr() and rdma_resolve_route(). */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/*
 * RDMA_MERGE_MAX expressed in 4096-byte units (512). Used as the send
 * queue depth, as the base for the completion queue sizes, and as the
 * size of RDMAContext.unregistrations[].
 */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the chunk size used by the ram_chunk_*() helpers. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */

/*
 * This is only for non-live state being migrated.
 * Instead of RDMA_WRITE messages, we use RDMA_SEND
 * messages for that state, which requires a different
 * delivery design than main memory.
*/
#define RDMA_SEND_INCREMENT 32768

/*
 * Maximum size infiniband SEND message
 */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

#define RDMA_CONTROL_VERSION_CURRENT 1
/*
 * Capabilities for negotiation.
 */
#define RDMA_CAPABILITY_PIN_ALL 0x01

/*
 * Add the other flags above to this list of known capabilities
 * as they are introduced.
 */
static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;

/*
 * Bail out of the *enclosing function* if the connection has already
 * failed.
 *
 * Requires a variable named 'rdma' (pointer to RDMAContext) in scope and
 * may only be used in functions returning int: when rdma->error_state is
 * non-zero the macro executes 'return rdma->error_state'. The diagnostic
 * is emitted once per context; error_reported latches after the first
 * report.
 */
#define CHECK_ERROR_STATE() \
    do { \
        if (rdma->error_state) { \
            if (!rdma->error_reported) { \
                error_report("RDMA is in an error state waiting migration" \
                                " to abort!"); \
                rdma->error_reported = 1; \
            } \
            return rdma->error_state; \
        } \
    } while (0)

/*
 * A work request ID is 64-bits and we split up these bits
 * into 3 parts:
 *
 * bits 0-15 : type of control message, 2^16
 * bits 16-29: ram block index, 2^14
 * bits 30-63: ram block chunk number, 2^34
 *
 * The last two bit ranges are only used for RDMA writes,
 * in order to track their completion and potentially
 * also track unregistration status of the message.
 *
 * NOTE(review): the shifts and masks below are built from 'unsigned long'
 * constants, so the 64-bit layout described above presumably relies on
 * long being 64 bits wide on every supported host -- confirm.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

/* Low 16 bits: everything below the block index. */
#define RDMA_WRID_TYPE_MASK \
    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)

/* Bits 16-29: below the chunk shift, with the type bits removed. */
#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))

/* Whatever is left once the type and block bits are removed. */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)

/*
 * RDMA migration protocol:
 * 1. RDMA Writes (data messages, i.e. RAM)
 * 2.
IB Send/Recv (control channel messages)
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,
    RDMA_WRID_SEND_CONTROL = 2000,
    RDMA_WRID_RECV_CONTROL = 4000,
};

/*
 * Human-readable names for the work request types above, indexed by the
 * enum value itself. Because the initializers are designated, the array
 * extends to index RDMA_WRID_RECV_CONTROL and every slot not listed here
 * is NULL.
 */
static const char *wrid_desc[] = {
    [RDMA_WRID_NONE] = "NONE",
    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};

/*
 * Work request IDs for IB SEND messages only (not RDMA writes).
 * This is used by the migration protocol to transmit
 * control messages (such as device state and registration commands)
 *
 * We could use more WRs, but we have enough for now.
 *
 * RDMA_WRID_MAX is the number of IDs and sizes RDMAContext.wr_data[].
 */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,
};

/*
 * SEND/RECV IB Control Messages.
 *
 * control_desc() bounds-checks against the last enumerator, so new
 * message types must be added to its string table as well.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};


/*
 * Memory and MR structures used to represent an IB Send/Recv work request.
 * This is *not* used for RDMA writes, only IB Send/Recv.
*/
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;

/*
 * Negotiate RDMA capabilities during connection-setup time.
 */
typedef struct {
    uint32_t version;
    uint32_t flags;
} RDMACapabilities;

/* Convert both fields of @cap, in place, from host to network byte order. */
static void caps_to_network(RDMACapabilities *cap)
{
    cap->version = htonl(cap->version);
    cap->flags = htonl(cap->flags);
}

/* Convert both fields of @cap, in place, from network to host byte order. */
static void network_to_caps(RDMACapabilities *cap)
{
    cap->version = ntohl(cap->version);
    cap->flags = ntohl(cap->flags);
}

/*
 * Representation of a RAMBlock from an RDMA perspective.
 * This is not transmitted, only local.
 * This and subsequent structures cannot be linked lists
 * because we're using a single IB message to transmit
 * the information. It's small anyway, so a list is overkill.
 */
typedef struct RDMALocalBlock {
    char          *block_name;      /* private copy, freed on delete */
    uint8_t       *local_host_addr; /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;          /* block offset; key into the blockmap */
    uint64_t       length;          /* size of the block in bytes */
    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    int            index;           /* which block are we */
    unsigned int   src_index;       /* (Only used on dest) */
    bool           is_ram_block;    /* added before local init completed */
    int            nb_chunks;       /* entries in remote_keys, bits per bitmap */
    unsigned long *transit_bitmap;  /* one bit per chunk */
    unsigned long *unregister_bitmap; /* one bit per chunk */
} RDMALocalBlock;

/*
 * Also represents a RAMblock, but only on the dest.
234 * This gets transmitted by the dest during connection-time 235 * to the source VM and then is used to populate the 236 * corresponding RDMALocalBlock with 237 * the information needed to perform the actual RDMA. 238 */ 239 typedef struct QEMU_PACKED RDMADestBlock { 240 uint64_t remote_host_addr; 241 uint64_t offset; 242 uint64_t length; 243 uint32_t remote_rkey; 244 uint32_t padding; 245 } RDMADestBlock; 246 247 static const char *control_desc(unsigned int rdma_control) 248 { 249 static const char *strs[] = { 250 [RDMA_CONTROL_NONE] = "NONE", 251 [RDMA_CONTROL_ERROR] = "ERROR", 252 [RDMA_CONTROL_READY] = "READY", 253 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 254 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 255 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 256 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 257 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 258 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 259 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 260 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 261 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 262 }; 263 264 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 265 return "??BAD CONTROL VALUE??"; 266 } 267 268 return strs[rdma_control]; 269 } 270 271 static uint64_t htonll(uint64_t v) 272 { 273 union { uint32_t lv[2]; uint64_t llv; } u; 274 u.lv[0] = htonl(v >> 32); 275 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 276 return u.llv; 277 } 278 279 static uint64_t ntohll(uint64_t v) 280 { 281 union { uint32_t lv[2]; uint64_t llv; } u; 282 u.llv = v; 283 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 284 } 285 286 static void dest_block_to_network(RDMADestBlock *db) 287 { 288 db->remote_host_addr = htonll(db->remote_host_addr); 289 db->offset = htonll(db->offset); 290 db->length = htonll(db->length); 291 db->remote_rkey = htonl(db->remote_rkey); 292 } 293 294 static void network_to_dest_block(RDMADestBlock *db) 295 { 296 
db->remote_host_addr = ntohll(db->remote_host_addr); 297 db->offset = ntohll(db->offset); 298 db->length = ntohll(db->length); 299 db->remote_rkey = ntohl(db->remote_rkey); 300 } 301 302 /* 303 * Virtual address of the above structures used for transmitting 304 * the RAMBlock descriptions at connection-time. 305 * This structure is *not* transmitted. 306 */ 307 typedef struct RDMALocalBlocks { 308 int nb_blocks; 309 bool init; /* main memory init complete */ 310 RDMALocalBlock *block; 311 } RDMALocalBlocks; 312 313 /* 314 * Main data structure for RDMA state. 315 * While there is only one copy of this structure being allocated right now, 316 * this is the place where one would start if you wanted to consider 317 * having more than one RDMA connection open at the same time. 318 */ 319 typedef struct RDMAContext { 320 char *host; 321 int port; 322 char *host_port; 323 324 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 325 326 /* 327 * This is used by *_exchange_send() to figure out whether or not 328 * the initial "READY" message has already been received or not. 329 * This is because other functions may potentially poll() and detect 330 * the READY message before send() does, in which case we need to 331 * know if it completed. 332 */ 333 int control_ready_expected; 334 335 /* number of outstanding writes */ 336 int nb_sent; 337 338 /* store info about current buffer so that we can 339 merge it with future sends */ 340 uint64_t current_addr; 341 uint64_t current_length; 342 /* index of ram block the current buffer belongs to */ 343 int current_index; 344 /* index of the chunk in the current ram block */ 345 int current_chunk; 346 347 bool pin_all; 348 349 /* 350 * infiniband-specific variables for opening the device 351 * and maintaining connection state and so forth. 352 * 353 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 354 * cm_id->verbs, cm_id->channel, and cm_id->qp. 
355 */ 356 struct rdma_cm_id *cm_id; /* connection manager ID */ 357 struct rdma_cm_id *listen_id; 358 bool connected; 359 360 struct ibv_context *verbs; 361 struct rdma_event_channel *channel; 362 struct ibv_qp *qp; /* queue pair */ 363 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */ 364 struct ibv_comp_channel *send_comp_channel; /* send completion channel */ 365 struct ibv_pd *pd; /* protection domain */ 366 struct ibv_cq *recv_cq; /* recvieve completion queue */ 367 struct ibv_cq *send_cq; /* send completion queue */ 368 369 /* 370 * If a previous write failed (perhaps because of a failed 371 * memory registration, then do not attempt any future work 372 * and remember the error state. 373 */ 374 int error_state; 375 int error_reported; 376 int received_error; 377 378 /* 379 * Description of ram blocks used throughout the code. 380 */ 381 RDMALocalBlocks local_ram_blocks; 382 RDMADestBlock *dest_blocks; 383 384 /* Index of the next RAMBlock received during block registration */ 385 unsigned int next_src_index; 386 387 /* 388 * Migration on *destination* started. 389 * Then use coroutine yield function. 390 * Source runs in a thread, so we don't care. 391 */ 392 int migration_started_on_destination; 393 394 int total_registrations; 395 int total_writes; 396 397 int unregister_current, unregister_next; 398 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 399 400 GHashTable *blockmap; 401 402 /* the RDMAContext for return path */ 403 struct RDMAContext *return_path; 404 bool is_return_path; 405 } RDMAContext; 406 407 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 408 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 409 410 411 412 struct QIOChannelRDMA { 413 QIOChannel parent; 414 RDMAContext *rdmain; 415 RDMAContext *rdmaout; 416 QEMUFile *file; 417 bool blocking; /* XXX we don't actually honour this yet */ 418 }; 419 420 /* 421 * Main structure for IB Send/Recv control messages. 
422 * This gets prepended at the beginning of every Send/Recv. 423 */ 424 typedef struct QEMU_PACKED { 425 uint32_t len; /* Total length of data portion */ 426 uint32_t type; /* which control command to perform */ 427 uint32_t repeat; /* number of commands in data portion of same type */ 428 uint32_t padding; 429 } RDMAControlHeader; 430 431 static void control_to_network(RDMAControlHeader *control) 432 { 433 control->type = htonl(control->type); 434 control->len = htonl(control->len); 435 control->repeat = htonl(control->repeat); 436 } 437 438 static void network_to_control(RDMAControlHeader *control) 439 { 440 control->type = ntohl(control->type); 441 control->len = ntohl(control->len); 442 control->repeat = ntohl(control->repeat); 443 } 444 445 /* 446 * Register a single Chunk. 447 * Information sent by the source VM to inform the dest 448 * to register an single chunk of memory before we can perform 449 * the actual RDMA operation. 450 */ 451 typedef struct QEMU_PACKED { 452 union QEMU_PACKED { 453 uint64_t current_addr; /* offset into the ram_addr_t space */ 454 uint64_t chunk; /* chunk to lookup if unregistering */ 455 } key; 456 uint32_t current_index; /* which ramblock the chunk belongs to */ 457 uint32_t padding; 458 uint64_t chunks; /* how many sequential chunks to register */ 459 } RDMARegister; 460 461 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 462 { 463 RDMALocalBlock *local_block; 464 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 465 466 if (local_block->is_ram_block) { 467 /* 468 * current_addr as passed in is an address in the local ram_addr_t 469 * space, we need to translate this for the destination 470 */ 471 reg->key.current_addr -= local_block->offset; 472 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 473 } 474 reg->key.current_addr = htonll(reg->key.current_addr); 475 reg->current_index = htonl(reg->current_index); 476 reg->chunks = htonll(reg->chunks); 477 } 478 479 static 
void network_to_register(RDMARegister *reg) 480 { 481 reg->key.current_addr = ntohll(reg->key.current_addr); 482 reg->current_index = ntohl(reg->current_index); 483 reg->chunks = ntohll(reg->chunks); 484 } 485 486 typedef struct QEMU_PACKED { 487 uint32_t value; /* if zero, we will madvise() */ 488 uint32_t block_idx; /* which ram block index */ 489 uint64_t offset; /* Address in remote ram_addr_t space */ 490 uint64_t length; /* length of the chunk */ 491 } RDMACompress; 492 493 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 494 { 495 comp->value = htonl(comp->value); 496 /* 497 * comp->offset as passed in is an address in the local ram_addr_t 498 * space, we need to translate this for the destination 499 */ 500 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 501 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 502 comp->block_idx = htonl(comp->block_idx); 503 comp->offset = htonll(comp->offset); 504 comp->length = htonll(comp->length); 505 } 506 507 static void network_to_compress(RDMACompress *comp) 508 { 509 comp->value = ntohl(comp->value); 510 comp->block_idx = ntohl(comp->block_idx); 511 comp->offset = ntohll(comp->offset); 512 comp->length = ntohll(comp->length); 513 } 514 515 /* 516 * The result of the dest's memory registration produces an "rkey" 517 * which the source VM must reference in order to perform 518 * the RDMA operation. 
519 */ 520 typedef struct QEMU_PACKED { 521 uint32_t rkey; 522 uint32_t padding; 523 uint64_t host_addr; 524 } RDMARegisterResult; 525 526 static void result_to_network(RDMARegisterResult *result) 527 { 528 result->rkey = htonl(result->rkey); 529 result->host_addr = htonll(result->host_addr); 530 }; 531 532 static void network_to_result(RDMARegisterResult *result) 533 { 534 result->rkey = ntohl(result->rkey); 535 result->host_addr = ntohll(result->host_addr); 536 }; 537 538 const char *print_wrid(int wrid); 539 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 540 uint8_t *data, RDMAControlHeader *resp, 541 int *resp_idx, 542 int (*callback)(RDMAContext *rdma)); 543 544 static inline uint64_t ram_chunk_index(const uint8_t *start, 545 const uint8_t *host) 546 { 547 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 548 } 549 550 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 551 uint64_t i) 552 { 553 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 554 (i << RDMA_REG_CHUNK_SHIFT)); 555 } 556 557 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 558 uint64_t i) 559 { 560 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 561 (1UL << RDMA_REG_CHUNK_SHIFT); 562 563 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 564 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 565 } 566 567 return result; 568 } 569 570 static int rdma_add_block(RDMAContext *rdma, const char *block_name, 571 void *host_addr, 572 ram_addr_t block_offset, uint64_t length) 573 { 574 RDMALocalBlocks *local = &rdma->local_ram_blocks; 575 RDMALocalBlock *block; 576 RDMALocalBlock *old = local->block; 577 578 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 579 580 if (local->nb_blocks) { 581 int x; 582 583 if (rdma->blockmap) { 584 for (x = 0; x < local->nb_blocks; x++) { 585 g_hash_table_remove(rdma->blockmap, 586 (void 
*)(uintptr_t)old[x].offset); 587 g_hash_table_insert(rdma->blockmap, 588 (void *)(uintptr_t)old[x].offset, 589 &local->block[x]); 590 } 591 } 592 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 593 g_free(old); 594 } 595 596 block = &local->block[local->nb_blocks]; 597 598 block->block_name = g_strdup(block_name); 599 block->local_host_addr = host_addr; 600 block->offset = block_offset; 601 block->length = length; 602 block->index = local->nb_blocks; 603 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 604 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 605 block->transit_bitmap = bitmap_new(block->nb_chunks); 606 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 607 block->unregister_bitmap = bitmap_new(block->nb_chunks); 608 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 609 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 610 611 block->is_ram_block = local->init ? false : true; 612 613 if (rdma->blockmap) { 614 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 615 } 616 617 trace_rdma_add_block(block_name, local->nb_blocks, 618 (uintptr_t) block->local_host_addr, 619 block->offset, block->length, 620 (uintptr_t) (block->local_host_addr + block->length), 621 BITS_TO_LONGS(block->nb_chunks) * 622 sizeof(unsigned long) * 8, 623 block->nb_chunks); 624 625 local->nb_blocks++; 626 627 return 0; 628 } 629 630 /* 631 * Memory regions need to be registered with the device and queue pairs setup 632 * in advanced before the migration starts. This tells us where the RAM blocks 633 * are so that we can register them individually. 
634 */ 635 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 636 { 637 const char *block_name = qemu_ram_get_idstr(rb); 638 void *host_addr = qemu_ram_get_host_addr(rb); 639 ram_addr_t block_offset = qemu_ram_get_offset(rb); 640 ram_addr_t length = qemu_ram_get_used_length(rb); 641 return rdma_add_block(opaque, block_name, host_addr, block_offset, length); 642 } 643 644 /* 645 * Identify the RAMBlocks and their quantity. They will be references to 646 * identify chunk boundaries inside each RAMBlock and also be referenced 647 * during dynamic page registration. 648 */ 649 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma) 650 { 651 RDMALocalBlocks *local = &rdma->local_ram_blocks; 652 int ret; 653 654 assert(rdma->blockmap == NULL); 655 memset(local, 0, sizeof *local); 656 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 657 if (ret) { 658 return ret; 659 } 660 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 661 rdma->dest_blocks = g_new0(RDMADestBlock, 662 rdma->local_ram_blocks.nb_blocks); 663 local->init = true; 664 return 0; 665 } 666 667 /* 668 * Note: If used outside of cleanup, the caller must ensure that the destination 669 * block structures are also updated 670 */ 671 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 672 { 673 RDMALocalBlocks *local = &rdma->local_ram_blocks; 674 RDMALocalBlock *old = local->block; 675 int x; 676 677 if (rdma->blockmap) { 678 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 679 } 680 if (block->pmr) { 681 int j; 682 683 for (j = 0; j < block->nb_chunks; j++) { 684 if (!block->pmr[j]) { 685 continue; 686 } 687 ibv_dereg_mr(block->pmr[j]); 688 rdma->total_registrations--; 689 } 690 g_free(block->pmr); 691 block->pmr = NULL; 692 } 693 694 if (block->mr) { 695 ibv_dereg_mr(block->mr); 696 rdma->total_registrations--; 697 block->mr = NULL; 698 } 699 700 g_free(block->transit_bitmap); 701 block->transit_bitmap = NULL; 702 703 
g_free(block->unregister_bitmap); 704 block->unregister_bitmap = NULL; 705 706 g_free(block->remote_keys); 707 block->remote_keys = NULL; 708 709 g_free(block->block_name); 710 block->block_name = NULL; 711 712 if (rdma->blockmap) { 713 for (x = 0; x < local->nb_blocks; x++) { 714 g_hash_table_remove(rdma->blockmap, 715 (void *)(uintptr_t)old[x].offset); 716 } 717 } 718 719 if (local->nb_blocks > 1) { 720 721 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 722 723 if (block->index) { 724 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 725 } 726 727 if (block->index < (local->nb_blocks - 1)) { 728 memcpy(local->block + block->index, old + (block->index + 1), 729 sizeof(RDMALocalBlock) * 730 (local->nb_blocks - (block->index + 1))); 731 for (x = block->index; x < local->nb_blocks - 1; x++) { 732 local->block[x].index--; 733 } 734 } 735 } else { 736 assert(block == local->block); 737 local->block = NULL; 738 } 739 740 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 741 block->offset, block->length, 742 (uintptr_t)(block->local_host_addr + block->length), 743 BITS_TO_LONGS(block->nb_chunks) * 744 sizeof(unsigned long) * 8, block->nb_chunks); 745 746 g_free(old); 747 748 local->nb_blocks--; 749 750 if (local->nb_blocks && rdma->blockmap) { 751 for (x = 0; x < local->nb_blocks; x++) { 752 g_hash_table_insert(rdma->blockmap, 753 (void *)(uintptr_t)local->block[x].offset, 754 &local->block[x]); 755 } 756 } 757 758 return 0; 759 } 760 761 /* 762 * Put in the log file which RDMA device was opened and the details 763 * associated with that device. 
764 */ 765 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 766 { 767 struct ibv_port_attr port; 768 769 if (ibv_query_port(verbs, 1, &port)) { 770 error_report("Failed to query port information"); 771 return; 772 } 773 774 printf("%s RDMA Device opened: kernel name %s " 775 "uverbs device name %s, " 776 "infiniband_verbs class device path %s, " 777 "infiniband class device path %s, " 778 "transport: (%d) %s\n", 779 who, 780 verbs->device->name, 781 verbs->device->dev_name, 782 verbs->device->dev_path, 783 verbs->device->ibdev_path, 784 port.link_layer, 785 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" : 786 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 787 ? "Ethernet" : "Unknown")); 788 } 789 790 /* 791 * Put in the log file the RDMA gid addressing information, 792 * useful for folks who have trouble understanding the 793 * RDMA device hierarchy in the kernel. 794 */ 795 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 796 { 797 char sgid[33]; 798 char dgid[33]; 799 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 800 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 801 trace_qemu_rdma_dump_gid(who, sgid, dgid); 802 } 803 804 /* 805 * As of now, IPv6 over RoCE / iWARP is not supported by linux. 806 * We will try the next addrinfo struct, and fail if there are 807 * no other valid addresses to bind against. 808 * 809 * If user is listening on '[::]', then we will not have a opened a device 810 * yet and have no way of verifying if the device is RoCE or not. 811 * 812 * In this case, the source VM will throw an error for ALL types of 813 * connections (both IPv4 and IPv6) if the destination machine does not have 814 * a regular infiniband network available for use. 
815 * 816 * The only way to guarantee that an error is thrown for broken kernels is 817 * for the management software to choose a *specific* interface at bind time 818 * and validate what time of hardware it is. 819 * 820 * Unfortunately, this puts the user in a fix: 821 * 822 * If the source VM connects with an IPv4 address without knowing that the 823 * destination has bound to '[::]' the migration will unconditionally fail 824 * unless the management software is explicitly listening on the IPv4 825 * address while using a RoCE-based device. 826 * 827 * If the source VM connects with an IPv6 address, then we're OK because we can 828 * throw an error on the source (and similarly on the destination). 829 * 830 * But in mixed environments, this will be broken for a while until it is fixed 831 * inside linux. 832 * 833 * We do provide a *tiny* bit of help in this function: We can list all of the 834 * devices in the system and check to see if all the devices are RoCE or 835 * Infiniband. 836 * 837 * If we detect that we have a *pure* RoCE environment, then we can safely 838 * thrown an error even if the management software has specified '[::]' as the 839 * bind address. 840 * 841 * However, if there is are multiple hetergeneous devices, then we cannot make 842 * this assumption and the user just has to be sure they know what they are 843 * doing. 844 * 845 * Patches are being reviewed on linux-rdma. 846 */ 847 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp) 848 { 849 /* This bug only exists in linux, to our knowledge. */ 850 #ifdef CONFIG_LINUX 851 struct ibv_port_attr port_attr; 852 853 /* 854 * Verbs are only NULL if management has bound to '[::]'. 855 * 856 * Let's iterate through all the devices and see if there any pure IB 857 * devices (non-ethernet). 858 * 859 * If not, then we can safely proceed with the migration. 860 * Otherwise, there are no guarantees until the bug is fixed in linux. 
861 */ 862 if (!verbs) { 863 int num_devices, x; 864 struct ibv_device **dev_list = ibv_get_device_list(&num_devices); 865 bool roce_found = false; 866 bool ib_found = false; 867 868 for (x = 0; x < num_devices; x++) { 869 verbs = ibv_open_device(dev_list[x]); 870 if (!verbs) { 871 if (errno == EPERM) { 872 continue; 873 } else { 874 return -EINVAL; 875 } 876 } 877 878 if (ibv_query_port(verbs, 1, &port_attr)) { 879 ibv_close_device(verbs); 880 ERROR(errp, "Could not query initial IB port"); 881 return -EINVAL; 882 } 883 884 if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) { 885 ib_found = true; 886 } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 887 roce_found = true; 888 } 889 890 ibv_close_device(verbs); 891 892 } 893 894 if (roce_found) { 895 if (ib_found) { 896 fprintf(stderr, "WARN: migrations may fail:" 897 " IPv6 over RoCE / iWARP in linux" 898 " is broken. But since you appear to have a" 899 " mixed RoCE / IB environment, be sure to only" 900 " migrate over the IB fabric until the kernel " 901 " fixes the bug.\n"); 902 } else { 903 ERROR(errp, "You only have RoCE / iWARP devices in your systems" 904 " and your management software has specified '[::]'" 905 ", but IPv6 over RoCE / iWARP is not supported in Linux."); 906 return -ENONET; 907 } 908 } 909 910 return 0; 911 } 912 913 /* 914 * If we have a verbs context, that means that some other than '[::]' was 915 * used by the management software for binding. In which case we can 916 * actually warn the user about a potentially broken kernel. 
917 */ 918 919 /* IB ports start with 1, not 0 */ 920 if (ibv_query_port(verbs, 1, &port_attr)) { 921 ERROR(errp, "Could not query initial IB port"); 922 return -EINVAL; 923 } 924 925 if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) { 926 ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 " 927 "(but patches on linux-rdma in progress)"); 928 return -ENONET; 929 } 930 931 #endif 932 933 return 0; 934 } 935 936 /* 937 * Figure out which RDMA device corresponds to the requested IP hostname 938 * Also create the initial connection manager identifiers for opening 939 * the connection. 940 */ 941 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 942 { 943 int ret; 944 struct rdma_addrinfo *res; 945 char port_str[16]; 946 struct rdma_cm_event *cm_event; 947 char ip[40] = "unknown"; 948 struct rdma_addrinfo *e; 949 950 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 951 ERROR(errp, "RDMA hostname has not been set"); 952 return -EINVAL; 953 } 954 955 /* create CM channel */ 956 rdma->channel = rdma_create_event_channel(); 957 if (!rdma->channel) { 958 ERROR(errp, "could not create CM channel"); 959 return -EINVAL; 960 } 961 962 /* create CM id */ 963 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 964 if (ret) { 965 ERROR(errp, "could not create channel id"); 966 goto err_resolve_create_id; 967 } 968 969 snprintf(port_str, 16, "%d", rdma->port); 970 port_str[15] = '\0'; 971 972 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 973 if (ret < 0) { 974 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 975 goto err_resolve_get_addr; 976 } 977 978 for (e = res; e != NULL; e = e->ai_next) { 979 inet_ntop(e->ai_family, 980 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 981 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 982 983 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 984 RDMA_RESOLVE_TIMEOUT_MS); 985 if (!ret) { 986 if (e->ai_family == AF_INET6) { 987 
ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp); 988 if (ret) { 989 continue; 990 } 991 } 992 goto route; 993 } 994 } 995 996 rdma_freeaddrinfo(res); 997 ERROR(errp, "could not resolve address %s", rdma->host); 998 goto err_resolve_get_addr; 999 1000 route: 1001 rdma_freeaddrinfo(res); 1002 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 1003 1004 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1005 if (ret) { 1006 ERROR(errp, "could not perform event_addr_resolved"); 1007 goto err_resolve_get_addr; 1008 } 1009 1010 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 1011 ERROR(errp, "result not equal to event_addr_resolved %s", 1012 rdma_event_str(cm_event->event)); 1013 error_report("rdma_resolve_addr"); 1014 rdma_ack_cm_event(cm_event); 1015 ret = -EINVAL; 1016 goto err_resolve_get_addr; 1017 } 1018 rdma_ack_cm_event(cm_event); 1019 1020 /* resolve route */ 1021 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 1022 if (ret) { 1023 ERROR(errp, "could not resolve rdma route"); 1024 goto err_resolve_get_addr; 1025 } 1026 1027 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1028 if (ret) { 1029 ERROR(errp, "could not perform event_route_resolved"); 1030 goto err_resolve_get_addr; 1031 } 1032 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 1033 ERROR(errp, "result not equal to event_route_resolved: %s", 1034 rdma_event_str(cm_event->event)); 1035 rdma_ack_cm_event(cm_event); 1036 ret = -EINVAL; 1037 goto err_resolve_get_addr; 1038 } 1039 rdma_ack_cm_event(cm_event); 1040 rdma->verbs = rdma->cm_id->verbs; 1041 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 1042 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 1043 return 0; 1044 1045 err_resolve_get_addr: 1046 rdma_destroy_id(rdma->cm_id); 1047 rdma->cm_id = NULL; 1048 err_resolve_create_id: 1049 rdma_destroy_event_channel(rdma->channel); 1050 rdma->channel = NULL; 1051 return ret; 1052 } 1053 1054 /* 1055 * Create protection domain and 
completion queues 1056 */ 1057 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma) 1058 { 1059 /* allocate pd */ 1060 rdma->pd = ibv_alloc_pd(rdma->verbs); 1061 if (!rdma->pd) { 1062 error_report("failed to allocate protection domain"); 1063 return -1; 1064 } 1065 1066 /* create receive completion channel */ 1067 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs); 1068 if (!rdma->recv_comp_channel) { 1069 error_report("failed to allocate receive completion channel"); 1070 goto err_alloc_pd_cq; 1071 } 1072 1073 /* 1074 * Completion queue can be filled by read work requests. 1075 */ 1076 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1077 NULL, rdma->recv_comp_channel, 0); 1078 if (!rdma->recv_cq) { 1079 error_report("failed to allocate receive completion queue"); 1080 goto err_alloc_pd_cq; 1081 } 1082 1083 /* create send completion channel */ 1084 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs); 1085 if (!rdma->send_comp_channel) { 1086 error_report("failed to allocate send completion channel"); 1087 goto err_alloc_pd_cq; 1088 } 1089 1090 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 1091 NULL, rdma->send_comp_channel, 0); 1092 if (!rdma->send_cq) { 1093 error_report("failed to allocate send completion queue"); 1094 goto err_alloc_pd_cq; 1095 } 1096 1097 return 0; 1098 1099 err_alloc_pd_cq: 1100 if (rdma->pd) { 1101 ibv_dealloc_pd(rdma->pd); 1102 } 1103 if (rdma->recv_comp_channel) { 1104 ibv_destroy_comp_channel(rdma->recv_comp_channel); 1105 } 1106 if (rdma->send_comp_channel) { 1107 ibv_destroy_comp_channel(rdma->send_comp_channel); 1108 } 1109 if (rdma->recv_cq) { 1110 ibv_destroy_cq(rdma->recv_cq); 1111 rdma->recv_cq = NULL; 1112 } 1113 rdma->pd = NULL; 1114 rdma->recv_comp_channel = NULL; 1115 rdma->send_comp_channel = NULL; 1116 return -1; 1117 1118 } 1119 1120 /* 1121 * Create queue pairs. 
1122 */ 1123 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 1124 { 1125 struct ibv_qp_init_attr attr = { 0 }; 1126 int ret; 1127 1128 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 1129 attr.cap.max_recv_wr = 3; 1130 attr.cap.max_send_sge = 1; 1131 attr.cap.max_recv_sge = 1; 1132 attr.send_cq = rdma->send_cq; 1133 attr.recv_cq = rdma->recv_cq; 1134 attr.qp_type = IBV_QPT_RC; 1135 1136 ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr); 1137 if (ret) { 1138 return -1; 1139 } 1140 1141 rdma->qp = rdma->cm_id->qp; 1142 return 0; 1143 } 1144 1145 /* Check whether On-Demand Paging is supported by RDAM device */ 1146 static bool rdma_support_odp(struct ibv_context *dev) 1147 { 1148 struct ibv_device_attr_ex attr = {0}; 1149 int ret = ibv_query_device_ex(dev, NULL, &attr); 1150 if (ret) { 1151 return false; 1152 } 1153 1154 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) { 1155 return true; 1156 } 1157 1158 return false; 1159 } 1160 1161 /* 1162 * ibv_advise_mr to avoid RNR NAK error as far as possible. 1163 * The responder mr registering with ODP will sent RNR NAK back to 1164 * the requester in the face of the page fault. 1165 */ 1166 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr, 1167 uint32_t len, uint32_t lkey, 1168 const char *name, bool wr) 1169 { 1170 #ifdef HAVE_IBV_ADVISE_MR 1171 int ret; 1172 int advice = wr ? 
IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE : 1173 IBV_ADVISE_MR_ADVICE_PREFETCH; 1174 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len}; 1175 1176 ret = ibv_advise_mr(pd, advice, 1177 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1); 1178 /* ignore the error */ 1179 if (ret) { 1180 trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno)); 1181 } else { 1182 trace_qemu_rdma_advise_mr(name, len, addr, "successed"); 1183 } 1184 #endif 1185 } 1186 1187 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma) 1188 { 1189 int i; 1190 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1191 1192 for (i = 0; i < local->nb_blocks; i++) { 1193 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; 1194 1195 local->block[i].mr = 1196 ibv_reg_mr(rdma->pd, 1197 local->block[i].local_host_addr, 1198 local->block[i].length, access 1199 ); 1200 1201 if (!local->block[i].mr && 1202 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1203 access |= IBV_ACCESS_ON_DEMAND; 1204 /* register ODP mr */ 1205 local->block[i].mr = 1206 ibv_reg_mr(rdma->pd, 1207 local->block[i].local_host_addr, 1208 local->block[i].length, access); 1209 trace_qemu_rdma_register_odp_mr(local->block[i].block_name); 1210 1211 if (local->block[i].mr) { 1212 qemu_rdma_advise_prefetch_mr(rdma->pd, 1213 (uintptr_t)local->block[i].local_host_addr, 1214 local->block[i].length, 1215 local->block[i].mr->lkey, 1216 local->block[i].block_name, 1217 true); 1218 } 1219 } 1220 1221 if (!local->block[i].mr) { 1222 perror("Failed to register local dest ram block!"); 1223 break; 1224 } 1225 rdma->total_registrations++; 1226 } 1227 1228 if (i >= local->nb_blocks) { 1229 return 0; 1230 } 1231 1232 for (i--; i >= 0; i--) { 1233 ibv_dereg_mr(local->block[i].mr); 1234 local->block[i].mr = NULL; 1235 rdma->total_registrations--; 1236 } 1237 1238 return -1; 1239 1240 } 1241 1242 /* 1243 * Find the ram block that corresponds to the page requested to be 1244 * transmitted by QEMU. 
1245 * 1246 * Once the block is found, also identify which 'chunk' within that 1247 * block that the page belongs to. 1248 * 1249 * This search cannot fail or the migration will fail. 1250 */ 1251 static int qemu_rdma_search_ram_block(RDMAContext *rdma, 1252 uintptr_t block_offset, 1253 uint64_t offset, 1254 uint64_t length, 1255 uint64_t *block_index, 1256 uint64_t *chunk_index) 1257 { 1258 uint64_t current_addr = block_offset + offset; 1259 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1260 (void *) block_offset); 1261 assert(block); 1262 assert(current_addr >= block->offset); 1263 assert((current_addr + length) <= (block->offset + block->length)); 1264 1265 *block_index = block->index; 1266 *chunk_index = ram_chunk_index(block->local_host_addr, 1267 block->local_host_addr + (current_addr - block->offset)); 1268 1269 return 0; 1270 } 1271 1272 /* 1273 * Register a chunk with IB. If the chunk was already registered 1274 * previously, then skip. 1275 * 1276 * Also return the keys associated with the registration needed 1277 * to perform the actual RDMA operation. 1278 */ 1279 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1280 RDMALocalBlock *block, uintptr_t host_addr, 1281 uint32_t *lkey, uint32_t *rkey, int chunk, 1282 uint8_t *chunk_start, uint8_t *chunk_end) 1283 { 1284 if (block->mr) { 1285 if (lkey) { 1286 *lkey = block->mr->lkey; 1287 } 1288 if (rkey) { 1289 *rkey = block->mr->rkey; 1290 } 1291 return 0; 1292 } 1293 1294 /* allocate memory to store chunk MRs */ 1295 if (!block->pmr) { 1296 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1297 } 1298 1299 /* 1300 * If 'rkey', then we're the destination, so grant access to the source. 1301 * 1302 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1303 */ 1304 if (!block->pmr[chunk]) { 1305 uint64_t len = chunk_end - chunk_start; 1306 int access = rkey ? 
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE : 1307 0; 1308 1309 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1310 1311 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1312 if (!block->pmr[chunk] && 1313 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1314 access |= IBV_ACCESS_ON_DEMAND; 1315 /* register ODP mr */ 1316 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1317 trace_qemu_rdma_register_odp_mr(block->block_name); 1318 1319 if (block->pmr[chunk]) { 1320 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start, 1321 len, block->pmr[chunk]->lkey, 1322 block->block_name, rkey); 1323 1324 } 1325 } 1326 } 1327 if (!block->pmr[chunk]) { 1328 perror("Failed to register chunk!"); 1329 fprintf(stderr, "Chunk details: block: %d chunk index %d" 1330 " start %" PRIuPTR " end %" PRIuPTR 1331 " host %" PRIuPTR 1332 " local %" PRIuPTR " registrations: %d\n", 1333 block->index, chunk, (uintptr_t)chunk_start, 1334 (uintptr_t)chunk_end, host_addr, 1335 (uintptr_t)block->local_host_addr, 1336 rdma->total_registrations); 1337 return -1; 1338 } 1339 rdma->total_registrations++; 1340 1341 if (lkey) { 1342 *lkey = block->pmr[chunk]->lkey; 1343 } 1344 if (rkey) { 1345 *rkey = block->pmr[chunk]->rkey; 1346 } 1347 return 0; 1348 } 1349 1350 /* 1351 * Register (at connection time) the memory used for control 1352 * channel messages. 
1353 */ 1354 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1355 { 1356 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1357 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1358 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1359 if (rdma->wr_data[idx].control_mr) { 1360 rdma->total_registrations++; 1361 return 0; 1362 } 1363 error_report("qemu_rdma_reg_control failed"); 1364 return -1; 1365 } 1366 1367 const char *print_wrid(int wrid) 1368 { 1369 if (wrid >= RDMA_WRID_RECV_CONTROL) { 1370 return wrid_desc[RDMA_WRID_RECV_CONTROL]; 1371 } 1372 return wrid_desc[wrid]; 1373 } 1374 1375 /* 1376 * Perform a non-optimized memory unregistration after every transfer 1377 * for demonstration purposes, only if pin-all is not requested. 1378 * 1379 * Potential optimizations: 1380 * 1. Start a new thread to run this function continuously 1381 - for bit clearing 1382 - and for receipt of unregister messages 1383 * 2. Use an LRU. 1384 * 3. Use workload hints. 1385 */ 1386 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1387 { 1388 while (rdma->unregistrations[rdma->unregister_current]) { 1389 int ret; 1390 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1391 uint64_t chunk = 1392 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1393 uint64_t index = 1394 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1395 RDMALocalBlock *block = 1396 &(rdma->local_ram_blocks.block[index]); 1397 RDMARegister reg = { .current_index = index }; 1398 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1399 }; 1400 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1401 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1402 .repeat = 1, 1403 }; 1404 1405 trace_qemu_rdma_unregister_waiting_proc(chunk, 1406 rdma->unregister_current); 1407 1408 rdma->unregistrations[rdma->unregister_current] = 0; 1409 rdma->unregister_current++; 1410 1411 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1412 
rdma->unregister_current = 0; 1413 } 1414 1415 1416 /* 1417 * Unregistration is speculative (because migration is single-threaded 1418 * and we cannot break the protocol's inifinband message ordering). 1419 * Thus, if the memory is currently being used for transmission, 1420 * then abort the attempt to unregister and try again 1421 * later the next time a completion is received for this memory. 1422 */ 1423 clear_bit(chunk, block->unregister_bitmap); 1424 1425 if (test_bit(chunk, block->transit_bitmap)) { 1426 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1427 continue; 1428 } 1429 1430 trace_qemu_rdma_unregister_waiting_send(chunk); 1431 1432 ret = ibv_dereg_mr(block->pmr[chunk]); 1433 block->pmr[chunk] = NULL; 1434 block->remote_keys[chunk] = 0; 1435 1436 if (ret != 0) { 1437 perror("unregistration chunk failed"); 1438 return -ret; 1439 } 1440 rdma->total_registrations--; 1441 1442 reg.key.chunk = chunk; 1443 register_to_network(rdma, ®); 1444 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1445 &resp, NULL, NULL); 1446 if (ret < 0) { 1447 return ret; 1448 } 1449 1450 trace_qemu_rdma_unregister_waiting_complete(chunk); 1451 } 1452 1453 return 0; 1454 } 1455 1456 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1457 uint64_t chunk) 1458 { 1459 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1460 1461 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1462 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1463 1464 return result; 1465 } 1466 1467 /* 1468 * Consult the connection manager to see a work request 1469 * (of any kind) has completed. 1470 * Return the work request ID that completed. 
1471 */ 1472 static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq, 1473 uint64_t *wr_id_out, uint32_t *byte_len) 1474 { 1475 int ret; 1476 struct ibv_wc wc; 1477 uint64_t wr_id; 1478 1479 ret = ibv_poll_cq(cq, 1, &wc); 1480 1481 if (!ret) { 1482 *wr_id_out = RDMA_WRID_NONE; 1483 return 0; 1484 } 1485 1486 if (ret < 0) { 1487 error_report("ibv_poll_cq return %d", ret); 1488 return ret; 1489 } 1490 1491 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1492 1493 if (wc.status != IBV_WC_SUCCESS) { 1494 fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n", 1495 wc.status, ibv_wc_status_str(wc.status)); 1496 fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]); 1497 1498 return -1; 1499 } 1500 1501 if (rdma->control_ready_expected && 1502 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1503 trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL], 1504 wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent); 1505 rdma->control_ready_expected = 0; 1506 } 1507 1508 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1509 uint64_t chunk = 1510 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1511 uint64_t index = 1512 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1513 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1514 1515 trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent, 1516 index, chunk, block->local_host_addr, 1517 (void *)(uintptr_t)block->remote_host_addr); 1518 1519 clear_bit(chunk, block->transit_bitmap); 1520 1521 if (rdma->nb_sent > 0) { 1522 rdma->nb_sent--; 1523 } 1524 } else { 1525 trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent); 1526 } 1527 1528 *wr_id_out = wc.wr_id; 1529 if (byte_len) { 1530 *byte_len = wc.byte_len; 1531 } 1532 1533 return 0; 1534 } 1535 1536 /* Wait for activity on the completion channel. 1537 * Returns 0 on success, none-0 on error. 
1538 */ 1539 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma, 1540 struct ibv_comp_channel *comp_channel) 1541 { 1542 struct rdma_cm_event *cm_event; 1543 int ret = -1; 1544 1545 /* 1546 * Coroutine doesn't start until migration_fd_process_incoming() 1547 * so don't yield unless we know we're running inside of a coroutine. 1548 */ 1549 if (rdma->migration_started_on_destination && 1550 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1551 yield_until_fd_readable(comp_channel->fd); 1552 } else { 1553 /* This is the source side, we're in a separate thread 1554 * or destination prior to migration_fd_process_incoming() 1555 * after postcopy, the destination also in a separate thread. 1556 * we can't yield; so we have to poll the fd. 1557 * But we need to be able to handle 'cancel' or an error 1558 * without hanging forever. 1559 */ 1560 while (!rdma->error_state && !rdma->received_error) { 1561 GPollFD pfds[2]; 1562 pfds[0].fd = comp_channel->fd; 1563 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1564 pfds[0].revents = 0; 1565 1566 pfds[1].fd = rdma->channel->fd; 1567 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1568 pfds[1].revents = 0; 1569 1570 /* 0.1s timeout, should be fine for a 'cancel' */ 1571 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1572 case 2: 1573 case 1: /* fd active */ 1574 if (pfds[0].revents) { 1575 return 0; 1576 } 1577 1578 if (pfds[1].revents) { 1579 ret = rdma_get_cm_event(rdma->channel, &cm_event); 1580 if (ret) { 1581 error_report("failed to get cm event while wait " 1582 "completion channel"); 1583 return -EPIPE; 1584 } 1585 1586 error_report("receive cm event while wait comp channel," 1587 "cm event is %d", cm_event->event); 1588 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1589 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1590 rdma_ack_cm_event(cm_event); 1591 return -EPIPE; 1592 } 1593 rdma_ack_cm_event(cm_event); 1594 } 1595 break; 1596 1597 case 0: /* Timeout, go around again */ 
1598 break; 1599 1600 default: /* Error of some type - 1601 * I don't trust errno from qemu_poll_ns 1602 */ 1603 error_report("%s: poll failed", __func__); 1604 return -EPIPE; 1605 } 1606 1607 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1608 /* Bail out and let the cancellation happen */ 1609 return -EPIPE; 1610 } 1611 } 1612 } 1613 1614 if (rdma->received_error) { 1615 return -EPIPE; 1616 } 1617 return rdma->error_state; 1618 } 1619 1620 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid) 1621 { 1622 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel : 1623 rdma->recv_comp_channel; 1624 } 1625 1626 static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid) 1627 { 1628 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq; 1629 } 1630 1631 /* 1632 * Block until the next work request has completed. 1633 * 1634 * First poll to see if a work request has already completed, 1635 * otherwise block. 1636 * 1637 * If we encounter completed work requests for IDs other than 1638 * the one we're interested in, then that's generally an error. 1639 * 1640 * The only exception is actual RDMA Write completions. These 1641 * completions only need to be recorded, but do not actually 1642 * need further processing. 
1643 */ 1644 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested, 1645 uint32_t *byte_len) 1646 { 1647 int num_cq_events = 0, ret = 0; 1648 struct ibv_cq *cq; 1649 void *cq_ctx; 1650 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1651 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested); 1652 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested); 1653 1654 if (ibv_req_notify_cq(poll_cq, 0)) { 1655 return -1; 1656 } 1657 /* poll cq first */ 1658 while (wr_id != wrid_requested) { 1659 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1660 if (ret < 0) { 1661 return ret; 1662 } 1663 1664 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1665 1666 if (wr_id == RDMA_WRID_NONE) { 1667 break; 1668 } 1669 if (wr_id != wrid_requested) { 1670 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1671 wrid_requested, print_wrid(wr_id), wr_id); 1672 } 1673 } 1674 1675 if (wr_id == wrid_requested) { 1676 return 0; 1677 } 1678 1679 while (1) { 1680 ret = qemu_rdma_wait_comp_channel(rdma, ch); 1681 if (ret) { 1682 goto err_block_for_wrid; 1683 } 1684 1685 ret = ibv_get_cq_event(ch, &cq, &cq_ctx); 1686 if (ret) { 1687 perror("ibv_get_cq_event"); 1688 goto err_block_for_wrid; 1689 } 1690 1691 num_cq_events++; 1692 1693 ret = -ibv_req_notify_cq(cq, 0); 1694 if (ret) { 1695 goto err_block_for_wrid; 1696 } 1697 1698 while (wr_id != wrid_requested) { 1699 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1700 if (ret < 0) { 1701 goto err_block_for_wrid; 1702 } 1703 1704 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1705 1706 if (wr_id == RDMA_WRID_NONE) { 1707 break; 1708 } 1709 if (wr_id != wrid_requested) { 1710 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested), 1711 wrid_requested, print_wrid(wr_id), wr_id); 1712 } 1713 } 1714 1715 if (wr_id == wrid_requested) { 1716 goto success_block_for_wrid; 1717 } 1718 } 1719 1720 success_block_for_wrid: 1721 if (num_cq_events) { 1722 ibv_ack_cq_events(cq, num_cq_events); 1723 } 1724 return 0; 
1725 1726 err_block_for_wrid: 1727 if (num_cq_events) { 1728 ibv_ack_cq_events(cq, num_cq_events); 1729 } 1730 1731 rdma->error_state = ret; 1732 return ret; 1733 } 1734 1735 /* 1736 * Post a SEND message work request for the control channel 1737 * containing some data and block until the post completes. 1738 */ 1739 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1740 RDMAControlHeader *head) 1741 { 1742 int ret = 0; 1743 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1744 struct ibv_send_wr *bad_wr; 1745 struct ibv_sge sge = { 1746 .addr = (uintptr_t)(wr->control), 1747 .length = head->len + sizeof(RDMAControlHeader), 1748 .lkey = wr->control_mr->lkey, 1749 }; 1750 struct ibv_send_wr send_wr = { 1751 .wr_id = RDMA_WRID_SEND_CONTROL, 1752 .opcode = IBV_WR_SEND, 1753 .send_flags = IBV_SEND_SIGNALED, 1754 .sg_list = &sge, 1755 .num_sge = 1, 1756 }; 1757 1758 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1759 1760 /* 1761 * We don't actually need to do a memcpy() in here if we used 1762 * the "sge" properly, but since we're only sending control messages 1763 * (not RAM in a performance-critical path), then its OK for now. 1764 * 1765 * The copy makes the RDMAControlHeader simpler to manipulate 1766 * for the time being. 
1767 */ 1768 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1769 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1770 control_to_network((void *) wr->control); 1771 1772 if (buf) { 1773 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1774 } 1775 1776 1777 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1778 1779 if (ret > 0) { 1780 error_report("Failed to use post IB SEND for control"); 1781 return -ret; 1782 } 1783 1784 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1785 if (ret < 0) { 1786 error_report("rdma migration: send polling control error"); 1787 } 1788 1789 return ret; 1790 } 1791 1792 /* 1793 * Post a RECV work request in anticipation of some future receipt 1794 * of data on the control channel. 1795 */ 1796 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx) 1797 { 1798 struct ibv_recv_wr *bad_wr; 1799 struct ibv_sge sge = { 1800 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1801 .length = RDMA_CONTROL_MAX_BUFFER, 1802 .lkey = rdma->wr_data[idx].control_mr->lkey, 1803 }; 1804 1805 struct ibv_recv_wr recv_wr = { 1806 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1807 .sg_list = &sge, 1808 .num_sge = 1, 1809 }; 1810 1811 1812 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1813 return -1; 1814 } 1815 1816 return 0; 1817 } 1818 1819 /* 1820 * Block and wait for a RECV control channel message to arrive. 
1821 */ 1822 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1823 RDMAControlHeader *head, int expecting, int idx) 1824 { 1825 uint32_t byte_len; 1826 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1827 &byte_len); 1828 1829 if (ret < 0) { 1830 error_report("rdma migration: recv polling control error!"); 1831 return ret; 1832 } 1833 1834 network_to_control((void *) rdma->wr_data[idx].control); 1835 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1836 1837 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1838 1839 if (expecting == RDMA_CONTROL_NONE) { 1840 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1841 head->type); 1842 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1843 error_report("Was expecting a %s (%d) control message" 1844 ", but got: %s (%d), length: %d", 1845 control_desc(expecting), expecting, 1846 control_desc(head->type), head->type, head->len); 1847 if (head->type == RDMA_CONTROL_ERROR) { 1848 rdma->received_error = true; 1849 } 1850 return -EIO; 1851 } 1852 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1853 error_report("too long length: %d", head->len); 1854 return -EINVAL; 1855 } 1856 if (sizeof(*head) + head->len != byte_len) { 1857 error_report("Malformed length: %d byte_len %d", head->len, byte_len); 1858 return -EINVAL; 1859 } 1860 1861 return 0; 1862 } 1863 1864 /* 1865 * When a RECV work request has completed, the work request's 1866 * buffer is pointed at the header. 1867 * 1868 * This will advance the pointer to the data portion 1869 * of the control message of the work request's buffer that 1870 * was populated after the work request finished. 
1871 */ 1872 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1873 RDMAControlHeader *head) 1874 { 1875 rdma->wr_data[idx].control_len = head->len; 1876 rdma->wr_data[idx].control_curr = 1877 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1878 } 1879 1880 /* 1881 * This is an 'atomic' high-level operation to deliver a single, unified 1882 * control-channel message. 1883 * 1884 * Additionally, if the user is expecting some kind of reply to this message, 1885 * they can request a 'resp' response message be filled in by posting an 1886 * additional work request on behalf of the user and waiting for an additional 1887 * completion. 1888 * 1889 * The extra (optional) response is used during registration to us from having 1890 * to perform an *additional* exchange of message just to provide a response by 1891 * instead piggy-backing on the acknowledgement. 1892 */ 1893 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1894 uint8_t *data, RDMAControlHeader *resp, 1895 int *resp_idx, 1896 int (*callback)(RDMAContext *rdma)) 1897 { 1898 int ret = 0; 1899 1900 /* 1901 * Wait until the dest is ready before attempting to deliver the message 1902 * by waiting for a READY message. 1903 */ 1904 if (rdma->control_ready_expected) { 1905 RDMAControlHeader resp_ignored; 1906 1907 ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored, 1908 RDMA_CONTROL_READY, 1909 RDMA_WRID_READY); 1910 if (ret < 0) { 1911 return ret; 1912 } 1913 } 1914 1915 /* 1916 * If the user is expecting a response, post a WR in anticipation of it. 1917 */ 1918 if (resp) { 1919 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA); 1920 if (ret) { 1921 error_report("rdma migration: error posting" 1922 " extra control recv for anticipated result!"); 1923 return ret; 1924 } 1925 } 1926 1927 /* 1928 * Post a WR to replace the one we just consumed for the READY message. 
1929 */ 1930 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 1931 if (ret) { 1932 error_report("rdma migration: error posting first control recv!"); 1933 return ret; 1934 } 1935 1936 /* 1937 * Deliver the control message that was requested. 1938 */ 1939 ret = qemu_rdma_post_send_control(rdma, data, head); 1940 1941 if (ret < 0) { 1942 error_report("Failed to send control buffer!"); 1943 return ret; 1944 } 1945 1946 /* 1947 * If we're expecting a response, block and wait for it. 1948 */ 1949 if (resp) { 1950 if (callback) { 1951 trace_qemu_rdma_exchange_send_issue_callback(); 1952 ret = callback(rdma); 1953 if (ret < 0) { 1954 return ret; 1955 } 1956 } 1957 1958 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1959 ret = qemu_rdma_exchange_get_response(rdma, resp, 1960 resp->type, RDMA_WRID_DATA); 1961 1962 if (ret < 0) { 1963 return ret; 1964 } 1965 1966 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1967 if (resp_idx) { 1968 *resp_idx = RDMA_WRID_DATA; 1969 } 1970 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1971 } 1972 1973 rdma->control_ready_expected = 1; 1974 1975 return 0; 1976 } 1977 1978 /* 1979 * This is an 'atomic' high-level operation to receive a single, unified 1980 * control-channel message. 1981 */ 1982 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1983 int expecting) 1984 { 1985 RDMAControlHeader ready = { 1986 .len = 0, 1987 .type = RDMA_CONTROL_READY, 1988 .repeat = 1, 1989 }; 1990 int ret; 1991 1992 /* 1993 * Inform the source that we're ready to receive a message. 1994 */ 1995 ret = qemu_rdma_post_send_control(rdma, NULL, &ready); 1996 1997 if (ret < 0) { 1998 error_report("Failed to send control buffer!"); 1999 return ret; 2000 } 2001 2002 /* 2003 * Block and wait for the message. 
2004 */ 2005 ret = qemu_rdma_exchange_get_response(rdma, head, 2006 expecting, RDMA_WRID_READY); 2007 2008 if (ret < 0) { 2009 return ret; 2010 } 2011 2012 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 2013 2014 /* 2015 * Post a new RECV work request to replace the one we just consumed. 2016 */ 2017 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2018 if (ret) { 2019 error_report("rdma migration: error posting second control recv!"); 2020 return ret; 2021 } 2022 2023 return 0; 2024 } 2025 2026 /* 2027 * Write an actual chunk of memory using RDMA. 2028 * 2029 * If we're using dynamic registration on the dest-side, we have to 2030 * send a registration command first. 2031 */ 2032 static int qemu_rdma_write_one(RDMAContext *rdma, 2033 int current_index, uint64_t current_addr, 2034 uint64_t length) 2035 { 2036 struct ibv_sge sge; 2037 struct ibv_send_wr send_wr = { 0 }; 2038 struct ibv_send_wr *bad_wr; 2039 int reg_result_idx, ret, count = 0; 2040 uint64_t chunk, chunks; 2041 uint8_t *chunk_start, *chunk_end; 2042 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 2043 RDMARegister reg; 2044 RDMARegisterResult *reg_result; 2045 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 2046 RDMAControlHeader head = { .len = sizeof(RDMARegister), 2047 .type = RDMA_CONTROL_REGISTER_REQUEST, 2048 .repeat = 1, 2049 }; 2050 2051 retry: 2052 sge.addr = (uintptr_t)(block->local_host_addr + 2053 (current_addr - block->offset)); 2054 sge.length = length; 2055 2056 chunk = ram_chunk_index(block->local_host_addr, 2057 (uint8_t *)(uintptr_t)sge.addr); 2058 chunk_start = ram_chunk_start(block, chunk); 2059 2060 if (block->is_ram_block) { 2061 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 2062 2063 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 2064 chunks--; 2065 } 2066 } else { 2067 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 2068 2069 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) 
{ 2070 chunks--; 2071 } 2072 } 2073 2074 trace_qemu_rdma_write_one_top(chunks + 1, 2075 (chunks + 1) * 2076 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 2077 2078 chunk_end = ram_chunk_end(block, chunk + chunks); 2079 2080 2081 while (test_bit(chunk, block->transit_bitmap)) { 2082 (void)count; 2083 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 2084 sge.addr, length, rdma->nb_sent, block->nb_chunks); 2085 2086 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2087 2088 if (ret < 0) { 2089 error_report("Failed to Wait for previous write to complete " 2090 "block %d chunk %" PRIu64 2091 " current %" PRIu64 " len %" PRIu64 " %d", 2092 current_index, chunk, sge.addr, length, rdma->nb_sent); 2093 return ret; 2094 } 2095 } 2096 2097 if (!rdma->pin_all || !block->is_ram_block) { 2098 if (!block->remote_keys[chunk]) { 2099 /* 2100 * This chunk has not yet been registered, so first check to see 2101 * if the entire chunk is zero. If so, tell the other size to 2102 * memset() + madvise() the entire chunk without RDMA. 2103 */ 2104 2105 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 2106 RDMACompress comp = { 2107 .offset = current_addr, 2108 .value = 0, 2109 .block_idx = current_index, 2110 .length = length, 2111 }; 2112 2113 head.len = sizeof(comp); 2114 head.type = RDMA_CONTROL_COMPRESS; 2115 2116 trace_qemu_rdma_write_one_zero(chunk, sge.length, 2117 current_index, current_addr); 2118 2119 compress_to_network(rdma, &comp); 2120 ret = qemu_rdma_exchange_send(rdma, &head, 2121 (uint8_t *) &comp, NULL, NULL, NULL); 2122 2123 if (ret < 0) { 2124 return -EIO; 2125 } 2126 2127 /* 2128 * TODO: Here we are sending something, but we are not 2129 * accounting for anything transferred. The following is wrong: 2130 * 2131 * stat64_add(&mig_stats.rdma_bytes, sge.length); 2132 * 2133 * because we are using some kind of compression. I 2134 * would think that head.len would be the more similar 2135 * thing to a correct value. 
2136 */ 2137 stat64_add(&mig_stats.zero_pages, 2138 sge.length / qemu_target_page_size()); 2139 return 1; 2140 } 2141 2142 /* 2143 * Otherwise, tell other side to register. 2144 */ 2145 reg.current_index = current_index; 2146 if (block->is_ram_block) { 2147 reg.key.current_addr = current_addr; 2148 } else { 2149 reg.key.chunk = chunk; 2150 } 2151 reg.chunks = chunks; 2152 2153 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 2154 current_addr); 2155 2156 register_to_network(rdma, ®); 2157 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 2158 &resp, ®_result_idx, NULL); 2159 if (ret < 0) { 2160 return ret; 2161 } 2162 2163 /* try to overlap this single registration with the one we sent. */ 2164 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2165 &sge.lkey, NULL, chunk, 2166 chunk_start, chunk_end)) { 2167 error_report("cannot get lkey"); 2168 return -EINVAL; 2169 } 2170 2171 reg_result = (RDMARegisterResult *) 2172 rdma->wr_data[reg_result_idx].control_curr; 2173 2174 network_to_result(reg_result); 2175 2176 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 2177 reg_result->rkey, chunk); 2178 2179 block->remote_keys[chunk] = reg_result->rkey; 2180 block->remote_host_addr = reg_result->host_addr; 2181 } else { 2182 /* already registered before */ 2183 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2184 &sge.lkey, NULL, chunk, 2185 chunk_start, chunk_end)) { 2186 error_report("cannot get lkey!"); 2187 return -EINVAL; 2188 } 2189 } 2190 2191 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 2192 } else { 2193 send_wr.wr.rdma.rkey = block->remote_rkey; 2194 2195 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2196 &sge.lkey, NULL, chunk, 2197 chunk_start, chunk_end)) { 2198 error_report("cannot get lkey!"); 2199 return -EINVAL; 2200 } 2201 } 2202 2203 /* 2204 * Encode the ram block index and chunk within this wrid. 
2205 * We will use this information at the time of completion 2206 * to figure out which bitmap to check against and then which 2207 * chunk in the bitmap to look for. 2208 */ 2209 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2210 current_index, chunk); 2211 2212 send_wr.opcode = IBV_WR_RDMA_WRITE; 2213 send_wr.send_flags = IBV_SEND_SIGNALED; 2214 send_wr.sg_list = &sge; 2215 send_wr.num_sge = 1; 2216 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2217 (current_addr - block->offset); 2218 2219 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2220 sge.length); 2221 2222 /* 2223 * ibv_post_send() does not return negative error numbers, 2224 * per the specification they are positive - no idea why. 2225 */ 2226 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2227 2228 if (ret == ENOMEM) { 2229 trace_qemu_rdma_write_one_queue_full(); 2230 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2231 if (ret < 0) { 2232 error_report("rdma migration: failed to make " 2233 "room in full send queue! %d", ret); 2234 return ret; 2235 } 2236 2237 goto retry; 2238 2239 } else if (ret > 0) { 2240 perror("rdma migration: post rdma write failed"); 2241 return -ret; 2242 } 2243 2244 set_bit(chunk, block->transit_bitmap); 2245 stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size()); 2246 /* 2247 * We are adding to transferred the amount of data written, but no 2248 * overhead at all. I will asume that RDMA is magicaly and don't 2249 * need to transfer (at least) the addresses where it wants to 2250 * write the pages. Here it looks like it should be something 2251 * like: 2252 * sizeof(send_wr) + sge.length 2253 * but this being RDMA, who knows. 2254 */ 2255 stat64_add(&mig_stats.rdma_bytes, sge.length); 2256 ram_transferred_add(sge.length); 2257 rdma->total_writes++; 2258 2259 return 0; 2260 } 2261 2262 /* 2263 * Push out any unwritten RDMA operations. 
2264 * 2265 * We support sending out multiple chunks at the same time. 2266 * Not all of them need to get signaled in the completion queue. 2267 */ 2268 static int qemu_rdma_write_flush(RDMAContext *rdma) 2269 { 2270 int ret; 2271 2272 if (!rdma->current_length) { 2273 return 0; 2274 } 2275 2276 ret = qemu_rdma_write_one(rdma, 2277 rdma->current_index, rdma->current_addr, rdma->current_length); 2278 2279 if (ret < 0) { 2280 return ret; 2281 } 2282 2283 if (ret == 0) { 2284 rdma->nb_sent++; 2285 trace_qemu_rdma_write_flush(rdma->nb_sent); 2286 } 2287 2288 rdma->current_length = 0; 2289 rdma->current_addr = 0; 2290 2291 return 0; 2292 } 2293 2294 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma, 2295 uint64_t offset, uint64_t len) 2296 { 2297 RDMALocalBlock *block; 2298 uint8_t *host_addr; 2299 uint8_t *chunk_end; 2300 2301 if (rdma->current_index < 0) { 2302 return 0; 2303 } 2304 2305 if (rdma->current_chunk < 0) { 2306 return 0; 2307 } 2308 2309 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2310 host_addr = block->local_host_addr + (offset - block->offset); 2311 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2312 2313 if (rdma->current_length == 0) { 2314 return 0; 2315 } 2316 2317 /* 2318 * Only merge into chunk sequentially. 2319 */ 2320 if (offset != (rdma->current_addr + rdma->current_length)) { 2321 return 0; 2322 } 2323 2324 if (offset < block->offset) { 2325 return 0; 2326 } 2327 2328 if ((offset + len) > (block->offset + block->length)) { 2329 return 0; 2330 } 2331 2332 if ((host_addr + len) > chunk_end) { 2333 return 0; 2334 } 2335 2336 return 1; 2337 } 2338 2339 /* 2340 * We're not actually writing here, but doing three things: 2341 * 2342 * 1. Identify the chunk the buffer belongs to. 2343 * 2. If the chunk is full or the buffer doesn't belong to the current 2344 * chunk, then start a new chunk and flush() the old chunk. 2345 * 3. 
To keep the hardware busy, we also group chunks into batches 2346 * and only require that a batch gets acknowledged in the completion 2347 * queue instead of each individual chunk. 2348 */ 2349 static int qemu_rdma_write(RDMAContext *rdma, 2350 uint64_t block_offset, uint64_t offset, 2351 uint64_t len) 2352 { 2353 uint64_t current_addr = block_offset + offset; 2354 uint64_t index = rdma->current_index; 2355 uint64_t chunk = rdma->current_chunk; 2356 int ret; 2357 2358 /* If we cannot merge it, we flush the current buffer first. */ 2359 if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) { 2360 ret = qemu_rdma_write_flush(rdma); 2361 if (ret) { 2362 return ret; 2363 } 2364 rdma->current_length = 0; 2365 rdma->current_addr = current_addr; 2366 2367 ret = qemu_rdma_search_ram_block(rdma, block_offset, 2368 offset, len, &index, &chunk); 2369 if (ret) { 2370 error_report("ram block search failed"); 2371 return ret; 2372 } 2373 rdma->current_index = index; 2374 rdma->current_chunk = chunk; 2375 } 2376 2377 /* merge it */ 2378 rdma->current_length += len; 2379 2380 /* flush it if buffer is too large */ 2381 if (rdma->current_length >= RDMA_MERGE_MAX) { 2382 return qemu_rdma_write_flush(rdma); 2383 } 2384 2385 return 0; 2386 } 2387 2388 static void qemu_rdma_cleanup(RDMAContext *rdma) 2389 { 2390 int idx; 2391 2392 if (rdma->cm_id && rdma->connected) { 2393 if ((rdma->error_state || 2394 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2395 !rdma->received_error) { 2396 RDMAControlHeader head = { .len = 0, 2397 .type = RDMA_CONTROL_ERROR, 2398 .repeat = 1, 2399 }; 2400 error_report("Early error. 
Sending error."); 2401 qemu_rdma_post_send_control(rdma, NULL, &head); 2402 } 2403 2404 rdma_disconnect(rdma->cm_id); 2405 trace_qemu_rdma_cleanup_disconnect(); 2406 rdma->connected = false; 2407 } 2408 2409 if (rdma->channel) { 2410 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2411 } 2412 g_free(rdma->dest_blocks); 2413 rdma->dest_blocks = NULL; 2414 2415 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2416 if (rdma->wr_data[idx].control_mr) { 2417 rdma->total_registrations--; 2418 ibv_dereg_mr(rdma->wr_data[idx].control_mr); 2419 } 2420 rdma->wr_data[idx].control_mr = NULL; 2421 } 2422 2423 if (rdma->local_ram_blocks.block) { 2424 while (rdma->local_ram_blocks.nb_blocks) { 2425 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2426 } 2427 } 2428 2429 if (rdma->qp) { 2430 rdma_destroy_qp(rdma->cm_id); 2431 rdma->qp = NULL; 2432 } 2433 if (rdma->recv_cq) { 2434 ibv_destroy_cq(rdma->recv_cq); 2435 rdma->recv_cq = NULL; 2436 } 2437 if (rdma->send_cq) { 2438 ibv_destroy_cq(rdma->send_cq); 2439 rdma->send_cq = NULL; 2440 } 2441 if (rdma->recv_comp_channel) { 2442 ibv_destroy_comp_channel(rdma->recv_comp_channel); 2443 rdma->recv_comp_channel = NULL; 2444 } 2445 if (rdma->send_comp_channel) { 2446 ibv_destroy_comp_channel(rdma->send_comp_channel); 2447 rdma->send_comp_channel = NULL; 2448 } 2449 if (rdma->pd) { 2450 ibv_dealloc_pd(rdma->pd); 2451 rdma->pd = NULL; 2452 } 2453 if (rdma->cm_id) { 2454 rdma_destroy_id(rdma->cm_id); 2455 rdma->cm_id = NULL; 2456 } 2457 2458 /* the destination side, listen_id and channel is shared */ 2459 if (rdma->listen_id) { 2460 if (!rdma->is_return_path) { 2461 rdma_destroy_id(rdma->listen_id); 2462 } 2463 rdma->listen_id = NULL; 2464 2465 if (rdma->channel) { 2466 if (!rdma->is_return_path) { 2467 rdma_destroy_event_channel(rdma->channel); 2468 } 2469 rdma->channel = NULL; 2470 } 2471 } 2472 2473 if (rdma->channel) { 2474 rdma_destroy_event_channel(rdma->channel); 2475 rdma->channel = NULL; 2476 } 2477 
g_free(rdma->host); 2478 g_free(rdma->host_port); 2479 rdma->host = NULL; 2480 rdma->host_port = NULL; 2481 } 2482 2483 2484 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2485 { 2486 int ret, idx; 2487 Error *local_err = NULL, **temp = &local_err; 2488 2489 /* 2490 * Will be validated against destination's actual capabilities 2491 * after the connect() completes. 2492 */ 2493 rdma->pin_all = pin_all; 2494 2495 ret = qemu_rdma_resolve_host(rdma, temp); 2496 if (ret) { 2497 goto err_rdma_source_init; 2498 } 2499 2500 ret = qemu_rdma_alloc_pd_cq(rdma); 2501 if (ret) { 2502 ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()" 2503 " limits may be too low. Please check $ ulimit -a # and " 2504 "search for 'ulimit -l' in the output"); 2505 goto err_rdma_source_init; 2506 } 2507 2508 ret = qemu_rdma_alloc_qp(rdma); 2509 if (ret) { 2510 ERROR(temp, "rdma migration: error allocating qp!"); 2511 goto err_rdma_source_init; 2512 } 2513 2514 ret = qemu_rdma_init_ram_blocks(rdma); 2515 if (ret) { 2516 ERROR(temp, "rdma migration: error initializing ram blocks!"); 2517 goto err_rdma_source_init; 2518 } 2519 2520 /* Build the hash that maps from offset to RAMBlock */ 2521 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2522 for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) { 2523 g_hash_table_insert(rdma->blockmap, 2524 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset, 2525 &rdma->local_ram_blocks.block[idx]); 2526 } 2527 2528 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2529 ret = qemu_rdma_reg_control(rdma, idx); 2530 if (ret) { 2531 ERROR(temp, "rdma migration: error registering %d control!", 2532 idx); 2533 goto err_rdma_source_init; 2534 } 2535 } 2536 2537 return 0; 2538 2539 err_rdma_source_init: 2540 error_propagate(errp, local_err); 2541 qemu_rdma_cleanup(rdma); 2542 return -1; 2543 } 2544 2545 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2546 struct rdma_cm_event 
**cm_event, 2547 long msec, Error **errp) 2548 { 2549 int ret; 2550 struct pollfd poll_fd = { 2551 .fd = rdma->channel->fd, 2552 .events = POLLIN, 2553 .revents = 0 2554 }; 2555 2556 do { 2557 ret = poll(&poll_fd, 1, msec); 2558 } while (ret < 0 && errno == EINTR); 2559 2560 if (ret == 0) { 2561 ERROR(errp, "poll cm event timeout"); 2562 return -1; 2563 } else if (ret < 0) { 2564 ERROR(errp, "failed to poll cm event, errno=%i", errno); 2565 return -1; 2566 } else if (poll_fd.revents & POLLIN) { 2567 return rdma_get_cm_event(rdma->channel, cm_event); 2568 } else { 2569 ERROR(errp, "no POLLIN event, revent=%x", poll_fd.revents); 2570 return -1; 2571 } 2572 } 2573 2574 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path) 2575 { 2576 RDMACapabilities cap = { 2577 .version = RDMA_CONTROL_VERSION_CURRENT, 2578 .flags = 0, 2579 }; 2580 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2581 .retry_count = 5, 2582 .private_data = &cap, 2583 .private_data_len = sizeof(cap), 2584 }; 2585 struct rdma_cm_event *cm_event; 2586 int ret; 2587 2588 /* 2589 * Only negotiate the capability with destination if the user 2590 * on the source first requested the capability. 
2591 */ 2592 if (rdma->pin_all) { 2593 trace_qemu_rdma_connect_pin_all_requested(); 2594 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2595 } 2596 2597 caps_to_network(&cap); 2598 2599 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 2600 if (ret) { 2601 ERROR(errp, "posting second control recv"); 2602 goto err_rdma_source_connect; 2603 } 2604 2605 ret = rdma_connect(rdma->cm_id, &conn_param); 2606 if (ret) { 2607 perror("rdma_connect"); 2608 ERROR(errp, "connecting to destination!"); 2609 goto err_rdma_source_connect; 2610 } 2611 2612 if (return_path) { 2613 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2614 } else { 2615 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2616 } 2617 if (ret) { 2618 perror("rdma_get_cm_event after rdma_connect"); 2619 ERROR(errp, "connecting to destination!"); 2620 goto err_rdma_source_connect; 2621 } 2622 2623 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2624 error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect"); 2625 ERROR(errp, "connecting to destination!"); 2626 rdma_ack_cm_event(cm_event); 2627 goto err_rdma_source_connect; 2628 } 2629 rdma->connected = true; 2630 2631 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2632 network_to_caps(&cap); 2633 2634 /* 2635 * Verify that the *requested* capabilities are supported by the destination 2636 * and disable them otherwise. 2637 */ 2638 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2639 ERROR(errp, "Server cannot support pinning all memory. 
" 2640 "Will register memory dynamically."); 2641 rdma->pin_all = false; 2642 } 2643 2644 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2645 2646 rdma_ack_cm_event(cm_event); 2647 2648 rdma->control_ready_expected = 1; 2649 rdma->nb_sent = 0; 2650 return 0; 2651 2652 err_rdma_source_connect: 2653 qemu_rdma_cleanup(rdma); 2654 return -1; 2655 } 2656 2657 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2658 { 2659 int ret, idx; 2660 struct rdma_cm_id *listen_id; 2661 char ip[40] = "unknown"; 2662 struct rdma_addrinfo *res, *e; 2663 char port_str[16]; 2664 int reuse = 1; 2665 2666 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2667 rdma->wr_data[idx].control_len = 0; 2668 rdma->wr_data[idx].control_curr = NULL; 2669 } 2670 2671 if (!rdma->host || !rdma->host[0]) { 2672 ERROR(errp, "RDMA host is not set!"); 2673 rdma->error_state = -EINVAL; 2674 return -1; 2675 } 2676 /* create CM channel */ 2677 rdma->channel = rdma_create_event_channel(); 2678 if (!rdma->channel) { 2679 ERROR(errp, "could not create rdma event channel"); 2680 rdma->error_state = -EINVAL; 2681 return -1; 2682 } 2683 2684 /* create CM id */ 2685 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2686 if (ret) { 2687 ERROR(errp, "could not create cm_id!"); 2688 goto err_dest_init_create_listen_id; 2689 } 2690 2691 snprintf(port_str, 16, "%d", rdma->port); 2692 port_str[15] = '\0'; 2693 2694 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2695 if (ret < 0) { 2696 ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host); 2697 goto err_dest_init_bind_addr; 2698 } 2699 2700 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, 2701 &reuse, sizeof reuse); 2702 if (ret) { 2703 ERROR(errp, "Error: could not set REUSEADDR option"); 2704 goto err_dest_init_bind_addr; 2705 } 2706 for (e = res; e != NULL; e = e->ai_next) { 2707 inet_ntop(e->ai_family, 2708 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2709 
trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2710 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2711 if (ret) { 2712 continue; 2713 } 2714 if (e->ai_family == AF_INET6) { 2715 ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp); 2716 if (ret) { 2717 continue; 2718 } 2719 } 2720 break; 2721 } 2722 2723 rdma_freeaddrinfo(res); 2724 if (!e) { 2725 ERROR(errp, "Error: could not rdma_bind_addr!"); 2726 goto err_dest_init_bind_addr; 2727 } 2728 2729 rdma->listen_id = listen_id; 2730 qemu_rdma_dump_gid("dest_init", listen_id); 2731 return 0; 2732 2733 err_dest_init_bind_addr: 2734 rdma_destroy_id(listen_id); 2735 err_dest_init_create_listen_id: 2736 rdma_destroy_event_channel(rdma->channel); 2737 rdma->channel = NULL; 2738 rdma->error_state = ret; 2739 return ret; 2740 2741 } 2742 2743 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2744 RDMAContext *rdma) 2745 { 2746 int idx; 2747 2748 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 2749 rdma_return_path->wr_data[idx].control_len = 0; 2750 rdma_return_path->wr_data[idx].control_curr = NULL; 2751 } 2752 2753 /*the CM channel and CM id is shared*/ 2754 rdma_return_path->channel = rdma->channel; 2755 rdma_return_path->listen_id = rdma->listen_id; 2756 2757 rdma->return_path = rdma_return_path; 2758 rdma_return_path->return_path = rdma; 2759 rdma_return_path->is_return_path = true; 2760 } 2761 2762 static void *qemu_rdma_data_init(const char *host_port, Error **errp) 2763 { 2764 RDMAContext *rdma = NULL; 2765 InetSocketAddress *addr; 2766 2767 if (host_port) { 2768 rdma = g_new0(RDMAContext, 1); 2769 rdma->current_index = -1; 2770 rdma->current_chunk = -1; 2771 2772 addr = g_new(InetSocketAddress, 1); 2773 if (!inet_parse(addr, host_port, NULL)) { 2774 rdma->port = atoi(addr->port); 2775 rdma->host = g_strdup(addr->host); 2776 rdma->host_port = g_strdup(host_port); 2777 } else { 2778 ERROR(errp, "bad RDMA migration address '%s'", host_port); 2779 g_free(rdma); 2780 rdma = NULL; 2781 } 
2782 2783 qapi_free_InetSocketAddress(addr); 2784 } 2785 2786 return rdma; 2787 } 2788 2789 /* 2790 * QEMUFile interface to the control channel. 2791 * SEND messages for control only. 2792 * VM's ram is handled with regular RDMA messages. 2793 */ 2794 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2795 const struct iovec *iov, 2796 size_t niov, 2797 int *fds, 2798 size_t nfds, 2799 int flags, 2800 Error **errp) 2801 { 2802 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2803 RDMAContext *rdma; 2804 int ret; 2805 ssize_t done = 0; 2806 size_t i; 2807 size_t len = 0; 2808 2809 RCU_READ_LOCK_GUARD(); 2810 rdma = qatomic_rcu_read(&rioc->rdmaout); 2811 2812 if (!rdma) { 2813 error_setg(errp, "RDMA control channel output is not set"); 2814 return -1; 2815 } 2816 2817 CHECK_ERROR_STATE(); 2818 2819 /* 2820 * Push out any writes that 2821 * we're queued up for VM's ram. 2822 */ 2823 ret = qemu_rdma_write_flush(rdma); 2824 if (ret < 0) { 2825 rdma->error_state = ret; 2826 error_setg(errp, "qemu_rdma_write_flush returned %d", ret); 2827 return -1; 2828 } 2829 2830 for (i = 0; i < niov; i++) { 2831 size_t remaining = iov[i].iov_len; 2832 uint8_t * data = (void *)iov[i].iov_base; 2833 while (remaining) { 2834 RDMAControlHeader head = {}; 2835 2836 len = MIN(remaining, RDMA_SEND_INCREMENT); 2837 remaining -= len; 2838 2839 head.len = len; 2840 head.type = RDMA_CONTROL_QEMU_FILE; 2841 2842 ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); 2843 2844 if (ret < 0) { 2845 rdma->error_state = ret; 2846 error_setg(errp, "qemu_rdma_exchange_send returned %d", ret); 2847 return -1; 2848 } 2849 2850 data += len; 2851 done += len; 2852 } 2853 } 2854 2855 return done; 2856 } 2857 2858 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2859 size_t size, int idx) 2860 { 2861 size_t len = 0; 2862 2863 if (rdma->wr_data[idx].control_len) { 2864 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2865 2866 len = MIN(size, 
rdma->wr_data[idx].control_len); 2867 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2868 rdma->wr_data[idx].control_curr += len; 2869 rdma->wr_data[idx].control_len -= len; 2870 } 2871 2872 return len; 2873 } 2874 2875 /* 2876 * QEMUFile interface to the control channel. 2877 * RDMA links don't use bytestreams, so we have to 2878 * return bytes to QEMUFile opportunistically. 2879 */ 2880 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2881 const struct iovec *iov, 2882 size_t niov, 2883 int **fds, 2884 size_t *nfds, 2885 int flags, 2886 Error **errp) 2887 { 2888 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2889 RDMAContext *rdma; 2890 RDMAControlHeader head; 2891 int ret = 0; 2892 ssize_t i; 2893 size_t done = 0; 2894 2895 RCU_READ_LOCK_GUARD(); 2896 rdma = qatomic_rcu_read(&rioc->rdmain); 2897 2898 if (!rdma) { 2899 error_setg(errp, "RDMA control channel input is not set"); 2900 return -1; 2901 } 2902 2903 CHECK_ERROR_STATE(); 2904 2905 for (i = 0; i < niov; i++) { 2906 size_t want = iov[i].iov_len; 2907 uint8_t *data = (void *)iov[i].iov_base; 2908 2909 /* 2910 * First, we hold on to the last SEND message we 2911 * were given and dish out the bytes until we run 2912 * out of bytes. 2913 */ 2914 ret = qemu_rdma_fill(rdma, data, want, 0); 2915 done += ret; 2916 want -= ret; 2917 /* Got what we needed, so go to next iovec */ 2918 if (want == 0) { 2919 continue; 2920 } 2921 2922 /* If we got any data so far, then don't wait 2923 * for more, just return what we have */ 2924 if (done > 0) { 2925 break; 2926 } 2927 2928 2929 /* We've got nothing at all, so lets wait for 2930 * more to arrive 2931 */ 2932 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); 2933 2934 if (ret < 0) { 2935 rdma->error_state = ret; 2936 error_setg(errp, "qemu_rdma_exchange_recv returned %d", ret); 2937 return -1; 2938 } 2939 2940 /* 2941 * SEND was received with new bytes, now try again. 
2942 */ 2943 ret = qemu_rdma_fill(rdma, data, want, 0); 2944 done += ret; 2945 want -= ret; 2946 2947 /* Still didn't get enough, so lets just return */ 2948 if (want) { 2949 if (done == 0) { 2950 return QIO_CHANNEL_ERR_BLOCK; 2951 } else { 2952 break; 2953 } 2954 } 2955 } 2956 return done; 2957 } 2958 2959 /* 2960 * Block until all the outstanding chunks have been delivered by the hardware. 2961 */ 2962 static int qemu_rdma_drain_cq(RDMAContext *rdma) 2963 { 2964 int ret; 2965 2966 if (qemu_rdma_write_flush(rdma) < 0) { 2967 return -EIO; 2968 } 2969 2970 while (rdma->nb_sent) { 2971 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2972 if (ret < 0) { 2973 error_report("rdma migration: complete polling error!"); 2974 return -EIO; 2975 } 2976 } 2977 2978 qemu_rdma_unregister_waiting(rdma); 2979 2980 return 0; 2981 } 2982 2983 2984 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2985 bool blocking, 2986 Error **errp) 2987 { 2988 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2989 /* XXX we should make readv/writev actually honour this :-) */ 2990 rioc->blocking = blocking; 2991 return 0; 2992 } 2993 2994 2995 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2996 struct QIOChannelRDMASource { 2997 GSource parent; 2998 QIOChannelRDMA *rioc; 2999 GIOCondition condition; 3000 }; 3001 3002 static gboolean 3003 qio_channel_rdma_source_prepare(GSource *source, 3004 gint *timeout) 3005 { 3006 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3007 RDMAContext *rdma; 3008 GIOCondition cond = 0; 3009 *timeout = -1; 3010 3011 RCU_READ_LOCK_GUARD(); 3012 if (rsource->condition == G_IO_IN) { 3013 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3014 } else { 3015 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3016 } 3017 3018 if (!rdma) { 3019 error_report("RDMAContext is NULL when prepare Gsource"); 3020 return FALSE; 3021 } 3022 3023 if (rdma->wr_data[0].control_len) { 3024 cond |= G_IO_IN; 3025 } 3026 cond |= G_IO_OUT; 3027 
3028 return cond & rsource->condition; 3029 } 3030 3031 static gboolean 3032 qio_channel_rdma_source_check(GSource *source) 3033 { 3034 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3035 RDMAContext *rdma; 3036 GIOCondition cond = 0; 3037 3038 RCU_READ_LOCK_GUARD(); 3039 if (rsource->condition == G_IO_IN) { 3040 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3041 } else { 3042 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3043 } 3044 3045 if (!rdma) { 3046 error_report("RDMAContext is NULL when check Gsource"); 3047 return FALSE; 3048 } 3049 3050 if (rdma->wr_data[0].control_len) { 3051 cond |= G_IO_IN; 3052 } 3053 cond |= G_IO_OUT; 3054 3055 return cond & rsource->condition; 3056 } 3057 3058 static gboolean 3059 qio_channel_rdma_source_dispatch(GSource *source, 3060 GSourceFunc callback, 3061 gpointer user_data) 3062 { 3063 QIOChannelFunc func = (QIOChannelFunc)callback; 3064 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 3065 RDMAContext *rdma; 3066 GIOCondition cond = 0; 3067 3068 RCU_READ_LOCK_GUARD(); 3069 if (rsource->condition == G_IO_IN) { 3070 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 3071 } else { 3072 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 3073 } 3074 3075 if (!rdma) { 3076 error_report("RDMAContext is NULL when dispatch Gsource"); 3077 return FALSE; 3078 } 3079 3080 if (rdma->wr_data[0].control_len) { 3081 cond |= G_IO_IN; 3082 } 3083 cond |= G_IO_OUT; 3084 3085 return (*func)(QIO_CHANNEL(rsource->rioc), 3086 (cond & rsource->condition), 3087 user_data); 3088 } 3089 3090 static void 3091 qio_channel_rdma_source_finalize(GSource *source) 3092 { 3093 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 3094 3095 object_unref(OBJECT(ssource->rioc)); 3096 } 3097 3098 GSourceFuncs qio_channel_rdma_source_funcs = { 3099 qio_channel_rdma_source_prepare, 3100 qio_channel_rdma_source_check, 3101 qio_channel_rdma_source_dispatch, 3102 qio_channel_rdma_source_finalize 3103 }; 3104 3105 static 
GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 3106 GIOCondition condition) 3107 { 3108 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3109 QIOChannelRDMASource *ssource; 3110 GSource *source; 3111 3112 source = g_source_new(&qio_channel_rdma_source_funcs, 3113 sizeof(QIOChannelRDMASource)); 3114 ssource = (QIOChannelRDMASource *)source; 3115 3116 ssource->rioc = rioc; 3117 object_ref(OBJECT(rioc)); 3118 3119 ssource->condition = condition; 3120 3121 return source; 3122 } 3123 3124 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 3125 AioContext *read_ctx, 3126 IOHandler *io_read, 3127 AioContext *write_ctx, 3128 IOHandler *io_write, 3129 void *opaque) 3130 { 3131 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3132 if (io_read) { 3133 aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd, 3134 io_read, io_write, NULL, NULL, opaque); 3135 aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd, 3136 io_read, io_write, NULL, NULL, opaque); 3137 } else { 3138 aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd, 3139 io_read, io_write, NULL, NULL, opaque); 3140 aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd, 3141 io_read, io_write, NULL, NULL, opaque); 3142 } 3143 } 3144 3145 struct rdma_close_rcu { 3146 struct rcu_head rcu; 3147 RDMAContext *rdmain; 3148 RDMAContext *rdmaout; 3149 }; 3150 3151 /* callback from qio_channel_rdma_close via call_rcu */ 3152 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 3153 { 3154 if (rcu->rdmain) { 3155 qemu_rdma_cleanup(rcu->rdmain); 3156 } 3157 3158 if (rcu->rdmaout) { 3159 qemu_rdma_cleanup(rcu->rdmaout); 3160 } 3161 3162 g_free(rcu->rdmain); 3163 g_free(rcu->rdmaout); 3164 g_free(rcu); 3165 } 3166 3167 static int qio_channel_rdma_close(QIOChannel *ioc, 3168 Error **errp) 3169 { 3170 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3171 RDMAContext *rdmain, *rdmaout; 3172 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 3173 
3174 trace_qemu_rdma_close(); 3175 3176 rdmain = rioc->rdmain; 3177 if (rdmain) { 3178 qatomic_rcu_set(&rioc->rdmain, NULL); 3179 } 3180 3181 rdmaout = rioc->rdmaout; 3182 if (rdmaout) { 3183 qatomic_rcu_set(&rioc->rdmaout, NULL); 3184 } 3185 3186 rcu->rdmain = rdmain; 3187 rcu->rdmaout = rdmaout; 3188 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 3189 3190 return 0; 3191 } 3192 3193 static int 3194 qio_channel_rdma_shutdown(QIOChannel *ioc, 3195 QIOChannelShutdown how, 3196 Error **errp) 3197 { 3198 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 3199 RDMAContext *rdmain, *rdmaout; 3200 3201 RCU_READ_LOCK_GUARD(); 3202 3203 rdmain = qatomic_rcu_read(&rioc->rdmain); 3204 rdmaout = qatomic_rcu_read(&rioc->rdmain); 3205 3206 switch (how) { 3207 case QIO_CHANNEL_SHUTDOWN_READ: 3208 if (rdmain) { 3209 rdmain->error_state = -1; 3210 } 3211 break; 3212 case QIO_CHANNEL_SHUTDOWN_WRITE: 3213 if (rdmaout) { 3214 rdmaout->error_state = -1; 3215 } 3216 break; 3217 case QIO_CHANNEL_SHUTDOWN_BOTH: 3218 default: 3219 if (rdmain) { 3220 rdmain->error_state = -1; 3221 } 3222 if (rdmaout) { 3223 rdmaout->error_state = -1; 3224 } 3225 break; 3226 } 3227 3228 return 0; 3229 } 3230 3231 /* 3232 * Parameters: 3233 * @offset == 0 : 3234 * This means that 'block_offset' is a full virtual address that does not 3235 * belong to a RAMBlock of the virtual machine and instead 3236 * represents a private malloc'd memory area that the caller wishes to 3237 * transfer. 3238 * 3239 * @offset != 0 : 3240 * Offset is an offset to be added to block_offset and used 3241 * to also lookup the corresponding RAMBlock. 3242 * 3243 * @size : Number of bytes to transfer 3244 * 3245 * @pages_sent : User-specificed pointer to indicate how many pages were 3246 * sent. Usually, this will not be more than a few bytes of 3247 * the protocol because most transfers are sent asynchronously. 
3248 */ 3249 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset, 3250 ram_addr_t offset, size_t size) 3251 { 3252 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3253 RDMAContext *rdma; 3254 int ret; 3255 3256 if (migration_in_postcopy()) { 3257 return RAM_SAVE_CONTROL_NOT_SUPP; 3258 } 3259 3260 RCU_READ_LOCK_GUARD(); 3261 rdma = qatomic_rcu_read(&rioc->rdmaout); 3262 3263 if (!rdma) { 3264 return -EIO; 3265 } 3266 3267 CHECK_ERROR_STATE(); 3268 3269 qemu_fflush(f); 3270 3271 /* 3272 * Add this page to the current 'chunk'. If the chunk 3273 * is full, or the page doesn't belong to the current chunk, 3274 * an actual RDMA write will occur and a new chunk will be formed. 3275 */ 3276 ret = qemu_rdma_write(rdma, block_offset, offset, size); 3277 if (ret < 0) { 3278 error_report("rdma migration: write error! %d", ret); 3279 goto err; 3280 } 3281 3282 /* 3283 * Drain the Completion Queue if possible, but do not block, 3284 * just poll. 3285 * 3286 * If nothing to poll, the end of the iteration will do this 3287 * again to make sure we don't overflow the request queue. 3288 */ 3289 while (1) { 3290 uint64_t wr_id, wr_id_in; 3291 ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL); 3292 3293 if (ret < 0) { 3294 error_report("rdma migration: polling error! %d", ret); 3295 goto err; 3296 } 3297 3298 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3299 3300 if (wr_id == RDMA_WRID_NONE) { 3301 break; 3302 } 3303 } 3304 3305 while (1) { 3306 uint64_t wr_id, wr_id_in; 3307 ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL); 3308 3309 if (ret < 0) { 3310 error_report("rdma migration: polling error! 
%d", ret); 3311 goto err; 3312 } 3313 3314 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3315 3316 if (wr_id == RDMA_WRID_NONE) { 3317 break; 3318 } 3319 } 3320 3321 return RAM_SAVE_CONTROL_DELAYED; 3322 err: 3323 rdma->error_state = ret; 3324 return ret; 3325 } 3326 3327 static void rdma_accept_incoming_migration(void *opaque); 3328 3329 static void rdma_cm_poll_handler(void *opaque) 3330 { 3331 RDMAContext *rdma = opaque; 3332 int ret; 3333 struct rdma_cm_event *cm_event; 3334 MigrationIncomingState *mis = migration_incoming_get_current(); 3335 3336 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3337 if (ret) { 3338 error_report("get_cm_event failed %d", errno); 3339 return; 3340 } 3341 3342 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3343 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3344 if (!rdma->error_state && 3345 migration_incoming_get_current()->state != 3346 MIGRATION_STATUS_COMPLETED) { 3347 error_report("receive cm event, cm event is %d", cm_event->event); 3348 rdma->error_state = -EPIPE; 3349 if (rdma->return_path) { 3350 rdma->return_path->error_state = -EPIPE; 3351 } 3352 } 3353 rdma_ack_cm_event(cm_event); 3354 if (mis->loadvm_co) { 3355 qemu_coroutine_enter(mis->loadvm_co); 3356 } 3357 return; 3358 } 3359 rdma_ack_cm_event(cm_event); 3360 } 3361 3362 static int qemu_rdma_accept(RDMAContext *rdma) 3363 { 3364 RDMACapabilities cap; 3365 struct rdma_conn_param conn_param = { 3366 .responder_resources = 2, 3367 .private_data = &cap, 3368 .private_data_len = sizeof(cap), 3369 }; 3370 RDMAContext *rdma_return_path = NULL; 3371 struct rdma_cm_event *cm_event; 3372 struct ibv_context *verbs; 3373 int ret = -EINVAL; 3374 int idx; 3375 3376 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3377 if (ret) { 3378 goto err_rdma_dest_wait; 3379 } 3380 3381 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3382 rdma_ack_cm_event(cm_event); 3383 goto err_rdma_dest_wait; 3384 } 3385 3386 /* 3387 * initialize the RDMAContext for return path for 
postcopy after first 3388 * connection request reached. 3389 */ 3390 if ((migrate_postcopy() || migrate_return_path()) 3391 && !rdma->is_return_path) { 3392 rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL); 3393 if (rdma_return_path == NULL) { 3394 rdma_ack_cm_event(cm_event); 3395 goto err_rdma_dest_wait; 3396 } 3397 3398 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3399 } 3400 3401 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3402 3403 network_to_caps(&cap); 3404 3405 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3406 error_report("Unknown source RDMA version: %d, bailing...", 3407 cap.version); 3408 rdma_ack_cm_event(cm_event); 3409 goto err_rdma_dest_wait; 3410 } 3411 3412 /* 3413 * Respond with only the capabilities this version of QEMU knows about. 3414 */ 3415 cap.flags &= known_capabilities; 3416 3417 /* 3418 * Enable the ones that we do know about. 3419 * Add other checks here as new ones are introduced. 3420 */ 3421 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3422 rdma->pin_all = true; 3423 } 3424 3425 rdma->cm_id = cm_event->id; 3426 verbs = cm_event->id->verbs; 3427 3428 rdma_ack_cm_event(cm_event); 3429 3430 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3431 3432 caps_to_network(&cap); 3433 3434 trace_qemu_rdma_accept_pin_verbsc(verbs); 3435 3436 if (!rdma->verbs) { 3437 rdma->verbs = verbs; 3438 } else if (rdma->verbs != verbs) { 3439 error_report("ibv context not matching %p, %p!", rdma->verbs, 3440 verbs); 3441 goto err_rdma_dest_wait; 3442 } 3443 3444 qemu_rdma_dump_id("dest_init", verbs); 3445 3446 ret = qemu_rdma_alloc_pd_cq(rdma); 3447 if (ret) { 3448 error_report("rdma migration: error allocating pd and cq!"); 3449 goto err_rdma_dest_wait; 3450 } 3451 3452 ret = qemu_rdma_alloc_qp(rdma); 3453 if (ret) { 3454 error_report("rdma migration: error allocating qp!"); 3455 goto err_rdma_dest_wait; 3456 } 3457 3458 ret = qemu_rdma_init_ram_blocks(rdma); 3459 if (ret) { 3460 
error_report("rdma migration: error initializing ram blocks!"); 3461 goto err_rdma_dest_wait; 3462 } 3463 3464 for (idx = 0; idx < RDMA_WRID_MAX; idx++) { 3465 ret = qemu_rdma_reg_control(rdma, idx); 3466 if (ret) { 3467 error_report("rdma: error registering %d control", idx); 3468 goto err_rdma_dest_wait; 3469 } 3470 } 3471 3472 /* Accept the second connection request for return path */ 3473 if ((migrate_postcopy() || migrate_return_path()) 3474 && !rdma->is_return_path) { 3475 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3476 NULL, 3477 (void *)(intptr_t)rdma->return_path); 3478 } else { 3479 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3480 NULL, rdma); 3481 } 3482 3483 ret = rdma_accept(rdma->cm_id, &conn_param); 3484 if (ret) { 3485 error_report("rdma_accept returns %d", ret); 3486 goto err_rdma_dest_wait; 3487 } 3488 3489 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3490 if (ret) { 3491 error_report("rdma_accept get_cm_event failed %d", ret); 3492 goto err_rdma_dest_wait; 3493 } 3494 3495 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3496 error_report("rdma_accept not event established"); 3497 rdma_ack_cm_event(cm_event); 3498 goto err_rdma_dest_wait; 3499 } 3500 3501 rdma_ack_cm_event(cm_event); 3502 rdma->connected = true; 3503 3504 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY); 3505 if (ret) { 3506 error_report("rdma migration: error posting second control recv"); 3507 goto err_rdma_dest_wait; 3508 } 3509 3510 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3511 3512 return 0; 3513 3514 err_rdma_dest_wait: 3515 rdma->error_state = ret; 3516 qemu_rdma_cleanup(rdma); 3517 g_free(rdma_return_path); 3518 return ret; 3519 } 3520 3521 static int dest_ram_sort_func(const void *a, const void *b) 3522 { 3523 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3524 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3525 3526 return (a_index < b_index) ? 
-1 : (a_index != b_index); 3527 } 3528 3529 /* 3530 * During each iteration of the migration, we listen for instructions 3531 * by the source VM to perform dynamic page registrations before they 3532 * can perform RDMA operations. 3533 * 3534 * We respond with the 'rkey'. 3535 * 3536 * Keep doing this until the source tells us to stop. 3537 */ 3538 static int qemu_rdma_registration_handle(QEMUFile *f) 3539 { 3540 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3541 .type = RDMA_CONTROL_REGISTER_RESULT, 3542 .repeat = 0, 3543 }; 3544 RDMAControlHeader unreg_resp = { .len = 0, 3545 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3546 .repeat = 0, 3547 }; 3548 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3549 .repeat = 1 }; 3550 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3551 RDMAContext *rdma; 3552 RDMALocalBlocks *local; 3553 RDMAControlHeader head; 3554 RDMARegister *reg, *registers; 3555 RDMACompress *comp; 3556 RDMARegisterResult *reg_result; 3557 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3558 RDMALocalBlock *block; 3559 void *host_addr; 3560 int ret = 0; 3561 int idx = 0; 3562 int count = 0; 3563 int i = 0; 3564 3565 RCU_READ_LOCK_GUARD(); 3566 rdma = qatomic_rcu_read(&rioc->rdmain); 3567 3568 if (!rdma) { 3569 return -EIO; 3570 } 3571 3572 CHECK_ERROR_STATE(); 3573 3574 local = &rdma->local_ram_blocks; 3575 do { 3576 trace_qemu_rdma_registration_handle_wait(); 3577 3578 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE); 3579 3580 if (ret < 0) { 3581 break; 3582 } 3583 3584 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3585 error_report("rdma: Too many requests in this message (%d)." 
3586 "Bailing.", head.repeat); 3587 ret = -EIO; 3588 break; 3589 } 3590 3591 switch (head.type) { 3592 case RDMA_CONTROL_COMPRESS: 3593 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3594 network_to_compress(comp); 3595 3596 trace_qemu_rdma_registration_handle_compress(comp->length, 3597 comp->block_idx, 3598 comp->offset); 3599 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3600 error_report("rdma: 'compress' bad block index %u (vs %d)", 3601 (unsigned int)comp->block_idx, 3602 rdma->local_ram_blocks.nb_blocks); 3603 ret = -EIO; 3604 goto out; 3605 } 3606 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3607 3608 host_addr = block->local_host_addr + 3609 (comp->offset - block->offset); 3610 3611 ram_handle_compressed(host_addr, comp->value, comp->length); 3612 break; 3613 3614 case RDMA_CONTROL_REGISTER_FINISHED: 3615 trace_qemu_rdma_registration_handle_finished(); 3616 goto out; 3617 3618 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3619 trace_qemu_rdma_registration_handle_ram_blocks(); 3620 3621 /* Sort our local RAM Block list so it's the same as the source, 3622 * we can do this since we've filled in a src_index in the list 3623 * as we received the RAMBlock list earlier. 3624 */ 3625 qsort(rdma->local_ram_blocks.block, 3626 rdma->local_ram_blocks.nb_blocks, 3627 sizeof(RDMALocalBlock), dest_ram_sort_func); 3628 for (i = 0; i < local->nb_blocks; i++) { 3629 local->block[i].index = i; 3630 } 3631 3632 if (rdma->pin_all) { 3633 ret = qemu_rdma_reg_whole_ram_blocks(rdma); 3634 if (ret) { 3635 error_report("rdma migration: error dest " 3636 "registering ram blocks"); 3637 goto out; 3638 } 3639 } 3640 3641 /* 3642 * Dest uses this to prepare to transmit the RAMBlock descriptions 3643 * to the source VM after connection setup. 3644 * Both sides use the "remote" structure to communicate and update 3645 * their "local" descriptions with what was sent. 
3646 */ 3647 for (i = 0; i < local->nb_blocks; i++) { 3648 rdma->dest_blocks[i].remote_host_addr = 3649 (uintptr_t)(local->block[i].local_host_addr); 3650 3651 if (rdma->pin_all) { 3652 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3653 } 3654 3655 rdma->dest_blocks[i].offset = local->block[i].offset; 3656 rdma->dest_blocks[i].length = local->block[i].length; 3657 3658 dest_block_to_network(&rdma->dest_blocks[i]); 3659 trace_qemu_rdma_registration_handle_ram_blocks_loop( 3660 local->block[i].block_name, 3661 local->block[i].offset, 3662 local->block[i].length, 3663 local->block[i].local_host_addr, 3664 local->block[i].src_index); 3665 } 3666 3667 blocks.len = rdma->local_ram_blocks.nb_blocks 3668 * sizeof(RDMADestBlock); 3669 3670 3671 ret = qemu_rdma_post_send_control(rdma, 3672 (uint8_t *) rdma->dest_blocks, &blocks); 3673 3674 if (ret < 0) { 3675 error_report("rdma migration: error sending remote info"); 3676 goto out; 3677 } 3678 3679 break; 3680 case RDMA_CONTROL_REGISTER_REQUEST: 3681 trace_qemu_rdma_registration_handle_register(head.repeat); 3682 3683 reg_resp.repeat = head.repeat; 3684 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3685 3686 for (count = 0; count < head.repeat; count++) { 3687 uint64_t chunk; 3688 uint8_t *chunk_start, *chunk_end; 3689 3690 reg = ®isters[count]; 3691 network_to_register(reg); 3692 3693 reg_result = &results[count]; 3694 3695 trace_qemu_rdma_registration_handle_register_loop(count, 3696 reg->current_index, reg->key.current_addr, reg->chunks); 3697 3698 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3699 error_report("rdma: 'register' bad block index %u (vs %d)", 3700 (unsigned int)reg->current_index, 3701 rdma->local_ram_blocks.nb_blocks); 3702 ret = -ENOENT; 3703 goto out; 3704 } 3705 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3706 if (block->is_ram_block) { 3707 if (block->offset > reg->key.current_addr) { 3708 error_report("rdma: bad register address for block 
%s" 3709 " offset: %" PRIx64 " current_addr: %" PRIx64, 3710 block->block_name, block->offset, 3711 reg->key.current_addr); 3712 ret = -ERANGE; 3713 goto out; 3714 } 3715 host_addr = (block->local_host_addr + 3716 (reg->key.current_addr - block->offset)); 3717 chunk = ram_chunk_index(block->local_host_addr, 3718 (uint8_t *) host_addr); 3719 } else { 3720 chunk = reg->key.chunk; 3721 host_addr = block->local_host_addr + 3722 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3723 /* Check for particularly bad chunk value */ 3724 if (host_addr < (void *)block->local_host_addr) { 3725 error_report("rdma: bad chunk for block %s" 3726 " chunk: %" PRIx64, 3727 block->block_name, reg->key.chunk); 3728 ret = -ERANGE; 3729 goto out; 3730 } 3731 } 3732 chunk_start = ram_chunk_start(block, chunk); 3733 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3734 /* avoid "-Waddress-of-packed-member" warning */ 3735 uint32_t tmp_rkey = 0; 3736 if (qemu_rdma_register_and_get_keys(rdma, block, 3737 (uintptr_t)host_addr, NULL, &tmp_rkey, 3738 chunk, chunk_start, chunk_end)) { 3739 error_report("cannot get rkey"); 3740 ret = -EINVAL; 3741 goto out; 3742 } 3743 reg_result->rkey = tmp_rkey; 3744 3745 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3746 3747 trace_qemu_rdma_registration_handle_register_rkey( 3748 reg_result->rkey); 3749 3750 result_to_network(reg_result); 3751 } 3752 3753 ret = qemu_rdma_post_send_control(rdma, 3754 (uint8_t *) results, ®_resp); 3755 3756 if (ret < 0) { 3757 error_report("Failed to send control buffer"); 3758 goto out; 3759 } 3760 break; 3761 case RDMA_CONTROL_UNREGISTER_REQUEST: 3762 trace_qemu_rdma_registration_handle_unregister(head.repeat); 3763 unreg_resp.repeat = head.repeat; 3764 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3765 3766 for (count = 0; count < head.repeat; count++) { 3767 reg = ®isters[count]; 3768 network_to_register(reg); 3769 3770 trace_qemu_rdma_registration_handle_unregister_loop(count, 3771 
reg->current_index, reg->key.chunk); 3772 3773 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3774 3775 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3776 block->pmr[reg->key.chunk] = NULL; 3777 3778 if (ret != 0) { 3779 perror("rdma unregistration chunk failed"); 3780 ret = -ret; 3781 goto out; 3782 } 3783 3784 rdma->total_registrations--; 3785 3786 trace_qemu_rdma_registration_handle_unregister_success( 3787 reg->key.chunk); 3788 } 3789 3790 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp); 3791 3792 if (ret < 0) { 3793 error_report("Failed to send control buffer"); 3794 goto out; 3795 } 3796 break; 3797 case RDMA_CONTROL_REGISTER_RESULT: 3798 error_report("Invalid RESULT message at dest."); 3799 ret = -EIO; 3800 goto out; 3801 default: 3802 error_report("Unknown control message %s", control_desc(head.type)); 3803 ret = -EIO; 3804 goto out; 3805 } 3806 } while (1); 3807 out: 3808 if (ret < 0) { 3809 rdma->error_state = ret; 3810 } 3811 return ret; 3812 } 3813 3814 /* Destination: 3815 * Called via a ram_control_load_hook during the initial RAM load section which 3816 * lists the RAMBlocks by name. This lets us know the order of the RAMBlocks 3817 * on the source. 3818 * We've already built our local RAMBlock list, but not yet sent the list to 3819 * the source. 
3820 */ 3821 static int 3822 rdma_block_notification_handle(QEMUFile *f, const char *name) 3823 { 3824 RDMAContext *rdma; 3825 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3826 int curr; 3827 int found = -1; 3828 3829 RCU_READ_LOCK_GUARD(); 3830 rdma = qatomic_rcu_read(&rioc->rdmain); 3831 3832 if (!rdma) { 3833 return -EIO; 3834 } 3835 3836 /* Find the matching RAMBlock in our local list */ 3837 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3838 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3839 found = curr; 3840 break; 3841 } 3842 } 3843 3844 if (found == -1) { 3845 error_report("RAMBlock '%s' not found on destination", name); 3846 return -ENOENT; 3847 } 3848 3849 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3850 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3851 rdma->next_src_index++; 3852 3853 return 0; 3854 } 3855 3856 static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data) 3857 { 3858 switch (flags) { 3859 case RAM_CONTROL_BLOCK_REG: 3860 return rdma_block_notification_handle(f, data); 3861 3862 case RAM_CONTROL_HOOK: 3863 return qemu_rdma_registration_handle(f); 3864 3865 default: 3866 /* Shouldn't be called with any other values */ 3867 abort(); 3868 } 3869 } 3870 3871 static int qemu_rdma_registration_start(QEMUFile *f, 3872 uint64_t flags, void *data) 3873 { 3874 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3875 RDMAContext *rdma; 3876 3877 if (migration_in_postcopy()) { 3878 return 0; 3879 } 3880 3881 RCU_READ_LOCK_GUARD(); 3882 rdma = qatomic_rcu_read(&rioc->rdmaout); 3883 if (!rdma) { 3884 return -EIO; 3885 } 3886 3887 CHECK_ERROR_STATE(); 3888 3889 trace_qemu_rdma_registration_start(flags); 3890 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3891 qemu_fflush(f); 3892 3893 return 0; 3894 } 3895 3896 /* 3897 * Inform dest that dynamic registrations are done for now. 3898 * First, flush writes, if any. 
3899 */ 3900 static int qemu_rdma_registration_stop(QEMUFile *f, 3901 uint64_t flags, void *data) 3902 { 3903 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3904 RDMAContext *rdma; 3905 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3906 int ret = 0; 3907 3908 if (migration_in_postcopy()) { 3909 return 0; 3910 } 3911 3912 RCU_READ_LOCK_GUARD(); 3913 rdma = qatomic_rcu_read(&rioc->rdmaout); 3914 if (!rdma) { 3915 return -EIO; 3916 } 3917 3918 CHECK_ERROR_STATE(); 3919 3920 qemu_fflush(f); 3921 ret = qemu_rdma_drain_cq(rdma); 3922 3923 if (ret < 0) { 3924 goto err; 3925 } 3926 3927 if (flags == RAM_CONTROL_SETUP) { 3928 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3929 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3930 int reg_result_idx, i, nb_dest_blocks; 3931 3932 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3933 trace_qemu_rdma_registration_stop_ram(); 3934 3935 /* 3936 * Make sure that we parallelize the pinning on both sides. 3937 * For very large guests, doing this serially takes a really 3938 * long time, so we have to 'interleave' the pinning locally 3939 * with the control messages by performing the pinning on this 3940 * side before we receive the control response from the other 3941 * side that the pinning has completed. 3942 */ 3943 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3944 ®_result_idx, rdma->pin_all ? 3945 qemu_rdma_reg_whole_ram_blocks : NULL); 3946 if (ret < 0) { 3947 fprintf(stderr, "receiving remote info!"); 3948 return ret; 3949 } 3950 3951 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3952 3953 /* 3954 * The protocol uses two different sets of rkeys (mutually exclusive): 3955 * 1. One key to represent the virtual address of the entire ram block. 3956 * (dynamic chunk registration disabled - pin everything with one rkey.) 3957 * 2. One to represent individual chunks within a ram block. 3958 * (dynamic chunk registration enabled - pin individual chunks.) 
3959 * 3960 * Once the capability is successfully negotiated, the destination transmits 3961 * the keys to use (or sends them later) including the virtual addresses 3962 * and then propagates the remote ram block descriptions to his local copy. 3963 */ 3964 3965 if (local->nb_blocks != nb_dest_blocks) { 3966 fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d) " 3967 "Your QEMU command line parameters are probably " 3968 "not identical on both the source and destination.", 3969 local->nb_blocks, nb_dest_blocks); 3970 rdma->error_state = -EINVAL; 3971 return -EINVAL; 3972 } 3973 3974 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3975 memcpy(rdma->dest_blocks, 3976 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3977 for (i = 0; i < nb_dest_blocks; i++) { 3978 network_to_dest_block(&rdma->dest_blocks[i]); 3979 3980 /* We require that the blocks are in the same order */ 3981 if (rdma->dest_blocks[i].length != local->block[i].length) { 3982 fprintf(stderr, "Block %s/%d has a different length %" PRIu64 3983 "vs %" PRIu64, local->block[i].block_name, i, 3984 local->block[i].length, 3985 rdma->dest_blocks[i].length); 3986 rdma->error_state = -EINVAL; 3987 return -EINVAL; 3988 } 3989 local->block[i].remote_host_addr = 3990 rdma->dest_blocks[i].remote_host_addr; 3991 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3992 } 3993 } 3994 3995 trace_qemu_rdma_registration_stop(flags); 3996 3997 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3998 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL); 3999 4000 if (ret < 0) { 4001 goto err; 4002 } 4003 4004 return 0; 4005 err: 4006 rdma->error_state = ret; 4007 return ret; 4008 } 4009 4010 static const QEMUFileHooks rdma_read_hooks = { 4011 .hook_ram_load = rdma_load_hook, 4012 }; 4013 4014 static const QEMUFileHooks rdma_write_hooks = { 4015 .before_ram_iterate = qemu_rdma_registration_start, 4016 .after_ram_iterate = qemu_rdma_registration_stop, 4017 .save_page = 
qemu_rdma_save_page, 4018 }; 4019 4020 4021 static void qio_channel_rdma_finalize(Object *obj) 4022 { 4023 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 4024 if (rioc->rdmain) { 4025 qemu_rdma_cleanup(rioc->rdmain); 4026 g_free(rioc->rdmain); 4027 rioc->rdmain = NULL; 4028 } 4029 if (rioc->rdmaout) { 4030 qemu_rdma_cleanup(rioc->rdmaout); 4031 g_free(rioc->rdmaout); 4032 rioc->rdmaout = NULL; 4033 } 4034 } 4035 4036 static void qio_channel_rdma_class_init(ObjectClass *klass, 4037 void *class_data G_GNUC_UNUSED) 4038 { 4039 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 4040 4041 ioc_klass->io_writev = qio_channel_rdma_writev; 4042 ioc_klass->io_readv = qio_channel_rdma_readv; 4043 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 4044 ioc_klass->io_close = qio_channel_rdma_close; 4045 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 4046 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 4047 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 4048 } 4049 4050 static const TypeInfo qio_channel_rdma_info = { 4051 .parent = TYPE_QIO_CHANNEL, 4052 .name = TYPE_QIO_CHANNEL_RDMA, 4053 .instance_size = sizeof(QIOChannelRDMA), 4054 .instance_finalize = qio_channel_rdma_finalize, 4055 .class_init = qio_channel_rdma_class_init, 4056 }; 4057 4058 static void qio_channel_rdma_register_types(void) 4059 { 4060 type_register_static(&qio_channel_rdma_info); 4061 } 4062 4063 type_init(qio_channel_rdma_register_types); 4064 4065 static QEMUFile *rdma_new_input(RDMAContext *rdma) 4066 { 4067 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 4068 4069 rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc)); 4070 rioc->rdmain = rdma; 4071 rioc->rdmaout = rdma->return_path; 4072 qemu_file_set_hooks(rioc->file, &rdma_read_hooks); 4073 4074 return rioc->file; 4075 } 4076 4077 static QEMUFile *rdma_new_output(RDMAContext *rdma) 4078 { 4079 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 
4080 4081 rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc)); 4082 rioc->rdmaout = rdma; 4083 rioc->rdmain = rdma->return_path; 4084 qemu_file_set_hooks(rioc->file, &rdma_write_hooks); 4085 4086 return rioc->file; 4087 } 4088 4089 static void rdma_accept_incoming_migration(void *opaque) 4090 { 4091 RDMAContext *rdma = opaque; 4092 int ret; 4093 QEMUFile *f; 4094 Error *local_err = NULL; 4095 4096 trace_qemu_rdma_accept_incoming_migration(); 4097 ret = qemu_rdma_accept(rdma); 4098 4099 if (ret) { 4100 fprintf(stderr, "RDMA ERROR: Migration initialization failed\n"); 4101 return; 4102 } 4103 4104 trace_qemu_rdma_accept_incoming_migration_accepted(); 4105 4106 if (rdma->is_return_path) { 4107 return; 4108 } 4109 4110 f = rdma_new_input(rdma); 4111 if (f == NULL) { 4112 fprintf(stderr, "RDMA ERROR: could not open RDMA for input\n"); 4113 qemu_rdma_cleanup(rdma); 4114 return; 4115 } 4116 4117 rdma->migration_started_on_destination = 1; 4118 migration_fd_process_incoming(f, &local_err); 4119 if (local_err) { 4120 error_reportf_err(local_err, "RDMA ERROR:"); 4121 } 4122 } 4123 4124 void rdma_start_incoming_migration(const char *host_port, Error **errp) 4125 { 4126 int ret; 4127 RDMAContext *rdma; 4128 Error *local_err = NULL; 4129 4130 trace_rdma_start_incoming_migration(); 4131 4132 /* Avoid ram_block_discard_disable(), cannot change during migration. 
*/ 4133 if (ram_block_discard_is_required()) { 4134 error_setg(errp, "RDMA: cannot disable RAM discard"); 4135 return; 4136 } 4137 4138 rdma = qemu_rdma_data_init(host_port, &local_err); 4139 if (rdma == NULL) { 4140 goto err; 4141 } 4142 4143 ret = qemu_rdma_dest_init(rdma, &local_err); 4144 4145 if (ret) { 4146 goto err; 4147 } 4148 4149 trace_rdma_start_incoming_migration_after_dest_init(); 4150 4151 ret = rdma_listen(rdma->listen_id, 5); 4152 4153 if (ret) { 4154 ERROR(errp, "listening on socket!"); 4155 goto cleanup_rdma; 4156 } 4157 4158 trace_rdma_start_incoming_migration_after_rdma_listen(); 4159 4160 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 4161 NULL, (void *)(intptr_t)rdma); 4162 return; 4163 4164 cleanup_rdma: 4165 qemu_rdma_cleanup(rdma); 4166 err: 4167 error_propagate(errp, local_err); 4168 if (rdma) { 4169 g_free(rdma->host); 4170 g_free(rdma->host_port); 4171 } 4172 g_free(rdma); 4173 } 4174 4175 void rdma_start_outgoing_migration(void *opaque, 4176 const char *host_port, Error **errp) 4177 { 4178 MigrationState *s = opaque; 4179 RDMAContext *rdma_return_path = NULL; 4180 RDMAContext *rdma; 4181 int ret = 0; 4182 4183 /* Avoid ram_block_discard_disable(), cannot change during migration. 
*/ 4184 if (ram_block_discard_is_required()) { 4185 error_setg(errp, "RDMA: cannot disable RAM discard"); 4186 return; 4187 } 4188 4189 rdma = qemu_rdma_data_init(host_port, errp); 4190 if (rdma == NULL) { 4191 goto err; 4192 } 4193 4194 ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp); 4195 4196 if (ret) { 4197 goto err; 4198 } 4199 4200 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 4201 ret = qemu_rdma_connect(rdma, errp, false); 4202 4203 if (ret) { 4204 goto err; 4205 } 4206 4207 /* RDMA postcopy need a separate queue pair for return path */ 4208 if (migrate_postcopy() || migrate_return_path()) { 4209 rdma_return_path = qemu_rdma_data_init(host_port, errp); 4210 4211 if (rdma_return_path == NULL) { 4212 goto return_path_err; 4213 } 4214 4215 ret = qemu_rdma_source_init(rdma_return_path, 4216 migrate_rdma_pin_all(), errp); 4217 4218 if (ret) { 4219 goto return_path_err; 4220 } 4221 4222 ret = qemu_rdma_connect(rdma_return_path, errp, true); 4223 4224 if (ret) { 4225 goto return_path_err; 4226 } 4227 4228 rdma->return_path = rdma_return_path; 4229 rdma_return_path->return_path = rdma; 4230 rdma_return_path->is_return_path = true; 4231 } 4232 4233 trace_rdma_start_outgoing_migration_after_rdma_connect(); 4234 4235 s->to_dst_file = rdma_new_output(rdma); 4236 migrate_fd_connect(s, NULL); 4237 return; 4238 return_path_err: 4239 qemu_rdma_cleanup(rdma); 4240 err: 4241 g_free(rdma); 4242 g_free(rdma_return_path); 4243 } 4244